Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/09 16:38:19 UTC

[GitHub] arina-ielchiieva closed pull request #976: DRILL-5797: Choose parquet reader from read columns

arina-ielchiieva closed pull request #976: DRILL-5797: Choose parquet reader from read columns
URL: https://github.com/apache/drill/pull/976
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 60179482fc3..82764285e99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -26,6 +26,8 @@
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -37,12 +39,15 @@
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
@@ -119,7 +124,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS
         if (logger.isDebugEnabled()) {
           logger.debug(containsCorruptDates.toString());
         }
-        if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
+        if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()), rowGroupScan.getColumns())) {
           readers.add(
               new ParquetRecordReader(
                   context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs,
@@ -156,20 +161,49 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS
     return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
   }
 
-  private static boolean isComplex(ParquetMetadata footer) {
+  private static boolean isComplex(ParquetMetadata footer, List<SchemaPath> columns) {
+    /**
+     * ParquetRecordReader cannot read nested columns and cannot handle repeated columns;
+     * it only handles flat and optional columns.
+     * For a wildcard query we check every column in the metadata.
+     * Otherwise we only check the projected columns.
+     * Checking only the first-level columns is sufficient because:
+     *   - if we need a.b, then a is a complex type; no need to check b since we don't handle complex types.
+     *   - if we need a[10], then a is repeated, i.e. its repetition level is greater than 0.
+     *   - if we need a, it is at the first level of the schema.
+     */
     MessageType schema = footer.getFileMetaData().getSchema();
-
-    for (Type type : schema.getFields()) {
-      if (!type.isPrimitive()) {
-        return true;
+    if (Utilities.isStarQuery(columns)) {
+      for (Type type : schema.getFields()) {
+        if (!type.isPrimitive()) {
+          return true;
+        }
       }
-    }
-    for (ColumnDescriptor col : schema.getColumns()) {
-      if (col.getMaxRepetitionLevel() > 0) {
-        return true;
+      for (ColumnDescriptor col : schema.getColumns()) {
+        if (col.getMaxRepetitionLevel() > 0) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      for (SchemaPath column : columns) {
+        if (isColumnComplex(schema, column)) {
+          return true;
+        }
       }
+      return false;
+    }
+  }
+
+  private static boolean isColumnComplex(GroupType grouptype, SchemaPath column) {
+    try {
+      Type type = grouptype.getType(column.getRootSegment().getPath().toLowerCase());
+      return type.isRepetition(Type.Repetition.REPEATED) || !type.isPrimitive();
+    }
+    catch (InvalidRecordException e) {
+      // if the type does not exist in the file, the column will be filled with nulls, so treat it as a simple type.
+      return false;
     }
-    return false;
   }
 
 }
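
For reference, below is a minimal standalone sketch of the decision rule introduced by this patch, written only against parquet-mr's schema classes. The class and method names (ComplexityCheck, isComplex, isColumnComplex) are hypothetical, and an empty column list stands in for Drill's Utilities.isStarQuery / SchemaPath handling, so this illustrates the rule rather than reproducing the Drill code:

import java.util.List;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

// Hypothetical helper mirroring the patch: decide whether the flat reader
// (ParquetRecordReader) is enough, or whether the complex reader is required.
public class ComplexityCheck {

  public static boolean isComplex(MessageType schema, List<String> columns) {
    // An empty list stands in for a wildcard (SELECT *) query here:
    // every field must be primitive and non-repeated.
    if (columns == null || columns.isEmpty()) {
      for (Type type : schema.getFields()) {
        if (!type.isPrimitive()) {
          return true;
        }
      }
      for (ColumnDescriptor col : schema.getColumns()) {
        if (col.getMaxRepetitionLevel() > 0) {
          return true;
        }
      }
      return false;
    }
    // Projected columns: only the root segment of each path matters,
    // because a nested or repeated root already forces the complex reader.
    for (String column : columns) {
      if (isColumnComplex(schema, column)) {
        return true;
      }
    }
    return false;
  }

  private static boolean isColumnComplex(GroupType schema, String rootName) {
    try {
      Type type = schema.getType(rootName);
      return type.isRepetition(Type.Repetition.REPEATED) || !type.isPrimitive();
    } catch (InvalidRecordException e) {
      // Column absent from the file: it will be filled with nulls,
      // which the flat reader handles fine.
      return false;
    }
  }
}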

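A usage example under the same assumptions, building a schema with MessageTypeParser instead of reading a real file footer:

import java.util.Arrays;
import java.util.Collections;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ComplexityCheckExample {
  public static void main(String[] args) {
    // One flat column, one repeated column, one nested group.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {"
            + " optional int32 id;"
            + " repeated binary tags (UTF8);"
            + " optional group address { optional binary city (UTF8); }"
            + " }");

    System.out.println(ComplexityCheck.isComplex(schema, Arrays.asList("id")));     // false: flat reader is enough
    System.out.println(ComplexityCheck.isComplex(schema, Arrays.asList("tags")));   // true: repeated column
    System.out.println(ComplexityCheck.isComplex(schema, Collections.emptyList())); // true: wildcard over a nested schema
  }
}

Checking only the root segment of each projected path keeps the decision to one schema lookup per projected column, and it is safe because any nested sub-path implies a non-primitive root.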

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services