Posted to commits@drill.apache.org by cg...@apache.org on 2020/10/19 11:44:29 UTC
[drill] branch master updated: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 6c000ec DRILL-7763: Add Limit Pushdown to File Based Storage Plugins
6c000ec is described below
commit 6c000ece952d532c11c7a64780ad4b58cd3c8222
Author: Charles Givre <cg...@apache.org>
AuthorDate: Sun Oct 18 14:17:54 2020 -0400
DRILL-7763: Add Limit Pushdown to File Based Storage Plugins
---
.../drill/exec/store/esri/ShpBatchReader.java | 8 ++-
.../drill/exec/store/esri/ShpFormatPlugin.java | 12 +++--
.../drill/exec/store/excel/ExcelBatchReader.java | 9 +++-
.../drill/exec/store/excel/ExcelFormatPlugin.java | 11 ++--
.../drill/exec/store/excel/TestExcelFormat.java | 11 ++++
.../drill/exec/store/hdf5/HDF5BatchReader.java | 11 +++-
.../drill/exec/store/hdf5/HDF5FormatPlugin.java | 10 ++--
.../drill/exec/store/spss/SpssBatchReader.java | 11 ++++
.../drill/exec/store/spss/SpssFormatPlugin.java | 13 +++--
.../base/AbstractGroupScanWithMetadata.java | 60 +++++++++++++++-------
.../exec/physical/resultSet/RowSetLoader.java | 9 ++++
.../physical/resultSet/impl/RowSetLoaderImpl.java | 6 +++
.../drill/exec/store/avro/AvroBatchReader.java | 9 ++++
.../drill/exec/store/avro/AvroFormatPlugin.java | 10 +++-
.../exec/store/dfs/easy/EasyFormatPlugin.java | 13 ++++-
.../drill/exec/store/dfs/easy/EasyGroupScan.java | 24 +++++++--
.../drill/exec/store/dfs/easy/EasySubScan.java | 12 ++++-
.../exec/store/easy/text/TextFormatPlugin.java | 19 +++++--
.../easy/text/reader/CompliantTextBatchReader.java | 20 +++++---
.../drill/exec/store/log/LogBatchReader.java | 22 +++++---
.../drill/exec/store/log/LogFormatPlugin.java | 21 ++++----
.../drill/exec/store/pcap/PcapBatchReader.java | 9 +++-
.../drill/exec/store/pcap/PcapFormatPlugin.java | 11 ++--
.../TestMetastoreWithEasyFormatPlugin.java | 4 +-
24 files changed, 265 insertions(+), 80 deletions(-)
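The pattern is the same in every reader touched below: the group scan records the limit, EasySubScan carries it to each reader as maxRecords, and the batch reader consults RowSetLoader.limitReached() before writing more rows. A minimal sketch of that reader-side shape, assuming the enhanced-scan API used throughout this commit (the class name and readNextRecord helper are hypothetical; the other calls mirror the hunks below):

import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;

public class ExampleBatchReader implements ManagedReader<FileSchemaNegotiator> {
  private final int maxRecords;  // 0 means no limit; plumbed from EasySubScan.getMaxRecords()
  private RowSetLoader rowWriter;

  public ExampleBatchReader(int maxRecords) {
    this.maxRecords = maxRecords;
  }

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    ResultSetLoader loader = negotiator.build();
    rowWriter = loader.writer();
    return true;
  }

  @Override
  public boolean next() {
    // Stop the scan as soon as the pushed-down limit is reached.
    if (rowWriter.limitReached(maxRecords)) {
      return false;
    }
    while (!rowWriter.isFull()) {
      if (!readNextRecord(rowWriter)) {
        return false;  // input exhausted
      }
    }
    return true;  // batch full; more data may remain
  }

  // Hypothetical per-format read of one record; returns false at end of input.
  private boolean readNextRecord(RowSetLoader writer) {
    return false;
  }

  @Override
  public void close() { }
}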
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
index 18cc022..1b49667 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
@@ -75,6 +75,7 @@ public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
private RowSetLoader rowWriter;
private int srid;
private SpatialReference spatialReference;
+ private final int maxRecords;
public static class ShpReaderConfig {
protected final ShpFormatPlugin plugin;
@@ -84,8 +85,9 @@ public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- public ShpBatchReader(ShpReaderConfig readerConfig) {
+ public ShpBatchReader(ShpReaderConfig readerConfig, int maxRecords) {
this.readerConfig = readerConfig;
+ this.maxRecords = maxRecords;
}
@Override
@@ -180,6 +182,10 @@ public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
private void processShapefileSet(RowSetLoader rowWriter, final int gid, final Geometry geom, final Object[] dbfRow) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return;
+ }
+
rowWriter.start();
gidWriter.setInt(gid);
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index 49ee364..01dda6c 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -47,13 +47,16 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
public static class ShpReaderFactory extends FileReaderFactory {
private final ShpReaderConfig readerConfig;
- public ShpReaderFactory(ShpReaderConfig config) {
+ private final int maxRecords;
+
+ public ShpReaderFactory(ShpReaderConfig config, int maxRecords) {
readerConfig = config;
+ this.maxRecords = maxRecords;
}
@Override
public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new ShpBatchReader(readerConfig);
+ return new ShpBatchReader(readerConfig, maxRecords);
}
}
@@ -63,13 +66,13 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) throws ExecutionSetupException {
- return new ShpBatchReader(formatConfig.getReaderConfig(this));
+ return new ShpBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
}
@Override
protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
- builder.setReaderFactory(new ShpReaderFactory(new ShpReaderConfig(this)));
+ builder.setReaderFactory(new ShpReaderFactory(new ShpReaderConfig(this), scan.getMaxRecords()));
initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
@@ -87,6 +90,7 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
config.defaultName = PLUGIN_NAME;
config.readerOperatorType = CoreOperatorType.SHP_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
}
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 22ac590..c60b163 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -120,6 +120,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final int BUFFER_SIZE = 4096;
private final ExcelReaderConfig readerConfig;
+ private final int maxRecords;
private Sheet sheet;
private Row currentRow;
private StreamingWorkbook streamingWorkbook;
@@ -159,8 +160,9 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- public ExcelBatchReader(ExcelReaderConfig readerConfig) {
+ public ExcelBatchReader(ExcelReaderConfig readerConfig, int maxRecords) {
this.readerConfig = readerConfig;
+ this.maxRecords = maxRecords;
firstLine = true;
}
@@ -372,6 +374,11 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
if (readerConfig.lastColumn != 0) {
finalColumn = readerConfig.lastColumn - 1;
}
+
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
rowWriter.start();
for (int colWriterIndex = 0; colPosition < finalColumn; colWriterIndex++) {
Cell cell = currentRow.getCell(colPosition);
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
index 47e35b2..0d8d52d 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
@@ -46,14 +46,16 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
private static class ExcelReaderFactory extends FileReaderFactory {
private final ExcelBatchReader.ExcelReaderConfig readerConfig;
+ private final int maxRecords;
- public ExcelReaderFactory(ExcelReaderConfig config) {
+ public ExcelReaderFactory(ExcelReaderConfig config, int maxRecords) {
readerConfig = config;
+ this.maxRecords = maxRecords;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new ExcelBatchReader(readerConfig);
+ return new ExcelBatchReader(readerConfig, maxRecords);
}
}
@@ -75,13 +77,14 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
config.defaultName = DEFAULT_NAME;
config.readerOperatorType = UserBitShared.CoreOperatorType.EXCEL_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
EasySubScan scan, OptionManager options) throws ExecutionSetupException {
- return new ExcelBatchReader(formatConfig.getReaderConfig(this));
+ return new ExcelBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
}
@Override
@@ -90,7 +93,7 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
ExcelReaderConfig readerConfig = new ExcelReaderConfig(this);
verifyConfigOptions(readerConfig);
- builder.setReaderFactory(new ExcelReaderFactory(readerConfig));
+ builder.setReaderFactory(new ExcelReaderFactory(readerConfig, scan.getMaxRecords()));
initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index faecce8..387d708 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -420,4 +420,15 @@ public class TestExcelFormat extends ClusterTest {
new RowSetComparison(expected).verifyAndClearAll(results);
}
+
+ @Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "SELECT id, first_name, order_count FROM cp.`excel/test_data.xlsx` LIMIT 5";
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("Limit", "maxRecords=5")
+ .match();
+ }
}
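The new Excel test asserts only on the plan text. The same idiom extends to any of the formats enabled in this commit; a hedged sketch (the file path and limit value are invented, the calls are the ones used above):

  @Test
  public void testLimitPushdownOnCsv() throws Exception {
    // Hypothetical classpath file; only the query text differs from the Excel test.
    String sql = "SELECT * FROM cp.`csv/some_file.csvh` LIMIT 3";

    queryBuilder()
      .sql(sql)
      .planMatcher()
      .include("Limit", "maxRecords=3")
      .match();
  }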
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 68094f8..828061f 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -111,6 +111,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
private final List<HDF5DataWriter> dataWriters;
+ private final int maxRecords;
+
private FileSplit split;
private IHDF5Reader hdf5Reader;
@@ -158,8 +160,9 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+ public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
this.readerConfig = readerConfig;
+ this.maxRecords = maxRecords;
dataWriters = new ArrayList<>();
}
@@ -369,6 +372,12 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
@Override
public boolean next() {
+
+ // Limit pushdown
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
while (!rowWriter.isFull()) {
if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
if (!metadataIterator.hasNext()){
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index 60d6ec5..3e9480f 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -66,6 +66,7 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
config.defaultName = DEFAULT_NAME;
config.readerOperatorType = UserBitShared.CoreOperatorType.HDF5_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@@ -73,7 +74,7 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig)));
+ builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan.getMaxRecords()));
initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
@@ -81,14 +82,17 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
public static class HDF5ReaderFactory extends FileScanFramework.FileReaderFactory {
private final HDF5ReaderConfig readerConfig;
+ private final int maxRecords;
- HDF5ReaderFactory(HDF5ReaderConfig config) {
+
+ HDF5ReaderFactory(HDF5ReaderConfig config, int maxRecords) {
readerConfig = config;
+ this.maxRecords = maxRecords;
}
@Override
public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new HDF5BatchReader(readerConfig);
+ return new HDF5BatchReader(readerConfig, maxRecords);
}
}
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
index 8d6ba99..46cff86 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
@@ -47,6 +47,8 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final String VALUE_LABEL = "_value";
+ private final int maxRecords;
+
private FileSplit split;
private InputStream fsStream;
@@ -71,6 +73,10 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
+ public SpssBatchReader(int maxRecords) {
+ this.maxRecords = maxRecords;
+ }
+
@Override
public boolean open(FileSchemaNegotiator negotiator) {
split = negotiator.split();
@@ -117,6 +123,11 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
private boolean processNextRow() {
+ // Check to see if the limit has been reached
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
try {
// Stop reading when you run out of data
if (!spssReader.readNextCase()) {
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index 5e780b2..53e618b 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -40,9 +40,15 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
private static class SpssReaderFactory extends FileReaderFactory {
+ private final int maxRecords;
+
+ public SpssReaderFactory(int maxRecords) {
+ this.maxRecords = maxRecords;
+ }
+
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new SpssBatchReader();
+ return new SpssBatchReader(maxRecords);
}
}
@@ -64,19 +70,20 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
config.defaultName = DEFAULT_NAME;
config.readerOperatorType = UserBitShared.CoreOperatorType.SPSS_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
EasySubScan scan, OptionManager options) {
- return new SpssBatchReader();
+ return new SpssBatchReader(scan.getMaxRecords());
}
@Override
protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new SpssReaderFactory());
+ builder.setReaderFactory(new SpssReaderFactory(scan.getMaxRecords()));
initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 29224cb..a5330dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -17,22 +17,8 @@
*/
package org.apache.drill.exec.physical.base;
-import static org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.drill.common.expression.ErrorCollector;
@@ -87,8 +73,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
/**
* Represents table group scan with metadata usage.
@@ -120,6 +119,8 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
protected boolean usedMetastore; // false by default
+ protected int maxRecords;
+
protected AbstractGroupScanWithMetadata(String userName, List<SchemaPath> columns, LogicalExpression filter) {
super(userName);
this.columns = columns;
@@ -141,6 +142,7 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
this.usedMetastore = that.usedMetastore;
this.nonInterestingColumnsMetadata = that.nonInterestingColumnsMetadata;
this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
+ this.maxRecords = that.maxRecords;
}
@JsonProperty("columns")
@@ -160,6 +162,9 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
}
@JsonIgnore
+ public int getMaxRecords() { return maxRecords; }
+
+ @JsonIgnore
public boolean isMatchAllMetadata() {
return matchAllMetadata;
}
@@ -453,11 +458,17 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
public GroupScan applyLimit(int maxRecords) {
maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row -> 1 file.
GroupScanWithMetadataFilterer<?> prunedMetadata = getFilterer();
+
if (getTableMetadata() != null) {
long tableRowCount = TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
if (tableRowCount == Statistic.NO_COLUMN_STATS || tableRowCount <= maxRecords) {
logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
tableRowCount, maxRecords);
+ // Return the group scan with the limit pushed down
+ if (this.maxRecords != maxRecords) {
+ prunedMetadata.limit(maxRecords);
+ return prunedMetadata.build();
+ }
return null;
}
}
@@ -468,6 +479,12 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
// some files do not have a row count set, so do not prune files
if (qualifiedFiles == null || qualifiedFiles.size() == getFilesMetadata().size()) {
logger.debug("limit push down does not apply, since number of files was not reduced.");
+
+ // Return the group scan with the limit pushed down
+ if (this.maxRecords != maxRecords) {
+ prunedMetadata.limit(maxRecords);
+ return prunedMetadata.build();
+ }
return null;
}
@@ -479,6 +496,7 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
.segments(getSegmentsMetadata())
.partitions(getPartitionsMetadata())
.files(filesMap)
+ .limit(maxRecords)
.nonInterestingColumns(getNonInterestingColumnsMetadata())
.matching(matchAllMetadata)
.build();
@@ -715,6 +733,7 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
protected TupleMetadata tableSchema;
protected UdfUtilities udfUtilities;
protected FunctionLookupContext context;
+ protected int maxRecords;
// for the case when filtering is possible for partitions, but files count exceeds
// PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD, new group scan with at least filtered partitions
@@ -757,6 +776,11 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
return self();
}
+ public B limit(int maxRecords) {
+ source.maxRecords = maxRecords;
+ return self();
+ }
+
public B matching(boolean matchAllMetadata) {
this.matchAllMetadata = matchAllMetadata;
return self();
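For context, GroupScan.applyLimit() follows a replace-or-null convention: the planner offers the limit, and a null return means "keep the existing scan." The new branches above therefore rebuild the scan whenever this.maxRecords differs from the offered value, so the limit still reaches the sub-scans when no file pruning is possible. A rough caller-side sketch of the convention (illustrative names, not the actual planner rule):

  // How a limit-pushdown rule consumes applyLimit() (sketch).
  int limit = 5;
  GroupScan pushed = groupScan.applyLimit(limit);
  if (pushed == null) {
    // Scan unchanged: keep the old plan; the Limit operator alone trims rows.
  } else {
    // Swap in the rebuilt scan; the Limit operator stays above it as a safety net.
  }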
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
index bf9ba76..462ad2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
@@ -108,6 +108,15 @@ public interface RowSetLoader extends TupleWriter {
boolean isFull();
/**
* Used to push a limit down into the file reader. Returns true once a limit is set
* (maxRecords greater than zero) and the current row count has reached it, signalling
* the reader to stop producing rows.
* @param maxRecords Maximum rows to be returned, taken from the query's limit clause; zero means no limit.
* @return True if the row count has reached maxRecords, false otherwise.
+ */
+ boolean limitReached(int maxRecords);
+
+ /**
* The number of rows in the current row set. Does not count any overflow row
* saved for the next batch.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
index 4058fa7..d907a59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+
/**
* Implementation of the row set loader. Provides row-level operations, leaving the
* result set loader to provide batch-level operations. However, all control
@@ -98,6 +99,11 @@ public class RowSetLoaderImpl extends AbstractTupleWriter implements RowSetLoade
}
@Override
+ public boolean limitReached(int maxRecords) {
+ return (maxRecords > 0 && this.rowCount() >= maxRecords);
+ }
+
+ @Override
public boolean isFull() { return rsLoader.isFull(); }
@Override
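The predicate is small enough to restate in isolation. A self-contained sketch mirroring the implementation above, with the boundary cases spelled out:

  // Mirrors RowSetLoaderImpl.limitReached(int); rowCount stands in for this.rowCount().
  static boolean limitReached(int rowCount, int maxRecords) {
    return maxRecords > 0 && rowCount >= maxRecords;
  }

  // limitReached(10, 0) -> false  (maxRecords == 0 means no limit was pushed down)
  // limitReached(4, 5)  -> false  (below the limit; keep reading)
  // limitReached(5, 5)  -> true   (limit reached; the reader stops)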
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
index 580ba7d..51a77d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -52,6 +52,11 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
private List<ColumnConverter> converters;
// re-use container instance
private GenericRecord record;
+ private int maxRecords;
+
+ public AvroBatchReader(int maxRecords) {
+ this.maxRecords = maxRecords;
+ }
@Override
public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
@@ -151,6 +156,10 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
}
private boolean nextLine(RowSetLoader rowWriter) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
try {
if (!reader.hasNext() || reader.pastSync(endPosition)) {
return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 5d958ed..854c6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -56,13 +56,14 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
config.defaultName = DEFAULT_NAME;
config.readerOperatorType = CoreOperatorType.AVRO_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@Override
protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
- builder.setReaderFactory(new AvroReaderFactory());
+ builder.setReaderFactory(new AvroReaderFactory(scan.getMaxRecords()));
initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
@@ -70,9 +71,14 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
private static class AvroReaderFactory extends FileScanFramework.FileReaderFactory {
+ private final int maxRecords;
+ public AvroReaderFactory(int maxRecords) {
+ this.maxRecords = maxRecords;
+ }
+
@Override
public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new AvroBatchReader();
+ return new AvroBatchReader(maxRecords);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index cacfe26..af931f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Base class for file readers.
* <p>
@@ -103,6 +104,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
// use this simpler form. New plugins should use these options
// instead of overriding methods.
+ public boolean supportsLimitPushdown;
public boolean supportsProjectPushdown;
public boolean supportsFileImplicitColumns = true;
public boolean supportsAutoPartitioning;
@@ -212,6 +214,15 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public String getName() { return name; }
/**
* Does this plugin support pushing the limit down to the batch reader? If so, the
* reader itself should stop reading the file as soon as the limit has been reached.
* This makes the most sense for file formats with a consistent schema that is
* established by the first row, such as CSV: if the user only wants 100 rows, there
* is no reason to read the entire file.
+ */
+ public boolean supportsLimitPushdown() { return easyConfig.supportsLimitPushdown; }
+
+ /**
* Does this plugin support projection push down? That is, can the reader
* itself handle the tasks of projecting table columns, creating null
* columns for missing table columns, and so on?
@@ -303,7 +314,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
if (! columnExplorer.isStarQuery()) {
scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
columnExplorer.getTableColumns(), scan.getSelectionRoot(),
- scan.getPartitionDepth(), scan.getSchema());
+ scan.getPartitionDepth(), scan.getSchema(), scan.getMaxRecords());
scan.setOperatorId(scan.getOperatorId());
}
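Opting a format in is a one-line flag in its easyConfig() plus the maxRecords plumbing shown in the plugin diffs above. A condensed sketch of the opt-in (the surrounding plugin class is hypothetical; the fields are the ones added in this commit):

  // Inside a hypothetical EasyFormatPlugin subclass's easyConfig():
  EasyFormatConfig config = new EasyFormatConfig();
  config.useEnhancedScan = true;        // limit pushdown rides on the enhanced scan framework
  config.supportsLimitPushdown = true;  // advertise support; the reader must honor maxRecords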
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 715fc25..c968f6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions;
@@ -70,6 +71,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
@JsonTypeName("fs-scan")
public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataProvider> {
private static final Logger logger = LoggerFactory.getLogger(EasyGroupScan.class);
@@ -84,6 +86,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
private List<CompleteFileWork> chunks;
private List<EndpointAffinity> endpointAffinities;
private final Path selectionRoot;
+ private final int maxRecords;
+ private final boolean supportsLimitPushdown;
@JsonCreator
public EasyGroupScan(
@@ -101,7 +105,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
this.columns = columns == null ? ALL_COLUMNS : columns;
this.selectionRoot = selectionRoot;
-
+ this.maxRecords = getMaxRecords();
+ this.supportsLimitPushdown = formatPlugin.easyConfig().supportsLimitPushdown;
this.metadataProvider = defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
.withSelection(selection)
.withSchema(schema)
@@ -137,6 +142,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
this.usedMetastore = metadataProviderManager.usesMetastore();
initFromSelection(selection, formatPlugin);
checkMetadataConsistency(selection, formatPlugin.getFsConf());
+ this.supportsLimitPushdown = formatPlugin.easyConfig().supportsLimitPushdown;
+ this.maxRecords = getMaxRecords();
}
public EasyGroupScan(
@@ -172,6 +179,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
mappings = that.mappings;
partitionDepth = that.partitionDepth;
metadataProvider = that.metadataProvider;
+ maxRecords = getMaxRecords();
+ supportsLimitPushdown = that.formatPlugin.easyConfig().supportsLimitPushdown;
}
@JsonIgnore
@@ -288,7 +297,7 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
- columns, selectionRoot, partitionDepth, getSchema());
+ columns, selectionRoot, partitionDepth, getSchema(), maxRecords);
subScan.setOperatorId(getOperatorId());
return subScan;
}
@@ -313,8 +322,15 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
@Override
public String toString() {
- String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s, schema=%s, usedMetastore=%s]";
- return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles(), getSchema(), usedMetastore());
+ return new PlanStringBuilder(this)
+ .field("selectionRoot", selectionRoot)
+ .field("numFiles", getFiles().size())
+ .field("columns", columns)
+ .field("files", getFiles())
+ .field("schema", getSchema())
+ .field("usedMetastore", usedMetastore())
+ .field("maxRecords", maxRecords)
+ .toString();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 86a1292..b9a6db2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -35,6 +35,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.hadoop.fs.Path;
+
@JsonTypeName("fs-sub-scan")
public class EasySubScan extends AbstractSubScan {
@@ -44,6 +45,7 @@ public class EasySubScan extends AbstractSubScan {
private final Path selectionRoot;
private final int partitionDepth;
private final TupleMetadata schema;
+ private final int maxRecords;
@JsonCreator
public EasySubScan(
@@ -55,7 +57,8 @@ public class EasySubScan extends AbstractSubScan {
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("selectionRoot") Path selectionRoot,
@JsonProperty("partitionDepth") int partitionDepth,
- @JsonProperty("schema") TupleMetadata schema
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("maxRecords") int maxRecords
) throws ExecutionSetupException {
super(userName);
this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
@@ -64,10 +67,11 @@ public class EasySubScan extends AbstractSubScan {
this.selectionRoot = selectionRoot;
this.partitionDepth = partitionDepth;
this.schema = schema;
+ this.maxRecords = maxRecords;
}
public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
- List<SchemaPath> columns, Path selectionRoot, int partitionDepth, TupleMetadata schema) {
+ List<SchemaPath> columns, Path selectionRoot, int partitionDepth, TupleMetadata schema, int maxRecords) {
super(userName);
this.formatPlugin = plugin;
this.files = files;
@@ -75,6 +79,7 @@ public class EasySubScan extends AbstractSubScan {
this.selectionRoot = selectionRoot;
this.partitionDepth = partitionDepth;
this.schema = schema;
+ this.maxRecords = maxRecords;
}
@JsonProperty
@@ -101,6 +106,9 @@ public class EasySubScan extends AbstractSubScan {
@JsonProperty("schema")
public TupleMetadata getSchema() { return schema; }
+ @JsonProperty("maxRecords")
+ public int getMaxRecords() { return maxRecords; }
+
@Override
public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index af710e3..3dd1e89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -207,14 +207,16 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
private static class ColumnsReaderFactory extends FileReaderFactory {
private final TextParsingSettings settings;
+ private final int maxRecords;
- public ColumnsReaderFactory(TextParsingSettings settings) {
+ public ColumnsReaderFactory(TextParsingSettings settings, int maxRecords) {
this.settings = settings;
+ this.maxRecords = maxRecords;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new CompliantTextBatchReader(settings);
+ return new CompliantTextBatchReader(settings, maxRecords);
}
}
@@ -241,6 +243,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE;
config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@@ -270,7 +273,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
TextParsingSettings settings =
new TextParsingSettings(getConfig(), scan.getSchema());
- builder.setReaderFactory(new ColumnsReaderFactory(settings));
+ builder.setReaderFactory(new ColumnsReaderFactory(settings, scan.getMaxRecords()));
// If this format has no headers, or wants to skip them,
// then we must use the columns column to hold the data.
@@ -338,8 +341,14 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
for (final CompleteFileWork work : scan.getWorkIterable()) {
data += work.getTotalBytes();
}
+
final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
- final double estRowCount = data / estimatedRowSize;
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+
+ if (scan.supportsLimitPushdown() && scan.getMaxRecords() > 0) {
+ return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, (long)scan.getMaxRecords(), 1, data);
+ } else {
+ final double estRowCount = data / estimatedRowSize;
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+ }
}
}
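The ScanStats change is what makes the pushdown pay off in planning: with a limit present the scan reports an exact row count of maxRecords instead of a size-based estimate. A small numeric sketch of the two branches (the byte count and row size are invented):

  // Assumed inputs: 1,000,000 bytes of text, ~100 bytes per estimated row, LIMIT 5.
  long data = 1_000_000L;
  double estimatedRowSize = 100.0;
  int maxRecords = 5;

  // No limit pushed down: NO_EXACT_ROW_COUNT with data / estimatedRowSize = 10,000 rows.
  double estRowCount = data / estimatedRowSize;

  // Limit pushed down: EXACT_ROW_COUNT of 5, so the planner costs the scan at 5 rows.
  long exactRowCount = maxRecords;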
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 79840c1..bddbac7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -17,9 +17,8 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
-import java.io.IOException;
-import java.io.InputStream;
-
+import com.univocity.parsers.common.TextParsingException;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -40,9 +39,8 @@ import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.univocity.parsers.common.TextParsingException;
-
-import io.netty.buffer.DrillBuf;
+import java.io.IOException;
+import java.io.InputStream;
/**
* Text reader. Complies with the RFC 4180 standard for text/csv files.
@@ -58,6 +56,8 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
private final TextParsingSettings settings;
// Chunk of the file to be read by this reader
private FileSplit split;
+ // Limit pushed down from the query
+ private final int maxRecords;
// text reader implementation
private TextReader reader;
// input buffer
@@ -68,8 +68,9 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
private RowSetLoader writer;
- public CompliantTextBatchReader(TextParsingSettings settings) {
+ public CompliantTextBatchReader(TextParsingSettings settings, int maxRecords) {
this.settings = settings;
+ this.maxRecords = maxRecords;
// Validate. Otherwise, these problems show up later as a data
// read error which is very confusing.
@@ -298,6 +299,11 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
public boolean next() {
reader.resetForNextBatch();
+ // If the limit is defined and the row count is greater than the limit, stop reading the file.
+ if (maxRecords > 0 && writer.rowCount() > maxRecords) {
+ return false;
+ }
+
try {
boolean more = false;
while (! writer.isFull()) {
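One nuance here: the text reader tests the limit inline with a strict greater-than on writer.rowCount() rather than calling the new RowSetLoader.limitReached(), which uses greater-than-or-equal. The check runs once per batch, so the reader can overshoot within a batch; the Limit operator left in the plan trims the excess either way. The equivalent form using the helper would be (a sketch, not part of this commit):

  // Equivalent check via the new helper; stops one batch earlier at the exact boundary.
  if (writer.limitReached(maxRecords)) {
    return false;
  }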
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
index 99efd76..b808ffe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -18,13 +18,6 @@
package org.apache.drill.exec.store.log;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
@@ -41,6 +34,13 @@ import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(LogBatchReader.class);
public static final String RAW_LINE_COL_NAME = "_raw";
@@ -126,6 +126,7 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
private final LogReaderConfig config;
+ private final int maxRecords;
private FileSplit split;
private BufferedReader reader;
private ResultSetLoader loader;
@@ -136,8 +137,9 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
private int lineNumber;
private int errorCount;
- public LogBatchReader(LogReaderConfig config) {
+ public LogBatchReader(LogReaderConfig config, int maxRecords) {
this.config = config;
+ this.maxRecords = maxRecords;
}
@Override
@@ -213,6 +215,10 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
private boolean nextLine(RowSetLoader rowWriter) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
String line;
try {
line = reader.readLine();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
index 8a3a6af..1027fc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -18,12 +18,6 @@
package org.apache.drill.exec.store.log;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -50,6 +44,12 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
private static final Logger logger = LoggerFactory.getLogger(LogFormatPlugin.class);
@@ -60,14 +60,16 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
private static class LogReaderFactory extends FileReaderFactory {
private final LogReaderConfig readerConfig;
+ private final int maxRecords;
- public LogReaderFactory(LogReaderConfig config) {
+ public LogReaderFactory(LogReaderConfig config, int maxRecords) {
readerConfig = config;
+ this.maxRecords = maxRecords;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new LogBatchReader(readerConfig);
+ return new LogBatchReader(readerConfig, maxRecords);
}
}
@@ -90,6 +92,7 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
config.defaultName = PLUGIN_NAME;
config.readerOperatorType = CoreOperatorType.REGEX_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@@ -207,7 +210,7 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
// each input file.
builder.setReaderFactory(new LogReaderFactory(
new LogReaderConfig(this, pattern, providedSchema, tableSchema,
- readerSchema, !hasSchema, groupCount, maxErrors(providedSchema))));
+ readerSchema, !hasSchema, groupCount, maxErrors(providedSchema)), scan.getMaxRecords()));
// The default type of regex columns is nullable VarChar,
// so let's use that as the missing column type.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index e724d18..3d9d8f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -145,6 +145,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
private ScalarWriter remoteDataWriter;
+ private final int maxRecords;
private Map<Long, TcpSession> sessionQueue;
@@ -164,11 +165,12 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- public PcapBatchReader(PcapReaderConfig readerConfig) {
+ public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
this.readerConfig = readerConfig;
if (readerConfig.sessionizeTCPStreams) {
sessionQueue = new HashMap<>();
}
+ this.maxRecords = maxRecords;
}
@Override
@@ -297,6 +299,11 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
private boolean parseNextPacket(RowSetLoader rowWriter) {
+ // Push down limit
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
// Decode the packet
Packet packet = new Packet();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
index 40a70bd..b5b77fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
@@ -39,14 +39,16 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
private static class PcapReaderFactory extends FileReaderFactory {
private final PcapReaderConfig readerConfig;
+ private final int maxRecords;
- public PcapReaderFactory(PcapReaderConfig config) {
+ public PcapReaderFactory(PcapReaderConfig config, int maxRecords) {
readerConfig = config;
+ this.maxRecords = maxRecords;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new PcapBatchReader(readerConfig);
+ return new PcapBatchReader(readerConfig, maxRecords);
}
}
@@ -68,18 +70,19 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
config.defaultName = PLUGIN_NAME;
config.readerOperatorType = UserBitShared.CoreOperatorType.PCAP_SUB_SCAN_VALUE;
config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
return config;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
- return new PcapBatchReader(new PcapReaderConfig(this));
+ return new PcapBatchReader(new PcapReaderConfig(this), scan.getMaxRecords());
}
@Override
protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this)));
+ builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this), scan.getMaxRecords()));
initScanBuilder(builder, scan);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
index a6924c0..3bc0820 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
@@ -1079,14 +1079,14 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
queryBuilder()
.sql("select * from dfs.tmp.`%s` limit 1", tableName)
.planMatcher()
- .include("Limit", "numFiles=1,")
+ .include("Limit", "numFiles=1", "maxRecords=1")
.match();
// each file has 10 records, so 3 files should be picked
queryBuilder()
.sql("select * from dfs.tmp.`%s` limit 21", tableName)
.planMatcher()
- .include("Limit", "numFiles=3")
+ .include("Limit", "numFiles=3", "maxRecords=21")
.match();
} finally {
run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);