Posted to commits@drill.apache.org by am...@apache.org on 2016/10/22 00:33:09 UTC
[4/5] drill git commit: DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the number of records read
DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the number of records read
close apache/drill#597
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2c43535a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2c43535a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2c43535a
Branch: refs/heads/master
Commit: 2c43535adc59c5995e2ebe80df570fad9ba1aab3
Parents: 4efc9f2
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Mon Oct 17 16:46:51 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Oct 21 16:01:03 2016 -0700
----------------------------------------------------------------------
.../exec/store/parquet/ParquetGroupScan.java | 69 +++++++++++++++-----
.../store/parquet/ParquetScanBatchCreator.java | 2 +-
.../exec/store/parquet/RowGroupReadEntry.java | 10 ++-
.../columnreaders/ParquetRecordReader.java | 38 ++++++++++-
.../store/parquet/ParquetRecordReaderTest.java | 65 +++++++++++++++++-
5 files changed, 163 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index b9f0ac0..a8e55b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -521,6 +521,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private int rowGroupIndex;
private String root;
private long rowCount; // rowCount = -1 indicates to include all rows.
+ private long numRecordsToRead;
@JsonCreator
public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
@@ -528,10 +529,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
super(path, start, length);
this.rowGroupIndex = rowGroupIndex;
this.rowCount = rowCount;
+ this.numRecordsToRead = rowCount;
}
public RowGroupReadEntry getRowGroupReadEntry() {
- return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+ return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(),
+ this.rowGroupIndex, this.getNumRecordsToRead());
}
public int getRowGroupIndex() {
@@ -553,6 +556,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return byteMap;
}
+ public long getNumRecordsToRead() {
+ return numRecordsToRead;
+ }
+
+ public void setNumRecordsToRead(long numRecords) {
+ numRecordsToRead = numRecords;
+ }
+
public void setEndpointByteMap(EndpointByteMap byteMap) {
this.byteMap = byteMap;
}
@@ -834,7 +845,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
List<RowGroupReadEntry> entries = Lists.newArrayList();
for (RowGroupInfo rgi : rowGroups) {
- RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex());
+ RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
entries.add(entry);
}
return entries;
@@ -867,6 +878,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return toString();
}
+ public void setCacheFileRoot(String cacheFileRoot) {
+ this.cacheFileRoot = cacheFileRoot;
+ }
+
@Override
public String toString() {
String cacheFileString = "";
@@ -893,15 +908,44 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return newScan;
}
+ // Based on maxRecords to read for the scan,
+ // figure out how many rowGroups to read and update number of records to read for each of them.
+ // Returns total number of rowGroups to read.
+ private int updateRowGroupInfo(long maxRecords) {
+ long count = 0;
+ int index = 0;
+ for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+ long rowCount = rowGroupInfo.getRowCount();
+ if (count + rowCount <= maxRecords) {
+ count += rowCount;
+ rowGroupInfo.setNumRecordsToRead(rowCount);
+ index++;
+ continue;
+ } else if (count < maxRecords) {
+ rowGroupInfo.setNumRecordsToRead(maxRecords - count);
+ index++;
+ }
+ break;
+ }
+
+ return index;
+ }
+
@Override
- public FileGroupScan clone(FileSelection selection) throws IOException {
+ public ParquetGroupScan clone(FileSelection selection) throws IOException {
ParquetGroupScan newScan = new ParquetGroupScan(this);
newScan.modifyFileSelection(selection);
- newScan.cacheFileRoot = selection.cacheFileRoot;
+ newScan.setCacheFileRoot(selection.cacheFileRoot);
newScan.init(selection.getMetaContext());
return newScan;
}
+ public ParquetGroupScan clone(FileSelection selection, long maxRecords) throws IOException {
+ ParquetGroupScan newScan = clone(selection);
+ newScan.updateRowGroupInfo(maxRecords);
+ return newScan;
+ }
+
@Override
public boolean supportsLimitPushdown() {
return true;
@@ -913,22 +957,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row -> 1 rowGroup.
// further optimization : minimize # of files chosen, or the affinity of files chosen.
- long count = 0;
- int index = 0;
- for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
- if (count < maxRecords) {
- count += rowGroupInfo.getRowCount();
- index ++;
- } else {
- break;
- }
- }
+
+ // Calculate number of rowGroups to read based on maxRecords and update
+ // number of records to read for each of those rowGroups.
+ int index = updateRowGroupInfo(maxRecords);
Set<String> fileNames = Sets.newHashSet(); // HashSet keeps a fileName unique.
for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) {
fileNames.add(rowGroupInfo.getPath());
}
+ // If there is no change in fileSet, no need to create new groupScan.
if (fileNames.size() == fileSet.size() ) {
// There is no reduction of rowGroups. Return the original groupScan.
logger.debug("applyLimit() does not apply!");
@@ -938,7 +977,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
try {
FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false);
logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size());
- return this.clone(newSelection);
+ return this.clone(newSelection, maxRecords);
} catch (IOException e) {
logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
return null;
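For readers skimming the hunk above: updateRowGroupInfo() walks the row groups in order, keeps whole groups while they still fit under the limit, caps the first group that would overshoot, and drops the rest. A minimal, self-contained sketch of that logic (illustrative names, not the committed Drill classes):

import java.util.Arrays;
import java.util.List;

public class LimitTrimSketch {
  // One entry per row group: rowCount is the group's total, toRead is the
  // per-group cap the reader will receive.
  static final class Group {
    final long rowCount;
    long toRead;
    Group(long rowCount) { this.rowCount = rowCount; this.toRead = rowCount; }
  }

  // Same shape as updateRowGroupInfo(): keep whole groups while they fit,
  // cap the first group that would overshoot, and report how many to keep.
  static int trim(List<Group> groups, long maxRecords) {
    long count = 0;
    int index = 0;
    for (Group g : groups) {
      if (count + g.rowCount <= maxRecords) {
        count += g.rowCount;
        g.toRead = g.rowCount;
        index++;
        continue;
      } else if (count < maxRecords) {
        g.toRead = maxRecords - count;   // partial read of the last kept group
        index++;
      }
      break;
    }
    return index;
  }

  public static void main(String[] args) {
    // Three 100-row groups, LIMIT 225: keep all three, cap the last at 25.
    List<Group> groups = Arrays.asList(new Group(100), new Group(100), new Group(100));
    int kept = trim(groups, 225);
    System.out.println(kept + " groups kept, last capped at " + groups.get(kept - 1).toRead);
  }
}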
http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bf13977..a98c660 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -111,7 +111,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
readers.add(
new ParquetRecordReader(
- context, e.getPath(), e.getRowGroupIndex(), fs,
+ context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs,
CodecFactory.createDirectCodecFactory(
fs.getConf(),
new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
index b0c5fd0..594e12b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
@@ -26,20 +26,26 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class RowGroupReadEntry extends ReadEntryFromHDFS {
private int rowGroupIndex;
+ private long numRecordsToRead;
@JsonCreator
public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
- @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+ @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex,
+ @JsonProperty("numRecordsToRead") long numRecordsToRead) {
super(path, start, length);
this.rowGroupIndex = rowGroupIndex;
+ this.numRecordsToRead = numRecordsToRead;
}
@JsonIgnore
public RowGroupReadEntry getRowGroupReadEntry() {
- return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+ return new RowGroupReadEntry(this.getPath(), this.getStart(),
+ this.getLength(), this.rowGroupIndex, this.numRecordsToRead);
}
public int getRowGroupIndex(){
return rowGroupIndex;
}
+
+ public long getNumRecordsToRead() { return numRecordsToRead; }
}
\ No newline at end of file
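RowGroupReadEntry is serialized with Jackson as part of the physical plan sent to each fragment, so the new numRecordsToRead property rides along in the plan JSON. A standalone sketch of that round trip, using a simplified stand-in class (the real entry also carries path/start/length through its ReadEntryFromHDFS superclass):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ReadEntryJsonSketch {
  // Simplified stand-in for RowGroupReadEntry.
  static final class Entry {
    private final int rowGroupIndex;
    private final long numRecordsToRead;

    @JsonCreator
    Entry(@JsonProperty("rowGroupIndex") int rowGroupIndex,
          @JsonProperty("numRecordsToRead") long numRecordsToRead) {
      this.rowGroupIndex = rowGroupIndex;
      this.numRecordsToRead = numRecordsToRead;
    }

    public int getRowGroupIndex() { return rowGroupIndex; }
    public long getNumRecordsToRead() { return numRecordsToRead; }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new Entry(0, 25));
    Entry back = mapper.readValue(json, Entry.class);
    // e.g. {"rowGroupIndex":0,"numRecordsToRead":25} -> cap of 25 restored
    System.out.println(json + " -> " + back.getNumRecordsToRead());
  }
}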
http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 50bb7dc..c51c72c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -68,6 +68,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+ private static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
// When no column is required by the downstream operator, ask SCAN to return a DEFAULT column. If such column does not exist,
// it will return as a nullable-int column. If that column happens to exist, return that column.
@@ -91,6 +92,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
private List<ColumnReader<?>> columnStatuses;
private FileSystem fileSystem;
private long batchSize;
+ private long numRecordsToRead; // number of records to read
+
Path hadoopPath;
private VarLenBinaryReader varLengthReader;
private ParquetMetadata footer;
@@ -117,19 +120,34 @@ public class ParquetRecordReader extends AbstractRecordReader {
public ParquetRecordReader(FragmentContext fragmentContext,
String path,
int rowGroupIndex,
+ long numRecordsToRead,
FileSystem fs,
CodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
throws ExecutionSetupException {
- this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
- columns, dateCorruptionStatus);
+ this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
+ path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+ }
+
+ public ParquetRecordReader(FragmentContext fragmentContext,
+ String path,
+ int rowGroupIndex,
+ FileSystem fs,
+ CodecFactory codecFactory,
+ ParquetMetadata footer,
+ List<SchemaPath> columns,
+ ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
+ throws ExecutionSetupException {
+ this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+ path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
}
public ParquetRecordReader(
FragmentContext fragmentContext,
long batchSize,
+ long numRecordsToRead,
String path,
int rowGroupIndex,
FileSystem fs,
@@ -145,6 +163,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
this.footer = footer;
this.dateCorruptionStatus = dateCorruptionStatus;
this.fragmentContext = fragmentContext;
+ // Callers can pass -1 if they want to read all rows.
+ if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+ this.numRecordsToRead = footer.getBlocks().get(rowGroupIndex).getRowCount();
+ } else {
+ assert (numRecordsToRead >= 0);
+ this.numRecordsToRead = Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+ }
setColumns(columns);
}
@@ -444,11 +469,16 @@ public class ParquetRecordReader extends AbstractRecordReader {
return 0;
}
recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
+
+ // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit).
+ recordsToRead = Math.min(recordsToRead, numRecordsToRead);
+
for (final ValueVector vv : nullFilledVectors ) {
vv.getMutator().setValueCount( (int) recordsToRead);
}
mockRecordsRead += recordsToRead;
totalRecordsRead += recordsToRead;
+ numRecordsToRead -= recordsToRead;
return (int) recordsToRead;
}
@@ -459,6 +489,9 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
+ // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit)
+ recordsToRead = Math.min(recordsToRead, numRecordsToRead);
+
if (allFieldsFixedLength) {
readAllFixedFields(recordsToRead);
} else { // variable length columns
@@ -476,6 +509,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
// logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
+ numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
return firstColumnStatus.getRecordsReadInCurrentPass();
} catch (Exception e) {
handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
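The reader-side bookkeeping above is easiest to see in isolation: every call to next() caps the batch at the remaining budget and then decrements it, so a reader handed numRecordsToRead = 25 returns 25 rows and stops, even though its row group holds more. A minimal simulation (illustrative, not the committed reader):

public class ReaderCapSketch {
  static long numRecordsToRead = 25;     // per-row-group cap handed down from the plan
  static long rowsLeftInGroup = 100;     // rows remaining in the row group

  // Simulates one ParquetRecordReader.next() pass: cap the batch by the
  // remaining budget, then decrement the budget.
  static int next(int batchCapacity) {
    long recordsToRead = Math.min(batchCapacity, rowsLeftInGroup);
    recordsToRead = Math.min(recordsToRead, numRecordsToRead); // apply the limit cap
    rowsLeftInGroup -= recordsToRead;
    numRecordsToRead -= recordsToRead;
    return (int) recordsToRead;
  }

  public static void main(String[] args) {
    int n;
    while ((n = next(10)) > 0) {         // read in 10-record batches
      System.out.println("batch of " + n);
    }
    // prints batches of 10, 10, 5: 25 records total, then the reader stops
  }
}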
http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 51fa45c..6f3a19a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -637,7 +637,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
final FileSystem fs = new CachedSingleFileSystem(fileName);
final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
for(int i = 0; i < 25; i++) {
- final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
+ final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs,
CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0),
f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION);
final TestOutputMutator mutator = new TestOutputMutator(allocator);
@@ -691,4 +691,67 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
final long D = System.nanoTime();
System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));
}
+
+ @Test
+ public void testLimit() throws Exception {
+ List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 1");
+
+ int recordsInOutput = 0;
+ for (QueryDataBatch batch : results) {
+ recordsInOutput += batch.getHeader().getDef().getRecordCount();
+ batch.release();
+ }
+
+ assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 1, recordsInOutput), 1 == recordsInOutput);
+ }
+
+ @Test
+ public void testLimitBeyondRowCount() throws Exception {
+ List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 100");
+
+ int recordsInOutput = 0;
+ for (QueryDataBatch batch : results) {
+ recordsInOutput += batch.getHeader().getDef().getRecordCount();
+ batch.release();
+ }
+
+ assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 9, recordsInOutput), 9 == recordsInOutput);
+ }
+
+ @Test
+ public void testLimitMultipleRowGroups() throws Exception {
+ HashMap<String, FieldInfo> fields = new HashMap<>();
+ ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
+ populateFieldInfoMap(props);
+ TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
+
+ List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 225");
+
+ int recordsInOutput = 0;
+ for (QueryDataBatch batch : results) {
+ recordsInOutput += batch.getHeader().getDef().getRecordCount();
+ batch.release();
+ }
+
+ assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 225, recordsInOutput), 225 == recordsInOutput);
+ }
+
+ @Test
+ public void testLimitMultipleRowGroupsBeyondRowCount() throws Exception {
+ HashMap<String, FieldInfo> fields = new HashMap<>();
+ ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
+ populateFieldInfoMap(props);
+ TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
+
+ List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 500");
+
+ int recordsInOutput = 0;
+ for (QueryDataBatch batch : results) {
+ recordsInOutput += batch.getHeader().getDef().getRecordCount();
+ batch.release();
+ }
+
+ assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 300, recordsInOutput), 300 == recordsInOutput);
+ }
+
}
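As a cross-check on the expectations in these tests: the output size of each LIMIT query is simply the limit capped by the rows actually present (nation/01.parquet evidently holds 9 rows; the generated file holds 3 x 100). A trivial sketch of that invariant:

public class ExpectedLimitSketch {
  // Output size of a LIMIT query: the limit, capped by the rows available.
  static long expectedRecords(long limit, long totalRows) {
    return Math.min(limit, totalRows);
  }

  public static void main(String[] args) {
    System.out.println(expectedRecords(1, 9));     // 1   testLimit
    System.out.println(expectedRecords(100, 9));   // 9   testLimitBeyondRowCount
    System.out.println(expectedRecords(225, 300)); // 225 testLimitMultipleRowGroups
    System.out.println(expectedRecords(500, 300)); // 300 testLimitMultipleRowGroupsBeyondRowCount
  }
}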