You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2016/10/22 00:33:09 UTC

[4/5] drill git commit: DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the number of records read

DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the number of records read

close apache/drill#597


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2c43535a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2c43535a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2c43535a

Branch: refs/heads/master
Commit: 2c43535adc59c5995e2ebe80df570fad9ba1aab3
Parents: 4efc9f2
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Mon Oct 17 16:46:51 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Oct 21 16:01:03 2016 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetGroupScan.java    | 69 +++++++++++++++-----
 .../store/parquet/ParquetScanBatchCreator.java  |  2 +-
 .../exec/store/parquet/RowGroupReadEntry.java   | 10 ++-
 .../columnreaders/ParquetRecordReader.java      | 38 ++++++++++-
 .../store/parquet/ParquetRecordReaderTest.java  | 65 +++++++++++++++++-
 5 files changed, 163 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index b9f0ac0..a8e55b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -521,6 +521,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     private int rowGroupIndex;
     private String root;
     private long rowCount;  // rowCount = -1 indicates to include all rows.
+    private long numRecordsToRead;
 
     @JsonCreator
     public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
@@ -528,10 +529,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       super(path, start, length);
       this.rowGroupIndex = rowGroupIndex;
       this.rowCount = rowCount;
+      this.numRecordsToRead = rowCount;
     }
 
     public RowGroupReadEntry getRowGroupReadEntry() {
-      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(),
+                                   this.rowGroupIndex, this.getNumRecordsToRead());
     }
 
     public int getRowGroupIndex() {
@@ -553,6 +556,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       return byteMap;
     }
 
+    public long getNumRecordsToRead() {
+      return numRecordsToRead;
+    }
+
+    public void setNumRecordsToRead(long numRecords) {
+      numRecordsToRead = numRecords;
+    }
+
     public void setEndpointByteMap(EndpointByteMap byteMap) {
       this.byteMap = byteMap;
     }
@@ -834,7 +845,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
     List<RowGroupReadEntry> entries = Lists.newArrayList();
     for (RowGroupInfo rgi : rowGroups) {
-      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex());
+      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
       entries.add(entry);
     }
     return entries;
@@ -867,6 +878,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return toString();
   }
 
+  public void setCacheFileRoot(String cacheFileRoot) {
+    this.cacheFileRoot = cacheFileRoot;
+  }
+
   @Override
   public String toString() {
     String cacheFileString = "";
@@ -893,15 +908,44 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return newScan;
   }
 
+  // Based on maxRecords to read for the scan,
+  // figure out how many rowGroups to read and update number of records to read for each of them.
+  // Returns total number of rowGroups to read.
+  private int updateRowGroupInfo(long maxRecords) {
+    long count = 0;
+    int index = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (count + rowCount <= maxRecords) {
+        count += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        index++;
+        continue;
+      } else if (count < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - count);
+        index++;
+      }
+      break;
+    }
+
+    return index;
+  }
+
   @Override
-  public FileGroupScan clone(FileSelection selection) throws IOException {
+  public ParquetGroupScan clone(FileSelection selection) throws IOException {
     ParquetGroupScan newScan = new ParquetGroupScan(this);
     newScan.modifyFileSelection(selection);
-    newScan.cacheFileRoot = selection.cacheFileRoot;
+    newScan.setCacheFileRoot(selection.cacheFileRoot);
     newScan.init(selection.getMetaContext());
     return newScan;
   }
 
+  public ParquetGroupScan clone(FileSelection selection, long maxRecords) throws IOException {
+    ParquetGroupScan newScan = clone(selection);
+    newScan.updateRowGroupInfo(maxRecords);
+    return newScan;
+  }
+
   @Override
   public boolean supportsLimitPushdown() {
     return true;
@@ -913,22 +957,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
     // further optimization : minimize # of files chosen, or the affinity of files chosen.
-    long count = 0;
-    int index = 0;
-    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
-      if (count < maxRecords) {
-        count += rowGroupInfo.getRowCount();
-        index ++;
-      } else {
-        break;
-      }
-    }
+
+    // Calculate number of rowGroups to read based on maxRecords and update
+    // number of records to read for each of those rowGroups.
+    int index = updateRowGroupInfo(maxRecords);
 
     Set<String> fileNames = Sets.newHashSet(); // HashSet keeps a fileName unique.
     for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) {
       fileNames.add(rowGroupInfo.getPath());
     }
 
+    // If there is no change in fileSet, no need to create new groupScan.
     if (fileNames.size() == fileSet.size() ) {
       // There is no reduction of rowGroups. Return the original groupScan.
       logger.debug("applyLimit() does not apply!");
@@ -938,7 +977,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     try {
       FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false);
       logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size());
