Posted to commits@drill.apache.org by cg...@apache.org on 2021/04/11 01:16:44 UTC

[drill] branch master updated: DRILL-7814: Add Limit Pushdown to MongoDB Storage Plugin

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f05e75a  DRILL-7814: Add Limit Pushdown to MongoDB Storage Plugin
f05e75a is described below

commit f05e75adfd04f90852a12cae9e502b204ece47f5
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Sat Apr 10 21:16:33 2021 -0400

    DRILL-7814: Add Limit Pushdown to MongoDB Storage Plugin
    
    * Initial Work
    
    * Initial Commit
    
    * WIP
    
    * Updated TestSuite
    
    * WIP
    
    * Updated..
    
    * Removed unused import
    
    * DRILL-7814: Fix filter pushdown
    
    Co-authored-by: Volodymyr Vysotskyi <vv...@gmail.com>
---
 .../store/druid/DruidPushDownFilterForScan.java    |  3 +-
 contrib/storage-mongo/pom.xml                      |  4 +-
 .../drill/exec/store/mongo/MongoGroupScan.java     | 62 +++++++++++++++----
 .../store/mongo/MongoPushDownFilterForScan.java    | 32 ++++------
 .../drill/exec/store/mongo/MongoRecordReader.java  | 15 ++++-
 .../drill/exec/store/mongo/MongoStoragePlugin.java |  2 +-
 .../drill/exec/store/mongo/MongoSubScan.java       | 26 ++++++--
 .../drill/exec/store/mongo/MongoTestConstants.java |  5 +-
 .../drill/exec/store/mongo/MongoTestSuite.java     |  1 +
 .../exec/store/mongo/TestMongoLimitPushDown.java   | 70 ++++++++++++++++++++++
 .../exec/planner/common/DrillScanRelBase.java      |  2 +
 .../drill/exec/planner/logical/DrillScanRel.java   |  5 ++
 .../drill/exec/planner/physical/ScanPrel.java      | 11 ++--
 13 files changed, 189 insertions(+), 49 deletions(-)
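
At the heart of this change is Drill's limit-pushdown contract on
GroupScan: a scan that returns true from supportsLimitPushdown() is
offered the query's limit through applyLimit(maxRecords), and must
return null whenever the limit would not change the scan, so the planner
stops re-firing the rule. Below is a minimal, framework-free sketch of
that contract; class and member names are illustrative, not Drill's
actual types.

    // Sketch of the applyLimit contract that MongoGroupScan implements below.
    // maxRecords == -1 is the sentinel for "no limit pushed down".
    class LimitAwareScan {
      private final int maxRecords;

      LimitAwareScan(int maxRecords) {
        this.maxRecords = maxRecords;
      }

      boolean supportsLimitPushdown() {
        return true;
      }

      // Returning null signals "nothing to change", which keeps the
      // planner from transforming the same scan over and over.
      LimitAwareScan applyLimit(int maxRecords) {
        if (maxRecords == this.maxRecords) {
          return null;
        }
        return new LimitAwareScan(maxRecords);
      }
    }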

diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
old mode 100755
new mode 100644
index 6e313c6..65d95aa
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
@@ -73,8 +73,7 @@ public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
             groupScan.getMaxRecordsToRead());
     newGroupsScan.setFilterPushedDown(true);
 
-    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(),
-      newGroupsScan, scan.getRowType());
+    ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan);
     if (druidFilterBuilder.isAllExpressionsConverted()) {
       /*
        * Since we could convert the entire filter condition expression into a
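
The Druid change above is a by-product of the same refactor: the static
helper ScanPrel.create(...) is removed (see the ScanPrel hunk at the end
of this patch) and pushdown rules now rebuild the scan through the new
abstract DrillScanRelBase.copy(traitSet, groupScan), so one rule
implementation works regardless of the concrete scan rel it matched.
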
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index c1f2613..9746615 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -45,10 +45,10 @@
   <dependency>
     <groupId>org.mongodb</groupId>
     <artifactId>mongo-java-driver</artifactId>
-    <version>3.12.7</version>
+    <version>3.12.8</version>
   </dependency>
 
-    <!-- Test dependencie -->
+    <!-- Test dependency -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index aedbc82..97fd312 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -79,7 +80,7 @@ import com.mongodb.client.MongoDatabase;
 public class MongoGroupScan extends AbstractGroupScan implements
     DrillMongoConstants {
 
-  private static final Integer select = Integer.valueOf(1);
+  private static final Integer select = 1;
 
   private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
 
@@ -102,6 +103,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private List<SchemaPath> columns;
 
+  private int maxRecords;
+
   private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
 
   // Sharding with replica sets contains all the replica server addresses for
@@ -120,24 +123,27 @@ public class MongoGroupScan extends AbstractGroupScan implements
       @JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
+      @JacksonInject StoragePluginRegistry pluginRegistry,
+      @JsonProperty("maxRecords") int maxRecords) throws IOException,
       ExecutionSetupException {
     this(userName,
         pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
-        scanSpec, columns);
+        scanSpec, columns, maxRecords);
   }
 
   public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
-      MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
+      MongoScanSpec scanSpec, List<SchemaPath> columns, int maxRecords) throws IOException {
     super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.scanSpec = scanSpec;
     this.columns = columns;
     this.storagePluginConfig.getConnection();
+    this.maxRecords = maxRecords;
     init();
   }
 
+
   /**
    * Private constructor, used for cloning.
    * @param that
@@ -153,6 +159,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     this.chunksInverseMapping = that.chunksInverseMapping;
     this.endpointFragmentMapping = that.endpointFragmentMapping;
     this.filterPushedDown = that.filterPushedDown;
+    this.maxRecords = that.maxRecords;
   }
 
   @JsonIgnore
@@ -371,6 +378,12 @@ public class MongoGroupScan extends AbstractGroupScan implements
     return clone;
   }
 
+  public GroupScan clone(int maxRecords) {
+    MongoGroupScan clone = new MongoGroupScan(this);
+    clone.maxRecords = maxRecords;
+    return clone;
+  }
+
   @Override
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return true;
@@ -476,6 +489,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
         .setHosts(chunkInfo.getChunkLocList())
         .setMinFilters(chunkInfo.getMinFilters())
         .setMaxFilters(chunkInfo.getMaxFilters())
+        .setMaxRecords(maxRecords)
         .setFilter(scanSpec.getFilters());
     return subScanSpec;
   }
@@ -499,21 +513,29 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public ScanStats getScanStats() {
+    long recordCount;
     try{
       MongoClient client = storagePlugin.getClient();
       MongoDatabase db = client.getDatabase(scanSpec.getDbName());
       MongoCollection<Document> collection = db.getCollection(scanSpec.getCollectionName());
-      long numDocs = collection.count();
+      long numDocs = collection.countDocuments();
+
+      if (maxRecords > 0 && numDocs > 0) {
+        recordCount = Math.min(maxRecords, numDocs);
+      } else {
+        recordCount = numDocs;
+      }
+
       float approxDiskCost = 0;
-      if (numDocs != 0) {
+      if (recordCount != 0) {
         //toJson should use client's codec, otherwise toJson could fail on
         // some types not known to DocumentCodec, e.g. DBRef.
         final DocumentCodec codec =
             new DocumentCodec(client.getMongoClientOptions().getCodecRegistry(), new BsonTypeClassMap());
         String json = collection.find().first().toJson(codec);
-        approxDiskCost = json.getBytes().length * numDocs;
+        approxDiskCost = json.getBytes().length * recordCount;
       }
-      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, numDocs, 1, approxDiskCost);
+      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, recordCount, 1, approxDiskCost);
     } catch (Exception e) {
       throw new DrillRuntimeException(e.getMessage(), e);
     }
@@ -563,6 +585,19 @@ public class MongoGroupScan extends AbstractGroupScan implements
   }
 
   @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return clone(maxRecords);
+  }
+
+  @Override
   @JsonProperty
   public List<SchemaPath> getColumns() {
     return columns;
@@ -578,6 +613,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
     return storagePluginConfig;
   }
 
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() { return maxRecords; }
+
   @JsonIgnore
   public MongoStoragePlugin getStoragePlugin() {
     return storagePlugin;
@@ -585,8 +623,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public String toString() {
-    return "MongoGroupScan [MongoScanSpec=" + scanSpec + ", columns=" + columns
-        + "]";
+    return new PlanStringBuilder(this)
+      .field("MongoScanSpec", scanSpec)
+      .field("columns", columns)
+      .field("maxRecords", maxRecords)
+      .toString();
   }
 
   @VisibleForTesting
@@ -611,5 +652,4 @@ public class MongoGroupScan extends AbstractGroupScan implements
   void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
     this.chunksInverseMapping = chunksInverseMapping;
   }
-
 }
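
Two details in getScanStats() are worth noting: collection.count() was
replaced with countDocuments(), since count() is deprecated in recent
3.x drivers, and the row-count estimate is capped at the pushed-down
limit so the planner costs the limited scan correctly. A standalone
sketch of that estimate, runnable against a local mongod; the connection
string, database, and collection names are placeholders:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class ScanStatsSketch {
      public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          MongoCollection<Document> collection = client
              .getDatabase("employee").getCollection("empinfo");

          long numDocs = collection.countDocuments();
          int maxRecords = 4;  // a pushed-down LIMIT 4

          // Cap the planner's row-count estimate at the limit; an empty
          // collection stays at zero.
          long recordCount = (maxRecords > 0 && numDocs > 0)
              ? Math.min(maxRecords, numDocs)
              : numDocs;
          System.out.println("estimated row count: " + recordCount);
        }
      }
    }
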
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index be157b4..5e57890 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -18,24 +18,23 @@
 package org.apache.drill.exec.store.mongo;
 
 import java.io.IOException;
+import java.util.Collections;
 
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.FilterPrel;
 import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rex.RexNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-
 public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
   private static final Logger logger = LoggerFactory
       .getLogger(MongoPushDownFilterForScan.class);
@@ -43,14 +42,14 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
 
   private MongoPushDownFilterForScan() {
     super(
-        RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+        RelOptHelper.some(Filter.class, RelOptHelper.any(DrillScanRelBase.class)),
         "MongoPushDownFilterForScan");
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final ScanPrel scan = (ScanPrel) call.rel(1);
-    final FilterPrel filter = (FilterPrel) call.rel(0);
+    final DrillScanRelBase scan = call.rel(1);
+    final Filter filter = call.rel(0);
     final RexNode condition = filter.getCondition();
 
     MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
@@ -67,18 +66,17 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
       return; // no filter pushdown so nothing to apply.
     }
 
-    MongoGroupScan newGroupsScan = null;
+    MongoGroupScan newGroupsScan;
     try {
       newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-          newScanSpec, groupScan.getColumns());
+          newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
     } catch (IOException e) {
       logger.error(e.getMessage(), e);
       throw new DrillRuntimeException(e.getMessage(), e);
     }
     newGroupsScan.setFilterPushedDown(true);
 
-    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(),
-        newGroupsScan, scan.getRowType(), scan.getTable());
+    RelNode newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan);
 
     if (mongoFilterBuilder.isAllExpressionsConverted()) {
       /*
@@ -88,18 +86,14 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
       call.transformTo(newScanPrel);
     } else {
       call.transformTo(filter.copy(filter.getTraitSet(),
-          ImmutableList.of((RelNode) newScanPrel)));
+          Collections.singletonList(newScanPrel)));
     }
 
   }
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    final ScanPrel scan = (ScanPrel) call.rel(1);
-    if (scan.getGroupScan() instanceof MongoGroupScan) {
-      return super.matches(call);
-    }
-    return false;
+    DrillScanRelBase scan = call.rel(1);
+    return scan.getGroupScan() instanceof MongoGroupScan && super.matches(call);
   }
-
 }
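
Rewriting this rule against Calcite's Filter and Drill's
DrillScanRelBase removes its dependency on the physical nodes FilterPrel
and ScanPrel, so the same rule can be registered at either planning
stage; the matches() guard still restricts it to scans backed by a
MongoGroupScan. Swapping Guava's ImmutableList for
Collections.singletonList also drops the shaded-Guava import.
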
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index f5d1f2e..9d8b5bb 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -75,6 +75,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   private final boolean readNumbersAsDouble;
   private boolean unionEnabled;
   private final boolean isBsonRecordReader;
+  private final int maxRecords;
 
   public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, MongoStoragePlugin plugin) {
@@ -94,6 +95,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
     isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
+    maxRecords = subScanSpec.getMaxRecords();
     logger.debug("BsonRecordReader is enabled? " + isBsonRecordReader);
     init(subScanSpec);
   }
@@ -171,9 +173,16 @@ public class MongoRecordReader extends AbstractRecordReader {
   @Override
   public int next() {
     if (cursor == null) {
-      logger.info("Filters Applied : " + filters);
-      logger.info("Fields Selected :" + fields);
-      cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+      logger.debug("Filters Applied : " + filters);
+      logger.debug("Fields Selected :" + fields);
+
+      // Add limit to Mongo query
+      if (maxRecords > 0) {
+        logger.debug("Limit applied: {}", maxRecords);
+        cursor = collection.find(filters).projection(fields).limit(maxRecords).batchSize(100).iterator();
+      } else {
+        cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+      }
     }
 
     writer.allocate();
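
The reader applies the limit by appending limit(maxRecords) to the
driver's find chain, so MongoDB itself stops returning documents after
maxRecords rather than Drill discarding rows after transfer. The hunk
above duplicates the find(...) chain in each branch; since FindIterable
setters return the iterable, an equivalent form adds the limit
conditionally, as in this standalone sketch (filter, projection, and
names are placeholders):

    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCursor;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.Projections;
    import org.bson.Document;

    public class LimitQuerySketch {
      public static void main(String[] args) {
        int maxRecords = 4;  // pushed-down LIMIT; <= 0 means no limit
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          FindIterable<Document> find = client
              .getDatabase("employee")
              .getCollection("empinfo")
              .find(Filters.eq("rating", 52.17))
              .projection(Projections.include("employee_id"))
              .batchSize(100);
          if (maxRecords > 0) {
            find = find.limit(maxRecords);  // server-side cutoff
          }
          try (MongoCursor<Document> cursor = find.iterator()) {
            while (cursor.hasNext()) {
              System.out.println(cursor.next().toJson());
            }
          }
        }
      }
    }
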
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index e64632d..362ac51 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -129,7 +129,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {
     });
-    return new MongoGroupScan(userName, this, mongoScanSpec, null);
+    return new MongoGroupScan(userName, this, mongoScanSpec, null, -1);
   }
 
   @Override
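
getPhysicalScan() now seeds each new MongoGroupScan with maxRecords = -1,
the sentinel for "no limit pushed down yet"; applyLimit() later swaps in
a real value when the planner finds a LIMIT it can push.
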
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 551886f..af13eb5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -128,6 +129,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     protected List<String> hosts;
     protected Map<String, Object> minFilters;
     protected Map<String, Object> maxFilters;
+    protected int maxRecords;
 
     protected Document filter;
 
@@ -137,13 +139,15 @@ public class MongoSubScan extends AbstractBase implements SubScan {
         @JsonProperty("hosts") List<String> hosts,
         @JsonProperty("minFilters") Map<String, Object> minFilters,
         @JsonProperty("maxFilters") Map<String, Object> maxFilters,
-        @JsonProperty("filters") Document filters) {
+        @JsonProperty("filters") Document filters,
+        @JsonProperty("maxRecords") int maxRecords) {
       this.dbName = dbName;
       this.collectionName = collectionName;
       this.hosts = hosts;
       this.minFilters = minFilters;
       this.maxFilters = maxFilters;
       this.filter = filters;
+      this.maxRecords = maxRecords;
     }
 
     MongoSubScanSpec() {
@@ -177,6 +181,13 @@ public class MongoSubScan extends AbstractBase implements SubScan {
       return this;
     }
 
+    public int getMaxRecords() { return maxRecords; }
+
+    public MongoSubScanSpec setMaxRecords (int maxRecords) {
+      this.maxRecords = maxRecords;
+      return this;
+    }
+
     public Map<String, Object> getMinFilters() {
       return minFilters;
     }
@@ -206,9 +217,16 @@ public class MongoSubScan extends AbstractBase implements SubScan {
 
     @Override
     public String toString() {
-      return "MongoSubScanSpec [dbName=" + dbName + ", collectionName="
-          + collectionName + ", hosts=" + hosts + ", minFilters=" + minFilters
-          + ", maxFilters=" + maxFilters + ", filter=" + filter + "]";
+      return new PlanStringBuilder(this)
+        .field("dbName", dbName)
+        .field("collectionName", collectionName)
+        .field("hosts", hosts)
+        .field("minFilters", minFilters)
+        .field("maxFilters", maxFilters)
+        .field("filter", filter)
+        .field("maxRecords", maxRecords)
+        .toString();
+
     }
 
   }
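
The new maxRecords field on MongoSubScanSpec carries @JsonProperty
wiring because sub-scan specs travel to the executing drillbits as part
of the JSON-serialized physical plan; a field left out of the Jackson
creator would silently reset on deserialization. A minimal analogue of
that round trip (this Spec class is hypothetical, not Drill's):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SpecRoundTrip {
      static class Spec {
        private final String collectionName;
        private final int maxRecords;

        @JsonCreator
        Spec(@JsonProperty("collectionName") String collectionName,
             @JsonProperty("maxRecords") int maxRecords) {
          this.collectionName = collectionName;
          this.maxRecords = maxRecords;
        }

        public String getCollectionName() { return collectionName; }
        public int getMaxRecords() { return maxRecords; }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new Spec("empinfo", 4));
        Spec back = mapper.readValue(json, Spec.class);
        // Prints {"collectionName":"empinfo","maxRecords":4} -> 4
        System.out.println(json + " -> " + back.getMaxRecords());
      }
    }
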
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java
index d8190a6..1e9d243 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java
@@ -60,7 +60,10 @@ public interface MongoTestConstants {
 
   // test queries
   String TEST_QUERY_1 = "SELECT * FROM mongo.employee.`empinfo` limit 5";
-  String TEST_QUERY_LIMIT = "SELECT first_name, last_name FROM mongo.employee.`empinfo` limit 2;";
+  String TEST_QUERY_LIMIT = "SELECT first_name, last_name FROM mongo.%s.`%s` limit 2";
+  String TEST_LIMIT_QUERY = "select `employee_id` from mongo.%s.`%s` limit %d";
+
+
 
   // test query template1
   String TEST_QUERY_PROJECT_PUSH_DOWN_TEMPLATE_1 = "SELECT `employee_id` FROM mongo.%s.`%s`";
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
index b779c9a..14c4f56 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicInteger;
   TestMongoFilterPushDown.class,
   TestMongoProjectPushDown.class,
   TestMongoQueries.class,
+  TestMongoLimitPushDown.class,
   TestMongoChunkAssignment.class,
   TestMongoStoragePluginUsesCredentialsStore.class,
   TestMongoDrillIssue.class
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
new file mode 100644
index 0000000..569cf5c
--- /dev/null
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.mongo;
+
+import org.apache.drill.categories.MongoStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class, MongoStorageTest.class})
+public class TestMongoLimitPushDown extends MongoTestBase {
+
+  @Test
+  public void testLimit() throws Exception {
+    String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=4")
+      .match();
+  }
+
+  @Test
+  public void testLimitWithOrderBy() throws Exception {
+    // Limit should not be pushed down for this example due to the sort
+    String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` ORDER BY employee_id LIMIT 4";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=-1")
+      .match();
+  }
+
+  @Test
+  public void testLimitWithOffset() throws Exception {
+    // Limit should be pushed down and include the offset
+    String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4 OFFSET 5";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=9")
+      .match();
+  }
+
+  @Test
+  public void testLimitWithFilter() throws Exception {
+    String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` WHERE rating = 52.17 LIMIT 4";
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=4")
+      .match();
+  }
+}
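
Two of these assertions encode planner behavior worth spelling out: with
ORDER BY, the limit must stay above the sort, so nothing reaches the
scan and the plan still shows maxRecords=-1; with LIMIT 4 OFFSET 5,
Drill pushes offset + limit = 9 rows into the scan and applies the
offset afterwards, hence maxRecords=9.
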
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index 96e5bfc..fe67709 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -86,4 +86,6 @@ public abstract class DrillScanRelBase extends TableScan implements DrillRelNode
     double dIo = 0;
     return planner.getCostFactory().makeCost(dRows, dCpu, dIo);
   }
+
+  public abstract DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index e11dad2..26ef4ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -191,4 +191,9 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
 
     return projectedColumns;
   }
+
+  @Override
+  public DrillScanRel copy(RelTraitSet traitSet, GroupScan scan) {
+    return new DrillScanRel(getCluster(), traitSet, getTable(), scan, getRowType(), getColumns(), partitionFilterPushdown());
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 82fdc4b..50996b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -60,6 +60,11 @@ public class ScanPrel extends DrillScanRelBase implements LeafPrel, HasDistribut
   }
 
   @Override
+  public ScanPrel copy(RelTraitSet traitSet, GroupScan scan) {
+    return new ScanPrel(getCluster(), traitSet, scan, getRowType(), getTable());
+  }
+
+  @Override
   protected Object clone() throws CloneNotSupportedException {
     return new ScanPrel(this.getCluster(), this.getTraitSet(), getCopy(this.getGroupScan()),
         this.rowType, this.getTable());
@@ -80,12 +85,6 @@ public class ScanPrel extends DrillScanRelBase implements LeafPrel, HasDistribut
     return creator.addMetadata(this, this.getGroupScan());
   }
 
-  public static ScanPrel create(RelNode old, RelTraitSet traitSets,
-      GroupScan scan, RelDataType rowType) {
-    return new ScanPrel(old.getCluster(), traitSets,
-        getCopy(scan), rowType, old.getTable());
-  }
-
   @Override
   public RelWriter explainTerms(RelWriter pw) {
     return super.explainTerms(pw).item("groupscan", this.getGroupScan().getDigest());