You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2020/10/19 11:44:29 UTC

[drill] branch master updated: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c000ec  DRILL-7763: Add Limit Pushdown to File Based Storage Plugins
6c000ec is described below

commit 6c000ece952d532c11c7a64780ad4b58cd3c8222
Author: Charles Givre <cg...@apache.org>
AuthorDate: Sun Oct 18 14:17:54 2020 -0400

    DRILL-7763: Add Limit Pushdown to File Based Storage Plugins
---
 .../drill/exec/store/esri/ShpBatchReader.java      |  8 ++-
 .../drill/exec/store/esri/ShpFormatPlugin.java     | 12 +++--
 .../drill/exec/store/excel/ExcelBatchReader.java   |  9 +++-
 .../drill/exec/store/excel/ExcelFormatPlugin.java  | 11 ++--
 .../drill/exec/store/excel/TestExcelFormat.java    | 11 ++++
 .../drill/exec/store/hdf5/HDF5BatchReader.java     | 11 +++-
 .../drill/exec/store/hdf5/HDF5FormatPlugin.java    | 10 ++--
 .../drill/exec/store/spss/SpssBatchReader.java     | 11 ++++
 .../drill/exec/store/spss/SpssFormatPlugin.java    | 13 +++--
 .../base/AbstractGroupScanWithMetadata.java        | 60 +++++++++++++++-------
 .../exec/physical/resultSet/RowSetLoader.java      |  9 ++++
 .../physical/resultSet/impl/RowSetLoaderImpl.java  |  6 +++
 .../drill/exec/store/avro/AvroBatchReader.java     |  9 ++++
 .../drill/exec/store/avro/AvroFormatPlugin.java    | 10 +++-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      | 13 ++++-
 .../drill/exec/store/dfs/easy/EasyGroupScan.java   | 24 +++++++--
 .../drill/exec/store/dfs/easy/EasySubScan.java     | 12 ++++-
 .../exec/store/easy/text/TextFormatPlugin.java     | 19 +++++--
 .../easy/text/reader/CompliantTextBatchReader.java | 20 +++++---
 .../drill/exec/store/log/LogBatchReader.java       | 22 +++++---
 .../drill/exec/store/log/LogFormatPlugin.java      | 21 ++++----
 .../drill/exec/store/pcap/PcapBatchReader.java     |  9 +++-
 .../drill/exec/store/pcap/PcapFormatPlugin.java    | 11 ++--
 .../TestMetastoreWithEasyFormatPlugin.java         |  4 +-
 24 files changed, 265 insertions(+), 80 deletions(-)

diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
index 18cc022..1b49667 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
@@ -75,6 +75,7 @@ public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private RowSetLoader rowWriter;
   private int srid;
   private SpatialReference spatialReference;
+  private final int maxRecords;
 
   public static class ShpReaderConfig {
     protected final ShpFormatPlugin plugin;
@@ -84,8 +85,9 @@ public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  public ShpBatchReader(ShpReaderConfig readerConfig) {
+  public ShpBatchReader(ShpReaderConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
+    this.maxRecords = maxRecords;
   }
 
   @Override
@@ -180,6 +182,10 @@ public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private void processShapefileSet(RowSetLoader rowWriter, final int gid, final Geometry geom, final Object[] dbfRow) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return;
+    }
+
     rowWriter.start();
 
     gidWriter.setInt(gid);
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index 49ee364..01dda6c 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -47,13 +47,16 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
   public static class ShpReaderFactory extends FileReaderFactory {
     private final ShpReaderConfig readerConfig;
 
-    public ShpReaderFactory(ShpReaderConfig config) {
+    private final int maxRecords;
+
+    public ShpReaderFactory(ShpReaderConfig config, int maxRecords) {
       readerConfig = config;
+      this.maxRecords = maxRecords;
     }
 
     @Override
     public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new ShpBatchReader(readerConfig);
+      return new ShpBatchReader(readerConfig, maxRecords);
     }
   }
 
@@ -63,13 +66,13 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
 
   @Override
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) throws ExecutionSetupException {
-    return new ShpBatchReader(formatConfig.getReaderConfig(this));
+    return new ShpBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
   }
 
   @Override
   protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
     FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
-    builder.setReaderFactory(new ShpReaderFactory(new ShpReaderConfig(this)));
+    builder.setReaderFactory(new ShpReaderFactory(new ShpReaderConfig(this), scan.getMaxRecords()));
     initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
     return builder;
