You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this text.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/12 05:11:33 UTC

[16/16] incubator-drill git commit: DRILL-1688: Complex Parquet reader fails to read wide records.

DRILL-1688: Complex Parquet reader fails to read wide records.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1e21045b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1e21045b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1e21045b

Branch: refs/heads/master
Commit: 1e21045bfdab8fde8fc7d3d3a4182ddd4d2e41f3
Parents: 52b729e
Author: Jason Altekruse <al...@gmail.com>
Authored: Tue Nov 11 16:53:55 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Nov 11 19:27:41 2014 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java     |  5 +++++
 .../exec/server/options/SystemOptionManager.java |  2 ++
 .../store/parquet/ParquetScanBatchCreator.java   |  2 +-
 .../exec/store/parquet2/DrillParquetReader.java  | 19 ++++++++++++++++---
 4 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f01f577..f204506 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -85,6 +85,11 @@ public interface ExecConstants {
   public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
   public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
   public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+
+  public static final String PARQUET_VECTOR_FILL_THRESHOLD = "store.parquet.vector_fill_threshold";
+  public static final OptionValidator PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_THRESHOLD, 99l, 85l);
+  public static final String PARQUET_VECTOR_FILL_CHECK_THRESHOLD = "store.parquet.vector_fill_check_threshold";
+  public static final OptionValidator PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_CHECK_THRESHOLD, 100l, 10l);
   public static String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader";
   public static OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e802b44..9f912e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager {
       PlannerSettings.HASH_SINGLE_KEY,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
+      ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
+      ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 8aebab9..53a6ffc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -118,7 +118,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           );
         } else {
           ParquetMetadata footer = footers.get(e.getPath());
-          readers.add(new DrillParquetReader(footer, e, columns, conf));
+          readers.add(new DrillParquetReader(context, footer, e, columns, conf));
         }
         if (rowGroupScan.getSelectionRoot() != null) {
           String[] r = rowGroupScan.getSelectionRoot().split("/");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 8b5d035..8765935 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -27,7 +27,9 @@ import java.util.Map;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField.Key;
@@ -73,13 +75,23 @@ public class DrillParquetReader extends AbstractRecordReader {
   private int recordCount;
   private List<ValueVector> primitiveVectors;
   private OperatorContext operatorContext;
+  // The parquet-mr library's interface does not allow rewinding. To write safely into our fixed-size
+  // value vectors, we must check how full the vectors are after some number of reads; for performance,
+  // we avoid doing this on every record. These values are populated from system/session options so that
+  // users can tune for performance or allow wider records to be supported.
+  private final int fillLevelCheckFrequency;
+  private final int fillLevelCheckThreshold;
+  private FragmentContext fragmentContext;
 
 
-  public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
+  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
     this.footer = footer;
     this.conf = conf;
     this.entry = entry;
     setColumns(columns);
+    this.fragmentContext = fragmentContext;
+    fillLevelCheckFrequency = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD).num_val.intValue();
+    fillLevelCheckThreshold = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD).num_val.intValue();
   }
 
   public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
@@ -200,8 +212,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordReader.read();
       count++;
       totalRead++;
-      if (count % 100 == 0) {
-        if (getPercentFilled() > 85) {
+      if (count % fillLevelCheckFrequency == 0) {
+        if (getPercentFilled() > fillLevelCheckThreshold) {
           break;
         }
       }
@@ -217,6 +229,7 @@ public class DrillParquetReader extends AbstractRecordReader {
       if (v instanceof VariableWidthVector) {
         filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
       }
+      // TODO: re-enable the RepeatedFixedWidthVector fill-level check below (the commented-out expression is incomplete)
 //      if (v instanceof RepeatedFixedWidthVector) {
 //        filled = Math.max(filled, ((RepeatedFixedWidthVector) v).getAccessor().getGroupCount() * 100)
 //      }