-      return this.clone(newSelection);
+      return this.clone(newSelection, maxRecords);
     } catch (IOException e) {
       logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
       return null;

http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bf13977..a98c660 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -111,7 +111,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
         if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
           readers.add(
               new ParquetRecordReader(
-                  context, e.getPath(), e.getRowGroupIndex(), fs,
+                  context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs,
                   CodecFactory.createDirectCodecFactory(
                   fs.getConf(),
                   new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),

http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
index b0c5fd0..594e12b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
@@ -26,20 +26,26 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class RowGroupReadEntry extends ReadEntryFromHDFS {
 
   private int rowGroupIndex;
+  private long numRecordsToRead;
 
   @JsonCreator
   public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
-                           @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+                           @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex,
+                           @JsonProperty("numRecordsToRead") long numRecordsToRead) {
     super(path, start, length);
     this.rowGroupIndex = rowGroupIndex;
+    this.numRecordsToRead = numRecordsToRead;
   }
 
   @JsonIgnore
   public RowGroupReadEntry getRowGroupReadEntry() {
-    return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+    return new RowGroupReadEntry(this.getPath(), this.getStart(),
+                                 this.getLength(), this.rowGroupIndex, this.numRecordsToRead);
   }
 
   public int getRowGroupIndex(){
     return rowGroupIndex;
   }
+
+  public long getNumRecordsToRead() { return numRecordsToRead; }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 50bb7dc..c51c72c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -68,6 +68,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
   private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
   private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+  private static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
 
   // When no column is required by the downstrea operator, ask SCAN to return a DEFAULT column. If such column does not exist,
   // it will return as a nullable-int column. If that column happens to exist, return that column.
@@ -91,6 +92,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private List<ColumnReader<?>> columnStatuses;
   private FileSystem fileSystem;
   private long batchSize;
+  private long numRecordsToRead; // number of records to read
+
   Path hadoopPath;
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
@@ -117,19 +120,34 @@ public class ParquetRecordReader extends AbstractRecordReader {
   public ParquetRecordReader(FragmentContext fragmentContext,
                              String path,
                              int rowGroupIndex,
+                             long numRecordsToRead,
                              FileSystem fs,
                              CodecFactory codecFactory,
                              ParquetMetadata footer,
                              List<SchemaPath> columns,
                              ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
                              throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
-        columns, dateCorruptionStatus);
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
+         path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+  }
+
+  public ParquetRecordReader(FragmentContext fragmentContext,
+      String path,
+      int rowGroupIndex,
+      FileSystem fs,
+      CodecFactory codecFactory,
+      ParquetMetadata footer,
+      List<SchemaPath> columns,
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
+      throws ExecutionSetupException {
+      this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+           path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
   }
 
   public ParquetRecordReader(
       FragmentContext fragmentContext,
       long batchSize,
+      long numRecordsToRead,
       String path,
       int rowGroupIndex,
       FileSystem fs,
@@ -145,6 +163,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
     this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
     this.fragmentContext = fragmentContext;
+    // Callers can pass -1 if they want to read all rows.
+    if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+      this.numRecordsToRead = footer.getBlocks().get(rowGroupIndex).getRowCount();
+    } else {
+      assert (numRecordsToRead >= 0);
+      this.numRecordsToRead = Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+    }
     setColumns(columns);
   }
 
@@ -444,11 +469,16 @@ public class ParquetRecordReader extends AbstractRecordReader {
           return 0;
         }
         recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
+
+        // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit).
+        recordsToRead = Math.min(recordsToRead, numRecordsToRead);
+
         for (final ValueVector vv : nullFilledVectors ) {
           vv.getMutator().setValueCount( (int) recordsToRead);
         }
         mockRecordsRead += recordsToRead;
         totalRecordsRead += recordsToRead;
+        numRecordsToRead -= recordsToRead;
         return (int) recordsToRead;
       }
 
@@ -459,6 +489,9 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
       }
 
+      // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit)
+      recordsToRead = Math.min(recordsToRead, numRecordsToRead);
+
       if (allFieldsFixedLength) {
         readAllFixedFields(recordsToRead);
       } else { // variable length columns
@@ -476,6 +509,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
 //      logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
       totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
+      numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (Exception e) {
       handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +

http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 51fa45c..6f3a19a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -637,7 +637,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final FileSystem fs = new CachedSingleFileSystem(fileName);
     final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
     for(int i = 0; i < 25; i++) {
-      final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
+      final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs,
           CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0),
           f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION);
       final TestOutputMutator mutator = new TestOutputMutator(allocator);
@@ -691,4 +691,67 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final long D = System.nanoTime();
     System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));
   }
+
+  @Test
+  public void testLimit() throws Exception {
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 1");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 1, recordsInOutput), 1 == recordsInOutput);
+  }
+
+  @Test
+  public void testLimitBeyondRowCount() throws Exception {
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 100");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 9, recordsInOutput), 9 == recordsInOutput);
+  }
+
+  @Test
+  public void testLimitMultipleRowGroups() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
+    populateFieldInfoMap(props);
+    TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
+
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 225");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 225, recordsInOutput), 225 == recordsInOutput);
+  }
+
+  @Test
+  public void testLimitMultipleRowGroupsBeyondRowCount() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
+    populateFieldInfoMap(props);
+    TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
+
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 500");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 300, recordsInOutput), 300 == recordsInOutput);
+  }
+
 }