@@ -87,6 +90,7 @@ public class ShpFormatPlugin extends EasyFormatPlugin<ShpFormatConfig> {
     config.defaultName = PLUGIN_NAME;
     config.readerOperatorType = CoreOperatorType.SHP_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 }
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 22ac590..c60b163 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -120,6 +120,7 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private static final int BUFFER_SIZE = 4096;
 
   private final ExcelReaderConfig readerConfig;
+  private final int maxRecords;
   private Sheet sheet;
   private Row currentRow;
   private StreamingWorkbook streamingWorkbook;
@@ -159,8 +160,9 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  public ExcelBatchReader(ExcelReaderConfig readerConfig) {
+  public ExcelBatchReader(ExcelReaderConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
+    this.maxRecords = maxRecords;
     firstLine = true;
   }
 
@@ -372,6 +374,11 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     if (readerConfig.lastColumn != 0) {
       finalColumn = readerConfig.lastColumn - 1;
     }
+
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
     rowWriter.start();
     for (int colWriterIndex = 0; colPosition < finalColumn; colWriterIndex++) {
       Cell cell = currentRow.getCell(colPosition);
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
index 47e35b2..0d8d52d 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
@@ -46,14 +46,16 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
 
   private static class ExcelReaderFactory extends FileReaderFactory {
     private final ExcelBatchReader.ExcelReaderConfig readerConfig;
+    private final int maxRecords;
 
-    public ExcelReaderFactory(ExcelReaderConfig config) {
+    public ExcelReaderFactory(ExcelReaderConfig config, int maxRecords) {
       readerConfig = config;
+      this.maxRecords = maxRecords;
     }
 
     @Override
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new ExcelBatchReader(readerConfig);
+      return new ExcelBatchReader(readerConfig, maxRecords);
     }
   }
 
@@ -75,13 +77,14 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
     config.defaultName = DEFAULT_NAME;
     config.readerOperatorType = UserBitShared.CoreOperatorType.EXCEL_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
   @Override
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
     EasySubScan scan, OptionManager options) throws ExecutionSetupException {
-    return new ExcelBatchReader(formatConfig.getReaderConfig(this));
+    return new ExcelBatchReader(formatConfig.getReaderConfig(this), scan.getMaxRecords());
   }
 
   @Override
@@ -90,7 +93,7 @@ public class ExcelFormatPlugin extends EasyFormatPlugin<ExcelFormatConfig> {
     ExcelReaderConfig readerConfig = new ExcelReaderConfig(this);
 
     verifyConfigOptions(readerConfig);
-    builder.setReaderFactory(new ExcelReaderFactory(readerConfig));
+    builder.setReaderFactory(new ExcelReaderFactory(readerConfig, scan.getMaxRecords()));
 
     initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index faecce8..387d708 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -420,4 +420,15 @@ public class TestExcelFormat extends ClusterTest {
 
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "SELECT id, first_name, order_count FROM cp.`excel/test_data.xlsx` LIMIT 5";
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5")
+      .match();
+  }
 }
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 68094f8..828061f 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -111,6 +111,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private final List<HDF5DataWriter> dataWriters;
 
+  private final int maxRecords;
+
   private FileSplit split;
 
   private IHDF5Reader hdf5Reader;
@@ -158,8 +160,9 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
+    this.maxRecords = maxRecords;
     dataWriters = new ArrayList<>();
   }
 
@@ -369,6 +372,12 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   @Override
   public boolean next() {
+
+    // Limit pushdown
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
     while (!rowWriter.isFull()) {
       if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
         if (!metadataIterator.hasNext()){
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index 60d6ec5..3e9480f 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -66,6 +66,7 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
     config.defaultName = DEFAULT_NAME;
     config.readerOperatorType = UserBitShared.CoreOperatorType.HDF5_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
@@ -73,7 +74,7 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
   protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
     FileScanBuilder builder = new FileScanBuilder();
 
-    builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig)));
+    builder.setReaderFactory(new HDF5ReaderFactory(new HDF5BatchReader.HDF5ReaderConfig(this, formatConfig), scan.getMaxRecords()));
     initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
     return builder;
@@ -81,14 +82,17 @@ public class HDF5FormatPlugin extends EasyFormatPlugin<HDF5FormatConfig> {
 
   public static class HDF5ReaderFactory extends FileScanFramework.FileReaderFactory {
     private final HDF5ReaderConfig readerConfig;
+    private final int maxRecords;
 
-    HDF5ReaderFactory(HDF5ReaderConfig config) {
+
+    HDF5ReaderFactory(HDF5ReaderConfig config, int maxRecords) {
       readerConfig = config;
+      this.maxRecords = maxRecords;
     }
 
     @Override
     public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new HDF5BatchReader(readerConfig);
+      return new HDF5BatchReader(readerConfig, maxRecords);
     }
   }
 
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
index 8d6ba99..46cff86 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
@@ -47,6 +47,8 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private static final String VALUE_LABEL = "_value";
 
+  private final int maxRecords;
+
   private FileSplit split;
 
   private InputStream fsStream;
@@ -71,6 +73,10 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
+  public SpssBatchReader(int maxRecords) {
+    this.maxRecords = maxRecords;
+  }
+
   @Override
   public boolean open(FileSchemaNegotiator negotiator) {
     split = negotiator.split();
@@ -117,6 +123,11 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private boolean processNextRow() {
+    // Check to see if the limit has been reached
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
     try {
       // Stop reading when you run out of data
       if (!spssReader.readNextCase()) {
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index 5e780b2..53e618b 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -40,9 +40,15 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
 
   private static class SpssReaderFactory extends FileReaderFactory {
 
+    private final int maxRecords;
+
+    public SpssReaderFactory(int maxRecords) {
+      this.maxRecords = maxRecords;
+    }
+
     @Override
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new SpssBatchReader();
+      return new SpssBatchReader(maxRecords);
     }
   }
 
@@ -64,19 +70,20 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
     config.defaultName = DEFAULT_NAME;
     config.readerOperatorType = UserBitShared.CoreOperatorType.SPSS_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
   @Override
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
     EasySubScan scan, OptionManager options)  {
-    return new SpssBatchReader();
+    return new SpssBatchReader(scan.getMaxRecords());
   }
 
   @Override
   protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
     FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new SpssReaderFactory());
+    builder.setReaderFactory(new SpssReaderFactory(scan.getMaxRecords()));
 
     initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 29224cb..a5330dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -17,22 +17,8 @@
  */
 package org.apache.drill.exec.physical.base;
 
-import static org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -87,8 +73,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
 
 /**
  * Represents table group scan with metadata usage.
@@ -120,6 +119,8 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
 
   protected boolean usedMetastore; // false by default
 
+  protected int maxRecords;
+
   protected AbstractGroupScanWithMetadata(String userName, List<SchemaPath> columns, LogicalExpression filter) {
     super(userName);
     this.columns = columns;
@@ -141,6 +142,7 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
     this.usedMetastore = that.usedMetastore;
     this.nonInterestingColumnsMetadata = that.nonInterestingColumnsMetadata;
     this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
+    this.maxRecords = that.maxRecords;
   }
 
   @JsonProperty("columns")
@@ -160,6 +162,9 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
   }
 
   @JsonIgnore
+  public int getMaxRecords() { return maxRecords; }
+
+  @JsonIgnore
   public boolean isMatchAllMetadata() {
     return matchAllMetadata;
   }
@@ -453,11 +458,17 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
   public GroupScan applyLimit(int maxRecords) {
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 file.
     GroupScanWithMetadataFilterer<?> prunedMetadata = getFilterer();
+
     if (getTableMetadata() != null) {
       long tableRowCount = TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
       if (tableRowCount == Statistic.NO_COLUMN_STATS || tableRowCount <= maxRecords) {
         logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
             tableRowCount, maxRecords);
+        // Return the group scan with the limit pushed down
+        if (this.maxRecords != maxRecords) {
+          prunedMetadata.limit(maxRecords);
+          return prunedMetadata.build();
+        }
         return null;
       }
     }
@@ -468,6 +479,12 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
     // some files does not have set row count, do not do files pruning
     if (qualifiedFiles == null || qualifiedFiles.size() == getFilesMetadata().size()) {
       logger.debug("limit push down does not apply, since number of files was not reduced.");
+
+      // Return the group scan with the limit pushed down
+      if (this.maxRecords != maxRecords) {
+        prunedMetadata.limit(maxRecords);
+        return prunedMetadata.build();
+      }
       return null;
     }
 
@@ -479,6 +496,7 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
         .segments(getSegmentsMetadata())
         .partitions(getPartitionsMetadata())
         .files(filesMap)
+        .limit(maxRecords)
         .nonInterestingColumns(getNonInterestingColumnsMetadata())
         .matching(matchAllMetadata)
         .build();
@@ -715,6 +733,7 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
     protected TupleMetadata tableSchema;
     protected UdfUtilities udfUtilities;
     protected FunctionLookupContext context;
+    protected int maxRecords;
 
     // for the case when filtering is possible for partitions, but files count exceeds
     // PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD, new group scan with at least filtered partitions
@@ -757,6 +776,11 @@ public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvi
       return self();
     }
 
+    public B limit(int maxRecords) {
+      source.maxRecords = maxRecords;
+      return self();
+    }
+
     public B matching(boolean matchAllMetadata) {
       this.matchAllMetadata = matchAllMetadata;
       return self();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
index bf9ba76..462ad2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
@@ -108,6 +108,15 @@ public interface RowSetLoader extends TupleWriter {
   boolean isFull();
 
   /**
+   * Used to push a limit down to the file reader. This method checks whether
+   * maxRecords is greater than zero (a non-positive value means no limit) and whether
+   * the current row count has reached it.
+   * @param maxRecords Maximum rows to be returned. (From the limit clause of the query)
+   * @return True if a limit is set and the row count has reached or exceeded maxRecords, false if not.
+   */
+  boolean limitReached(int maxRecords);
+
+  /**
    * The number of rows in the current row set. Does not count any overflow row
    * saved for the next batch.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
index 4058fa7..d907a59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
 
+
 /**
  * Implementation of the row set loader. Provides row-level operations, leaving the
  * result set loader to provide batch-level operations. However, all control
@@ -98,6 +99,11 @@ public class RowSetLoaderImpl extends AbstractTupleWriter implements RowSetLoade
   }
 
   @Override
+  public boolean limitReached(int maxRecords) {
+    return (maxRecords > 0 && this.rowCount() >= maxRecords);
+  }
+
+  @Override
   public boolean isFull() { return rsLoader.isFull(); }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
index 580ba7d..51a77d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -52,6 +52,11 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
   private List<ColumnConverter> converters;
   // re-use container instance
   private GenericRecord record;
+  private int maxRecords;
+
+  public AvroBatchReader(int maxRecords) {
+    this.maxRecords = maxRecords;
+  }
 
   @Override
   public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
@@ -151,6 +156,10 @@ public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSche
   }
 
   private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
     try {
       if (!reader.hasNext() || reader.pastSync(endPosition)) {
         return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 5d958ed..854c6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -56,13 +56,14 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
     config.defaultName = DEFAULT_NAME;
     config.readerOperatorType = CoreOperatorType.AVRO_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
   @Override
   protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
     FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
-    builder.setReaderFactory(new AvroReaderFactory());
+    builder.setReaderFactory(new AvroReaderFactory(scan.getMaxRecords()));
     initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
     return builder;
@@ -70,9 +71,14 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
   private static class AvroReaderFactory extends FileScanFramework.FileReaderFactory {
 
+    private final int maxRecords;
+    public AvroReaderFactory(int maxRecords) {
+      this.maxRecords = maxRecords;
+    }
+
     @Override
     public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new AvroBatchReader();
+      return new AvroBatchReader(maxRecords);
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index cacfe26..af931f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Base class for file readers.
  * <p>
@@ -103,6 +104,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     // use this simpler form. New plugins should use these options
     // instead of overriding methods.
 
+    public boolean supportsLimitPushdown;
     public boolean supportsProjectPushdown;
     public boolean supportsFileImplicitColumns = true;
     public boolean supportsAutoPartitioning;
@@ -212,6 +214,15 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   public String getName() { return name; }
 
   /**
+   * Does this plugin support pushing the limit down to the batch reader? If so, then
+   * the reader itself should have logic to stop reading the file as soon as the limit has been
+   * reached. This makes the most sense for file formats that have a consistent schema
+   * which is identified at the first row, such as CSV. If the user only wants 100 rows, it
+   * does not make sense to read the entire file.
+   */
+  public boolean supportsLimitPushdown() { return easyConfig.supportsLimitPushdown; }
+
+  /**
    * Does this plugin support projection push down? That is, can the reader
    * itself handle the tasks of projecting table columns, creating null
    * columns for missing table columns, and so on?
@@ -303,7 +314,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     if (! columnExplorer.isStarQuery()) {
       scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
           columnExplorer.getTableColumns(), scan.getSelectionRoot(),
-          scan.getPartitionDepth(), scan.getSchema());
+          scan.getPartitionDepth(), scan.getSchema(), scan.getMaxRecords());
       scan.setOperatorId(scan.getOperatorId());
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 715fc25..c968f6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
@@ -70,6 +71,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataProvider> {
   private static final Logger logger = LoggerFactory.getLogger(EasyGroupScan.class);
@@ -84,6 +86,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
   private List<CompleteFileWork> chunks;
   private List<EndpointAffinity> endpointAffinities;
   private final Path selectionRoot;
+  private final int maxRecords;
+  private final boolean supportsLimitPushdown;
 
   @JsonCreator
   public EasyGroupScan(
@@ -101,7 +105,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
     this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
     this.columns = columns == null ? ALL_COLUMNS : columns;
     this.selectionRoot = selectionRoot;
-
+    this.maxRecords = getMaxRecords();
+    this.supportsLimitPushdown = formatPlugin.easyConfig().supportsLimitPushdown;
     this.metadataProvider = defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
         .withSelection(selection)
         .withSchema(schema)
@@ -137,6 +142,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
     this.usedMetastore = metadataProviderManager.usesMetastore();
     initFromSelection(selection, formatPlugin);
     checkMetadataConsistency(selection, formatPlugin.getFsConf());
+    this.supportsLimitPushdown = formatPlugin.easyConfig().supportsLimitPushdown;
+    this.maxRecords = getMaxRecords();
   }
 
   public EasyGroupScan(
@@ -172,6 +179,8 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
     mappings = that.mappings;
     partitionDepth = that.partitionDepth;
     metadataProvider = that.metadataProvider;
+    maxRecords = getMaxRecords();
+    supportsLimitPushdown = that.formatPlugin.easyConfig().supportsLimitPushdown;
   }
 
   @JsonIgnore
@@ -288,7 +297,7 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
     EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
-        columns, selectionRoot, partitionDepth, getSchema());
+        columns, selectionRoot, partitionDepth, getSchema(), maxRecords);
     subScan.setOperatorId(getOperatorId());
     return subScan;
   }
@@ -313,8 +322,15 @@ public class EasyGroupScan extends AbstractGroupScanWithMetadata<TableMetadataPr
 
   @Override
   public String toString() {
-    String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s, schema=%s, usedMetastore=%s]";
-    return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles(), getSchema(), usedMetastore());
+    return new PlanStringBuilder(this)
+      .field("selectionRoot", selectionRoot)
+      .field("numFiles", getFiles().size())
+      .field("columns", columns)
+      .field("files", getFiles())
+      .field("schema", getSchema())
+      .field("usedMetastore", usedMetastore())
+      .field("maxRecords", maxRecords)
+      .toString();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 86a1292..b9a6db2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -35,6 +35,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.hadoop.fs.Path;
 
+
 @JsonTypeName("fs-sub-scan")
 public class EasySubScan extends AbstractSubScan {
 
@@ -44,6 +45,7 @@ public class EasySubScan extends AbstractSubScan {
   private final Path selectionRoot;
   private final int partitionDepth;
   private final TupleMetadata schema;
+  private final int maxRecords;
 
   @JsonCreator
   public EasySubScan(
@@ -55,7 +57,8 @@ public class EasySubScan extends AbstractSubScan {
     @JsonProperty("columns") List<SchemaPath> columns,
     @JsonProperty("selectionRoot") Path selectionRoot,
     @JsonProperty("partitionDepth") int partitionDepth,
-    @JsonProperty("schema") TupleMetadata schema
+    @JsonProperty("schema") TupleMetadata schema,
+    @JsonProperty("maxRecords") int maxRecords
     ) throws ExecutionSetupException {
     super(userName);
     this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
@@ -64,10 +67,11 @@ public class EasySubScan extends AbstractSubScan {
     this.selectionRoot = selectionRoot;
     this.partitionDepth = partitionDepth;
     this.schema = schema;
+    this.maxRecords = maxRecords;
   }
 
   public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
-      List<SchemaPath> columns, Path selectionRoot, int partitionDepth, TupleMetadata schema) {
+      List<SchemaPath> columns, Path selectionRoot, int partitionDepth, TupleMetadata schema, int maxRecords) {
     super(userName);
     this.formatPlugin = plugin;
     this.files = files;
@@ -75,6 +79,7 @@ public class EasySubScan extends AbstractSubScan {
     this.selectionRoot = selectionRoot;
     this.partitionDepth = partitionDepth;
     this.schema = schema;
+    this.maxRecords = maxRecords;
   }
 
   @JsonProperty
@@ -101,6 +106,9 @@ public class EasySubScan extends AbstractSubScan {
   @JsonProperty("schema")
   public TupleMetadata getSchema() { return schema; }
 
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() { return maxRecords; }
+
   @Override
   public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index af710e3..3dd1e89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -207,14 +207,16 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   private static class ColumnsReaderFactory extends FileReaderFactory {
 
     private final TextParsingSettings settings;
+    private final int maxRecords;
 
-    public ColumnsReaderFactory(TextParsingSettings settings) {
+    public ColumnsReaderFactory(TextParsingSettings settings, int maxRecords) {
       this.settings = settings;
+      this.maxRecords = maxRecords;
     }
 
     @Override
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-       return new CompliantTextBatchReader(settings);
+       return new CompliantTextBatchReader(settings, maxRecords);
     }
   }
 
@@ -241,6 +243,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE;
     config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
@@ -270,7 +273,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
     TextParsingSettings settings =
         new TextParsingSettings(getConfig(), scan.getSchema());
-    builder.setReaderFactory(new ColumnsReaderFactory(settings));
+    builder.setReaderFactory(new ColumnsReaderFactory(settings, scan.getMaxRecords()));
 
     // If this format has no headers, or wants to skip them,
     // then we must use the columns column to hold the data.
@@ -338,8 +341,14 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     for (final CompleteFileWork work : scan.getWorkIterable()) {
       data += work.getTotalBytes();
     }
+
     final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
-    final double estRowCount = data / estimatedRowSize;
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+
+    if (scan.supportsLimitPushdown() && scan.getMaxRecords() > 0) {
+      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, (long)scan.getMaxRecords(), 1, data);
+    } else {
+      final double estRowCount = data / estimatedRowSize;
+      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 79840c1..bddbac7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -17,9 +17,8 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
-import java.io.IOException;
-import java.io.InputStream;
-
+import com.univocity.parsers.common.TextParsingException;
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -40,9 +39,8 @@ import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.univocity.parsers.common.TextParsingException;
-
-import io.netty.buffer.DrillBuf;
+import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * Text reader, Complies with the RFC 4180 standard for text/csv files.
@@ -58,6 +56,8 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
   private final TextParsingSettings settings;
   // Chunk of the file to be read by this reader
   private FileSplit split;
+  // Limit pushed down from the query
+  private final int maxRecords;
   // text reader implementation
   private TextReader reader;
   // input buffer
@@ -68,8 +68,9 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
 
   private RowSetLoader writer;
 
-  public CompliantTextBatchReader(TextParsingSettings settings) {
+  public CompliantTextBatchReader(TextParsingSettings settings, int maxRecords) {
     this.settings = settings;
+    this.maxRecords = maxRecords;
 
     // Validate. Otherwise, these problems show up later as a data
     // read error which is very confusing.
@@ -298,6 +299,11 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
   public boolean next() {
     reader.resetForNextBatch();
 
+    // If the limit is defined and the row count is greater than the limit, stop reading the file.
+    if (maxRecords > 0 && writer.rowCount() > maxRecords) {
+      return false;
+    }
+
     try {
       boolean more = false;
       while (! writer.isFull()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
index 99efd76..b808ffe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -18,13 +18,6 @@
 
 package org.apache.drill.exec.store.log;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
@@ -41,6 +34,13 @@ import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(LogBatchReader.class);
   public static final String RAW_LINE_COL_NAME = "_raw";
@@ -126,6 +126,7 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private final LogReaderConfig config;
+  private final int maxRecords;
   private FileSplit split;
   private BufferedReader reader;
   private ResultSetLoader loader;
@@ -136,8 +137,9 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private int lineNumber;
   private int errorCount;
 
-  public LogBatchReader(LogReaderConfig config) {
+  public LogBatchReader(LogReaderConfig config, int maxRecords) {
     this.config = config;
+    this.maxRecords = maxRecords;
   }
 
   @Override
@@ -213,6 +215,10 @@ public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
     String line;
     try {
       line = reader.readLine();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
index 8a3a6af..1027fc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -18,12 +18,6 @@
 
 package org.apache.drill.exec.store.log;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -50,6 +44,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
 public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
   private static final Logger logger = LoggerFactory.getLogger(LogFormatPlugin.class);
 
@@ -60,14 +60,16 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
 
   private static class LogReaderFactory extends FileReaderFactory {
     private final LogReaderConfig readerConfig;
+    private final int maxRecords;
 
-    public LogReaderFactory(LogReaderConfig config) {
+    public LogReaderFactory(LogReaderConfig config, int maxRecords) {
       readerConfig = config;
+      this.maxRecords = maxRecords;
     }
 
     @Override
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-       return new LogBatchReader(readerConfig);
+       return new LogBatchReader(readerConfig, maxRecords);
     }
   }
 
@@ -90,6 +92,7 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
     config.defaultName = PLUGIN_NAME;
     config.readerOperatorType = CoreOperatorType.REGEX_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
@@ -207,7 +210,7 @@ public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
     // each input file.
     builder.setReaderFactory(new LogReaderFactory(
         new LogReaderConfig(this, pattern, providedSchema, tableSchema,
-            readerSchema, !hasSchema, groupCount, maxErrors(providedSchema))));
+            readerSchema, !hasSchema, groupCount, maxErrors(providedSchema)), scan.getMaxRecords()));
 
     // The default type of regex columns is nullable VarChar,
     // so let's use that as the missing column type.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index e724d18..3d9d8f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -145,6 +145,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private ScalarWriter remoteDataWriter;
 
+  private final int maxRecords;
 
   private Map<Long, TcpSession> sessionQueue;
 
@@ -164,11 +165,12 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  public PcapBatchReader(PcapReaderConfig readerConfig) {
+  public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
     if (readerConfig.sessionizeTCPStreams) {
       sessionQueue = new HashMap<>();
     }
+    this.maxRecords = maxRecords;
   }
 
   @Override
@@ -297,6 +299,11 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private boolean parseNextPacket(RowSetLoader rowWriter) {
 
+    // Push down limit
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
     // Decode the packet
     Packet packet = new Packet();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
index 40a70bd..b5b77fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
@@ -39,14 +39,16 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
   private static class PcapReaderFactory extends FileReaderFactory {
 
     private final PcapReaderConfig readerConfig;
+    private final int maxRecords;
 
-    public PcapReaderFactory(PcapReaderConfig config) {
+    public PcapReaderFactory(PcapReaderConfig config, int maxRecords) {
       readerConfig = config;
+      this.maxRecords = maxRecords;
     }
 
     @Override
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new PcapBatchReader(readerConfig);
+      return new PcapBatchReader(readerConfig, maxRecords);
     }
   }
 
@@ -68,18 +70,19 @@ public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
     config.defaultName = PLUGIN_NAME;
     config.readerOperatorType = UserBitShared.CoreOperatorType.PCAP_SUB_SCAN_VALUE;
     config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
     return config;
   }
 
   @Override
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
-    return new PcapBatchReader(new PcapReaderConfig(this));
+    return new PcapBatchReader(new PcapReaderConfig(this), scan.getMaxRecords());
   }
 
   @Override
   protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
     FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this)));
+    builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this), scan.getMaxRecords()));
     initScanBuilder(builder, scan);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
     return builder;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
index a6924c0..3bc0820 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
@@ -1079,14 +1079,14 @@ public class TestMetastoreWithEasyFormatPlugin extends ClusterTest {
       queryBuilder()
           .sql("select * from dfs.tmp.`%s` limit 1", tableName)
           .planMatcher()
-          .include("Limit", "numFiles=1,")
+          .include("Limit", "numFiles=1", "maxRecords=1")
           .match();
 
       // each file has 10 records, so 3 files should be picked
       queryBuilder()
           .sql("select * from dfs.tmp.`%s` limit 21", tableName)
           .planMatcher()
-          .include("Limit", "numFiles=3")
+          .include("Limit", "numFiles=3", "maxRecords=21")
           .match();
     } finally {
       run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);