Posted to commits@drill.apache.org by cg...@apache.org on 2021/09/14 03:51:57 UTC

[drill] branch master updated: DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns (#2289)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b249baf  DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns (#2289)
b249baf is described below

commit b249baf0ebd638105908d3697ca8d68f5753fad5
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Sep 14 06:51:51 2021 +0300

    DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns (#2289)
    
    * DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns
    
    * Fix ser/de, make batchSize configurable and minor cleanup
    
    * Add docs for Mongo storage plugin credentials
    
    * Update mongo driver
    
    * Fix CASE with multiple branches
    
    * Fix alert
---
 .../store/druid/DruidPushDownFilterForScan.java    |   2 +-
 contrib/storage-mongo/pom.xml                      |   2 +-
 ...ngoCompareOp.java => BaseMongoSubScanSpec.java} |  33 ++-
 .../exec/store/mongo/MongoAggregateUtils.java      | 194 ++++++++++++++
 .../drill/exec/store/mongo/MongoFilterBuilder.java | 137 +++++-----
 .../drill/exec/store/mongo/MongoGroupScan.java     | 126 +++++----
 .../store/mongo/MongoPushDownFilterForScan.java    |  99 -------
 .../drill/exec/store/mongo/MongoRecordReader.java  |  35 ++-
 .../exec/store/mongo/MongoScanBatchCreator.java    |  12 +-
 .../drill/exec/store/mongo/MongoScanSpec.java      |  46 ++--
 .../drill/exec/store/mongo/MongoStoragePlugin.java |  50 +++-
 .../exec/store/mongo/MongoStoragePluginConfig.java |  53 ++--
 .../drill/exec/store/mongo/MongoSubScan.java       | 152 +++--------
 .../common/{MongoCompareOp.java => MongoOp.java}   |  30 ++-
 .../store/mongo/plan/MongoPluginImplementor.java   | 294 +++++++++++++++++++++
 .../store/mongo/plan/RexToMongoTranslator.java     | 244 +++++++++++++++++
 .../store/mongo/schema/MongoDatabaseSchema.java    |   3 +-
 .../store/mongo/schema/MongoSchemaFactory.java     |   4 +-
 .../drill/exec/store/mongo/MongoTestBase.java      |   2 +-
 .../exec/store/mongo/TestMongoChunkAssignment.java |   2 +-
 .../exec/store/mongo/TestMongoLimitPushDown.java   |  20 +-
 .../exec/store/mongo/TestMongoProjectPushDown.java |  12 +-
 .../drill/exec/store/mongo/TestMongoQueries.java   | 199 ++++++++++++++
 ...TestMongoStoragePluginUsesCredentialsStore.java |   2 +-
 docs/dev/MongoStoragePluginCredentials.md          |  58 ++++
 docs/dev/StoragePluginPushdowns.md                 |  31 +++
 .../exec/physical/base/AbstractGroupScan.java      |   6 +
 .../apache/drill/exec/physical/base/GroupScan.java |   5 +
 .../exec/planner/common/DrillScanRelBase.java      |   4 +-
 .../exec/planner/common/DrillSortRelBase.java      |  49 ++++
 .../drill/exec/planner/logical/DrillScanRel.java   |   4 +-
 .../drill/exec/planner/logical/DrillSortRel.java   |  15 +-
 .../drill/exec/planner/physical/ScanPrel.java      |   4 +-
 .../drill/exec/planner/physical/SortPrel.java      |  14 +-
 .../drill/exec/store/PluginRulesProvider.java      |  44 +--
 .../drill/exec/store/PluginRulesProviderImpl.java  | 116 ++++++++
 .../exec/store/StoragePluginRulesSupplier.java     | 180 +++++++++++++
 .../exec/store/plan/AbstractPluginImplementor.java | 134 ++++++++++
 .../drill/exec/store/plan/PluginImplementor.java   |  86 ++++++
 .../exec/store/plan/rel/PluginAggregateRel.java    |  66 +++++
 .../exec/store/plan/rel/PluginDrillTable.java      |  57 ++++
 .../drill/exec/store/plan/rel/PluginFilterRel.java |  62 +++++
 .../store/plan/rel/PluginIntermediatePrel.java     |  87 ++++++
 .../drill/exec/store/plan/rel/PluginJoinRel.java   |  56 ++++
 .../drill/exec/store/plan/rel/PluginLimitRel.java  |  63 +++++
 .../drill/exec/store/plan/rel/PluginPrel.java      | 103 ++++++++
 .../exec/store/plan/rel/PluginProjectRel.java      |  66 +++++
 .../drill/exec/store/plan/rel/PluginRel.java       |  25 +-
 .../drill/exec/store/plan/rel/PluginSortRel.java   |  69 +++++
 .../drill/exec/store/plan/rel/PluginUnionRel.java  |  68 +++++
 .../store/plan/rel/StoragePluginTableScan.java     |  79 ++++++
 .../exec/store/plan/rule/PluginAggregateRule.java  |  47 ++++
 .../exec/store/plan/rule/PluginConverterRule.java  |  76 ++++++
 .../exec/store/plan/rule/PluginFilterRule.java     |  46 ++++
 .../rule/PluginIntermediatePrelConverterRule.java  |  66 +++++
 .../drill/exec/store/plan/rule/PluginJoinRule.java |  49 ++++
 .../exec/store/plan/rule/PluginLimitRule.java      |  44 +++
 .../exec/store/plan/rule/PluginProjectRule.java    |  47 ++++
 .../drill/exec/store/plan/rule/PluginSortRule.java |  48 ++++
 .../exec/store/plan/rule/PluginUnionRule.java      |  55 ++++
 60 files changed, 3250 insertions(+), 532 deletions(-)
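
Taken together, these pushdowns let Drill hand MongoDB a single aggregation pipeline instead of fetching whole collections and post-processing them. A minimal sketch of the kind of pipeline involved, with an invented query and invented database/field names (illustrative only, not code from this patch):

    import com.mongodb.client.model.Accumulators;
    import com.mongodb.client.model.Aggregates;
    import com.mongodb.client.model.Sorts;
    import org.bson.Document;
    import org.bson.conversions.Bson;

    import java.util.Arrays;
    import java.util.List;

    public class PushdownPipelineSketch {
      public static void main(String[] args) {
        // Roughly the pipeline Drill can now generate for a query like:
        //   SELECT dept, COUNT(*) cnt FROM mongo.employee.`empinfo`
        //   WHERE position = 'DEVELOPER' GROUP BY dept ORDER BY cnt DESC LIMIT 5
        List<Bson> pipeline = Arrays.asList(
            Aggregates.match(new Document("position", "DEVELOPER")), // filter pushdown
            Aggregates.group("$dept", Accumulators.sum("cnt", 1)),   // aggregate pushdown
            Aggregates.sort(Sorts.descending("cnt")),                // sort pushdown
            Aggregates.limit(5));                                    // limit pushdown
        pipeline.forEach(stage -> System.out.println(stage.toBsonDocument().toJson()));
      }
    }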

diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
index 65d95aa..2c5fcee 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
@@ -73,7 +73,7 @@ public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
             groupScan.getMaxRecordsToRead());
     newGroupsScan.setFilterPushedDown(true);
 
-    ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan);
+    ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());
     if (druidFilterBuilder.isAllExpressionsConverted()) {
       /*
        * Since we could convert the entire filter condition expression into a
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 66bc0cb..90a9a0e 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -45,7 +45,7 @@
   <dependency>
     <groupId>org.mongodb</groupId>
     <artifactId>mongodb-driver-sync</artifactId>
-    <version>4.2.3</version>
+    <version>4.3.1</version>
   </dependency>
 
     <!-- Test dependency -->
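
The driver bump keeps the plugin on a current 4.x sync driver, and the new docs cover feeding credentials-provider values into the connection string. A rough sketch of that idea only, with placeholder credentials and host (the plugin's actual wiring lives in MongoStoragePlugin#addCredentialsFromCredentialsProvider, later in this patch):

    import com.mongodb.ConnectionString;
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;

    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;

    public class ConnectionSketch {
      public static void main(String[] args) throws UnsupportedEncodingException {
        // URL-encode user and password before splicing them into the URI,
        // since they may contain reserved characters (values are placeholders).
        String user = URLEncoder.encode("drill", "UTF-8");
        String password = URLEncoder.encode("p@ss/word", "UTF-8");
        ConnectionString connection = new ConnectionString(
            "mongodb://" + user + ":" + password + "@localhost:27017/employee");
        try (MongoClient client = MongoClients.create(connection)) {
          System.out.println(client.getDatabase("employee").getName());
        }
      }
    }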
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
similarity index 58%
copy from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
copy to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
index ef89bfb..4f0d1bf 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
@@ -15,20 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.mongo.common;
+package org.apache.drill.exec.store.mongo;
 
-public enum MongoCompareOp {
-  EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
-      "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
-      "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
-      "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
-  private String compareOp;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.SuperBuilder;
 
-  MongoCompareOp(String compareOp) {
-    this.compareOp = compareOp;
-  }
+import java.util.List;
+
+@Getter
+@Setter
+@SuperBuilder(setterPrefix = "set")
+public class BaseMongoSubScanSpec {
+
+  @JsonProperty
+  private final String dbName;
+
+  @JsonProperty
+  private final String collectionName;
+
+  @JsonProperty
+  private final List<String> hosts;
 
-  public String getCompareOp() {
-    return compareOp;
-  }
 }
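
Note the Lombok @SuperBuilder(setterPrefix = "set") annotation: it generates an inheritable builder whose methods carry a "set" prefix, which is why later hunks call MongoSubScanSpec.builder().setDbName(...). A usage sketch with placeholder values (assumes the class above is on the classpath):

    import org.apache.drill.exec.store.mongo.BaseMongoSubScanSpec;

    import java.util.Arrays;

    public class SubScanSpecBuilderSketch {
      public static void main(String[] args) {
        // setterPrefix = "set" turns builder methods into setDbName(...) etc.;
        // @Getter supplies the matching getters.
        BaseMongoSubScanSpec spec = BaseMongoSubScanSpec.builder()
            .setDbName("employee")                        // placeholder values
            .setCollectionName("empinfo")
            .setHosts(Arrays.asList("localhost:27017"))
            .build();
        System.out.println(spec.getDbName() + "." + spec.getCollectionName());
      }
    }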
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
new file mode 100644
index 0000000..a72b6eb
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.client.model.Accumulators;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class MongoAggregateUtils {
+
+  public static List<String> mongoFieldNames(RelDataType rowType) {
+    List<String> renamed = rowType.getFieldNames().stream()
+        .map(name -> name.startsWith("$") ? "_" + name.substring(2) : name)
+        .collect(Collectors.toList());
+    return SqlValidatorUtil.uniquify(renamed, true);
+  }
+
+  public static String maybeQuote(String s) {
+    if (!needsQuote(s)) {
+      return s;
+    }
+    return quote(s);
+  }
+
+  public static String quote(String s) {
+    return "'" + s + "'";
+  }
+
+  private static boolean needsQuote(String s) {
+    for (int i = 0, n = s.length(); i < n; i++) {
+      char c = s.charAt(i);
+      if (!Character.isJavaIdentifierPart(c)
+          || c == '$') {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static List<Bson> getAggregateOperations(Aggregate aggregate, RelDataType rowType) {
+    List<String> inNames = mongoFieldNames(rowType);
+    List<String> outNames = mongoFieldNames(aggregate.getRowType());
+    Object id;
+    if (aggregate.getGroupSet().cardinality() == 1) {
+      String inName = inNames.get(aggregate.getGroupSet().nth(0));
+      id = "$" + inName;
+    } else {
+      List<BsonElement> elements =
+          StreamSupport.stream(aggregate.getGroupSet().spliterator(), false)
+              .map(inNames::get)
+              .map(inName -> new BsonElement(inName, new BsonString("$" + inName)))
+              .collect(Collectors.toList());
+      id = new BsonDocument(elements);
+    }
+    int outNameIndex = aggregate.getGroupSet().cardinality();
+    List<BsonField> accumList = new ArrayList<>();
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      accumList.add(bsonAggregate(inNames, outNames.get(outNameIndex++), aggCall));
+    }
+    List<Bson> operationsList = new ArrayList<>();
+    operationsList.add(Aggregates.group(id, accumList).toBsonDocument());
+    List<BsonElement> projectFields = new ArrayList<>();
+    if (aggregate.getGroupSet().cardinality() == 1) {
+      for (int index = 0; index < outNames.size(); index++) {
+        String outName = outNames.get(index);
+        projectFields.add(new BsonElement(maybeQuote(outName),
+            new BsonString("$" + (index == 0 ? "_id" : outName))));
+      }
+    } else {
+      projectFields.add(new BsonElement("_id", new BsonInt32(0)));
+      for (int group : aggregate.getGroupSet()) {
+        projectFields.add(
+            new BsonElement(maybeQuote(outNames.get(group)),
+                new BsonString("$_id." + outNames.get(group))));
+      }
+      outNameIndex = aggregate.getGroupSet().cardinality();
+      for (AggregateCall ignored : aggregate.getAggCallList()) {
+        String outName = outNames.get(outNameIndex++);
+        projectFields.add(new BsonElement(maybeQuote(outName), new BsonString("$" + outName)));
+      }
+    }
+    if (!aggregate.getGroupSet().isEmpty()) {
+      operationsList.add(Aggregates.project(new BsonDocument(projectFields)).toBsonDocument());
+    }
+
+    return operationsList;
+  }
+
+  private static BsonField bsonAggregate(List<String> inNames, String outName, AggregateCall aggCall) {
+    String aggregationName = aggCall.getAggregation().getName();
+    List<Integer> args = aggCall.getArgList();
+    if (aggregationName.equals(SqlStdOperatorTable.COUNT.getName())) {
+      Object expr;
+      if (args.size() == 0) {
+        // count(*) case
+        expr = 1;
+      } else {
+        assert args.size() == 1;
+        String inName = inNames.get(args.get(0));
+        expr = new BsonDocument(MongoOp.COND.getCompareOp(),
+            new BsonArray(Arrays.asList(
+                new Document(MongoOp.EQUAL.getCompareOp(),
+                    new BsonArray(Arrays.asList(
+                        new BsonString(quote(inName)),
+                        BsonNull.VALUE))).toBsonDocument(),
+                new BsonInt32(0),
+                new BsonInt32(1)
+            ))
+        );
+      }
+      return Accumulators.sum(maybeQuote(outName), expr);
+    } else {
+      BiFunction<String, Object, BsonField> mongoAccumulator = mongoAccumulator(aggregationName);
+      if (mongoAccumulator != null) {
+        return mongoAccumulator.apply(maybeQuote(outName), "$" + inNames.get(args.get(0)));
+      }
+    }
+    return null;
+  }
+
+  private static <T> BiFunction<String, T, BsonField> mongoAccumulator(String aggregationName) {
+    if (aggregationName.equals(SqlStdOperatorTable.SUM.getName())
+        || aggregationName.equals(SqlStdOperatorTable.SUM0.getName())) {
+      return Accumulators::sum;
+    } else if (aggregationName.equals(SqlStdOperatorTable.MIN.getName())) {
+      return Accumulators::min;
+    } else if (aggregationName.equals(SqlStdOperatorTable.MAX.getName())) {
+      return Accumulators::max;
+    } else if (aggregationName.equals(SqlStdOperatorTable.AVG.getName())) {
+      return Accumulators::avg;
+    } else if (aggregationName.equals(SqlStdOperatorTable.FIRST.getName())) {
+      return Accumulators::first;
+    } else if (aggregationName.equals(SqlStdOperatorTable.LAST.getName())) {
+      return Accumulators::last;
+    } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV.getName())
+      || aggregationName.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())) {
+      return Accumulators::stdDevSamp;
+    } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV_POP.getName())) {
+      return Accumulators::stdDevPop;
+    }
+    return null;
+  }
+
+  public static boolean supportsAggregation(AggregateCall aggregateCall) {
+    String name = aggregateCall.getAggregation().getName();
+    return name.equals(SqlStdOperatorTable.COUNT.getName())
+      || name.equals(SqlStdOperatorTable.SUM.getName())
+      || name.equals(SqlStdOperatorTable.SUM0.getName())
+      || name.equals(SqlStdOperatorTable.MIN.getName())
+      || name.equals(SqlStdOperatorTable.MAX.getName())
+      || name.equals(SqlStdOperatorTable.AVG.getName())
+      || name.equals(SqlStdOperatorTable.FIRST.getName())
+      || name.equals(SqlStdOperatorTable.LAST.getName())
+      || name.equals(SqlStdOperatorTable.STDDEV.getName())
+      || name.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())
+      || name.equals(SqlStdOperatorTable.STDDEV_POP.getName());
+  }
+}
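
For a multi-key GROUP BY, getAggregateOperations() packs the group keys into a document-valued _id and then unpacks them in a $project stage that suppresses _id. A standalone sketch of that shape, with invented field names:

    import com.mongodb.client.model.Accumulators;
    import com.mongodb.client.model.Aggregates;
    import org.bson.BsonDocument;
    import org.bson.BsonElement;
    import org.bson.BsonInt32;
    import org.bson.BsonString;

    import java.util.Arrays;

    public class MultiKeyGroupSketch {
      public static void main(String[] args) {
        // Group keys become a document-valued _id...
        BsonDocument id = new BsonDocument(Arrays.asList(
            new BsonElement("dept", new BsonString("$dept")),
            new BsonElement("position", new BsonString("$position"))));
        BsonDocument group = Aggregates.group(id,
            Accumulators.sum("cnt", 1)).toBsonDocument();
        // ...and the $project stage drops _id and lifts the keys back out.
        BsonDocument project = Aggregates.project(new BsonDocument(Arrays.asList(
            new BsonElement("_id", new BsonInt32(0)),
            new BsonElement("dept", new BsonString("$_id.dept")),
            new BsonElement("position", new BsonString("$_id.position")),
            new BsonElement("cnt", new BsonString("$cnt"))))).toBsonDocument();
        System.out.println(group.toJson());
        System.out.println(project.toJson());
      }
    }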
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 10720e4..a463ad5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -17,66 +17,52 @@
  */
 package org.apache.drill.exec.store.mongo;
 
-import java.io.IOException;
 import java.util.List;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.drill.common.FunctionNames;
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
 import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class MongoFilterBuilder extends
-    AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
+    AbstractExprVisitor<Document, Void, RuntimeException> implements
     DrillMongoConstants {
-  private static final Logger logger = LoggerFactory
-      .getLogger(MongoFilterBuilder.class);
-  final MongoGroupScan groupScan;
-  final LogicalExpression le;
+
+  private final LogicalExpression le;
   private boolean allExpressionsConverted = true;
 
-  public MongoFilterBuilder(MongoGroupScan groupScan,
-      LogicalExpression conditionExp) {
-    this.groupScan = groupScan;
+  public MongoFilterBuilder(LogicalExpression conditionExp) {
     this.le = conditionExp;
   }
 
-  public MongoScanSpec parseTree() {
-    MongoScanSpec parsedSpec = le.accept(this, null);
-    if (parsedSpec != null) {
-      parsedSpec = mergeScanSpecs(FunctionNames.AND, this.groupScan.getScanSpec(),
-          parsedSpec);
-    }
-    return parsedSpec;
+  public Document parseTree() {
+    return le.accept(this, null);
   }
 
-  private MongoScanSpec mergeScanSpecs(String functionName,
-      MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
+  private Document mergeFilters(String functionName,
+      Document left, Document right) {
     Document newFilter = new Document();
 
     switch (functionName) {
     case FunctionNames.AND:
-      if (leftScanSpec.getFilters() != null
-          && rightScanSpec.getFilters() != null) {
-        newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
-            rightScanSpec.getFilters());
-      } else if (leftScanSpec.getFilters() != null) {
-        newFilter = leftScanSpec.getFilters();
+      if (left != null && right != null) {
+        newFilter = MongoUtils.andFilterAtIndex(left, right);
+      } else if (left != null) {
+        newFilter = left;
       } else {
-        newFilter = rightScanSpec.getFilters();
+        newFilter = right;
       }
       break;
     case FunctionNames.OR:
-      newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(),
-          rightScanSpec.getFilters());
+      newFilter = MongoUtils.orFilterAtIndex(left, right);
     }
-    return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
-        .getScanSpec().getCollectionName(), newFilter);
+    return newFilter;
   }
 
   public boolean isAllExpressionsConverted() {
@@ -84,41 +70,41 @@ public class MongoFilterBuilder extends
   }
 
   @Override
-  public MongoScanSpec visitUnknown(LogicalExpression e, Void value)
+  public Document visitUnknown(LogicalExpression e, Void value)
       throws RuntimeException {
     allExpressionsConverted = false;
     return null;
   }
 
   @Override
-  public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+  public Document visitBooleanOperator(BooleanOperator op, Void value) {
     List<LogicalExpression> args = op.args();
-    MongoScanSpec nodeScanSpec = null;
+    Document condition = null;
     String functionName = op.getName();
-    for (int i = 0; i < args.size(); ++i) {
+    for (LogicalExpression arg : args) {
       switch (functionName) {
-      case FunctionNames.AND:
-      case FunctionNames.OR:
-        if (nodeScanSpec == null) {
-          nodeScanSpec = args.get(i).accept(this, null);
-        } else {
-          MongoScanSpec scanSpec = args.get(i).accept(this, null);
-          if (scanSpec != null) {
-            nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+        case FunctionNames.AND:
+        case FunctionNames.OR:
+          if (condition == null) {
+            condition = arg.accept(this, null);
           } else {
-            allExpressionsConverted = false;
+            Document scanSpec = arg.accept(this, null);
+            if (scanSpec != null) {
+              condition = mergeFilters(functionName, condition, scanSpec);
+            } else {
+              allExpressionsConverted = false;
+            }
           }
-        }
-        break;
+          break;
       }
     }
-    return nodeScanSpec;
+    return condition;
   }
 
   @Override
-  public MongoScanSpec visitFunctionCall(FunctionCall call, Void value)
+  public Document visitFunctionCall(FunctionCall call, Void value)
       throws RuntimeException {
-    MongoScanSpec nodeScanSpec = null;
+    Document functionCall = null;
     String functionName = call.getName();
     List<LogicalExpression> args = call.args();
 
@@ -127,7 +113,7 @@ public class MongoFilterBuilder extends
           .process(call);
       if (processor.isSuccess()) {
         try {
-          nodeScanSpec = createMongoScanSpec(processor.getFunctionName(),
+          functionCall = createFunctionCall(processor.getFunctionName(),
               processor.getPath(), processor.getValue());
         } catch (Exception e) {
           logger.error(" Failed to creare Filter ", e);
@@ -138,79 +124,76 @@ public class MongoFilterBuilder extends
       switch (functionName) {
       case FunctionNames.AND:
       case FunctionNames.OR:
-        MongoScanSpec leftScanSpec = args.get(0).accept(this, null);
-        MongoScanSpec rightScanSpec = args.get(1).accept(this, null);
-        if (leftScanSpec != null && rightScanSpec != null) {
-          nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
-              rightScanSpec);
+        Document left = args.get(0).accept(this, null);
+        Document right = args.get(1).accept(this, null);
+        if (left != null && right != null) {
+          functionCall = mergeFilters(functionName, left, right);
         } else {
           allExpressionsConverted = false;
           if (FunctionNames.AND.equals(functionName)) {
-            nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+            functionCall = left == null ? right : left;
           }
         }
         break;
       }
     }
 
-    if (nodeScanSpec == null) {
+    if (functionCall == null) {
       allExpressionsConverted = false;
     }
 
-    return nodeScanSpec;
+    return functionCall;
   }
 
-  private MongoScanSpec createMongoScanSpec(String functionName,
-      SchemaPath field, Object fieldValue) throws ClassNotFoundException,
-      IOException {
+  private Document createFunctionCall(String functionName,
+      SchemaPath field, Object fieldValue) {
     // extract the field name
     String fieldName = field.getRootSegmentPath();
-    MongoCompareOp compareOp = null;
+    MongoOp compareOp = null;
     switch (functionName) {
     case FunctionNames.EQ:
-      compareOp = MongoCompareOp.EQUAL;
+      compareOp = MongoOp.EQUAL;
       break;
     case FunctionNames.NE:
-      compareOp = MongoCompareOp.NOT_EQUAL;
+      compareOp = MongoOp.NOT_EQUAL;
       break;
     case FunctionNames.GE:
-      compareOp = MongoCompareOp.GREATER_OR_EQUAL;
+      compareOp = MongoOp.GREATER_OR_EQUAL;
       break;
     case FunctionNames.GT:
-      compareOp = MongoCompareOp.GREATER;
+      compareOp = MongoOp.GREATER;
       break;
     case FunctionNames.LE:
-      compareOp = MongoCompareOp.LESS_OR_EQUAL;
+      compareOp = MongoOp.LESS_OR_EQUAL;
       break;
     case FunctionNames.LT:
-      compareOp = MongoCompareOp.LESS;
+      compareOp = MongoOp.LESS;
       break;
     case FunctionNames.IS_NULL:
     case "isNull":
     case "is null":
-      compareOp = MongoCompareOp.IFNULL;
+      compareOp = MongoOp.IFNULL;
       break;
     case FunctionNames.IS_NOT_NULL:
     case "isNotNull":
     case "is not null":
-      compareOp = MongoCompareOp.IFNOTNULL;
+      compareOp = MongoOp.IFNOTNULL;
       break;
     }
 
     if (compareOp != null) {
       Document queryFilter = new Document();
-      if (compareOp == MongoCompareOp.IFNULL) {
+      if (compareOp == MongoOp.IFNULL) {
         queryFilter.put(fieldName,
-            new Document(MongoCompareOp.EQUAL.getCompareOp(), null));
-      } else if (compareOp == MongoCompareOp.IFNOTNULL) {
+            new Document(MongoOp.EQUAL.getCompareOp(), null));
+      } else if (compareOp == MongoOp.IFNOTNULL) {
         queryFilter.put(fieldName,
-            new Document(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+            new Document(MongoOp.NOT_EQUAL.getCompareOp(), null));
       } else {
         queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
             fieldValue));
       }
-      return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
-          .getScanSpec().getCollectionName(), queryFilter);
+      return queryFilter;
     }
     return null;
   }
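
The builder now returns plain BSON Document filters instead of whole MongoScanSpec objects, leaving database and collection context to the caller. A sketch of the filter shapes createFunctionCall() produces and of AND-merging two of them (field names are invented, and the $and form shown is only an approximation of what MongoUtils.andFilterAtIndex builds):

    import org.bson.Document;

    import java.util.Arrays;

    public class FilterShapeSketch {
      public static void main(String[] args) {
        // salary > 1000  ->  {salary: {$gt: 1000}}
        Document gt = new Document("salary", new Document("$gt", 1000));
        // name IS NULL   ->  {name: {$eq: null}}
        Document isNull = new Document("name", new Document("$eq", null));
        // AND-merge of two converted sub-filters, approximately:
        Document and = new Document("$and", Arrays.asList(gt, isNull));
        System.out.println(and.toJson());
      }
    }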
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 8b57012..b26b410 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.client.MongoClient;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -51,6 +52,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
+import org.apache.drill.exec.store.mongo.MongoSubScan.ShardedMongoSubScanSpec;
 import org.apache.drill.exec.store.mongo.common.ChunkInfo;
 import org.bson.Document;
 import org.bson.codecs.BsonTypeClassMap;
@@ -58,8 +60,6 @@ import org.bson.codecs.DocumentCodec;
 import org.bson.conversions.Bson;
 import org.bson.types.MaxKey;
 import org.bson.types.MinKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
@@ -75,17 +75,16 @@ import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 
+@Slf4j
 @JsonTypeName("mongo-scan")
 public class MongoGroupScan extends AbstractGroupScan implements
     DrillMongoConstants {
 
-  private static final Integer select = 1;
+  private static final int SELECT = 1;
 
-  private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
+  private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
 
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
-
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+  private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
 
   private MongoStoragePlugin storagePlugin;
 
@@ -95,9 +94,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private List<SchemaPath> columns;
 
-  private int maxRecords;
-
-  private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
+  private Map<Integer, List<BaseMongoSubScanSpec>> endpointFragmentMapping;
 
   // Sharding with replica sets contains all the replica server addresses for
   // each chunk.
@@ -107,7 +104,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private final Stopwatch watch = Stopwatch.createUnstarted();
 
-  private boolean filterPushedDown = false;
+  private boolean useAggregate;
 
   @JsonCreator
   public MongoGroupScan(
@@ -115,21 +112,21 @@ public class MongoGroupScan extends AbstractGroupScan implements
       @JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JacksonInject StoragePluginRegistry pluginRegistry,
-      @JsonProperty("maxRecords") int maxRecords) throws IOException {
+      @JsonProperty("useAggregate") boolean useAggregate,
+      @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
     this(userName,
         pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
-        scanSpec, columns, maxRecords);
+        scanSpec, columns, useAggregate);
   }
 
   public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
-      MongoScanSpec scanSpec, List<SchemaPath> columns, int maxRecords) throws IOException {
+      MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) throws IOException {
     super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.scanSpec = scanSpec;
     this.columns = columns;
-    this.maxRecords = maxRecords;
+    this.useAggregate = useAggregate;
     init();
   }
 
@@ -148,18 +145,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     this.chunksMapping = that.chunksMapping;
     this.chunksInverseMapping = that.chunksInverseMapping;
     this.endpointFragmentMapping = that.endpointFragmentMapping;
-    this.filterPushedDown = that.filterPushedDown;
-    this.maxRecords = that.maxRecords;
-  }
-
-  @JsonIgnore
-  public boolean isFilterPushedDown() {
-    return filterPushedDown;
-  }
-
-  @JsonIgnore
-  public void setFilterPushedDown(boolean filterPushedDown) {
-    this.filterPushedDown = filterPushedDown;
+    this.useAggregate = that.useAggregate;
   }
 
   private boolean isShardedCluster(MongoClient client) {
@@ -179,7 +165,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
     MongoClient client = storagePlugin.getClient();
     chunksMapping = Maps.newHashMap();
     chunksInverseMapping = Maps.newLinkedHashMap();
-    if (isShardedCluster(client)) {
+    if (useAggregate && isShardedCluster(client)) {
+      handleUnshardedCollection(getPrimaryShardInfo());
+    } else if (isShardedCluster(client)) {
       MongoDatabase db = client.getDatabase(CONFIG);
       MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
       Document filter = new Document();
@@ -190,9 +178,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
                   + this.scanSpec.getCollectionName());
 
       Document projection = new Document();
-      projection.put(SHARD, select);
-      projection.put(MIN, select);
-      projection.put(MAX, select);
+      projection.put(SHARD, SELECT);
+      projection.put(MIN, SELECT);
+      projection.put(MAX, SELECT);
 
       FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
       MongoCursor<Document> iterator = chunkCursor.iterator();
@@ -200,7 +188,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
 
       projection = new Document();
-      projection.put(HOST, select);
+      projection.put(HOST, SELECT);
 
       boolean hasChunks = false;
       while (iterator.hasNext()) {
@@ -292,7 +280,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     //Identify the primary shard of the queried database.
     MongoCollection<Document> collection = database.getCollection(DATABASES);
     Bson filter = new Document(ID, this.scanSpec.getDbName());
-    Bson projection = new Document(PRIMARY, select);
+    Bson projection = new Document(PRIMARY, SELECT);
     Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());
     String shardName = document.getString(PRIMARY);
     Preconditions.checkNotNull(shardName);
@@ -300,7 +288,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     //Identify the host(s) on which this shard resides.
     MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
     filter = new Document(ID, shardName);
-    projection = new Document(HOST, select);
+    projection = new Document(HOST, SELECT);
     Document hostInfo = Objects.requireNonNull(shardsCol.find(filter).projection(projection).first());
     String hostEntry = hostInfo.getString(HOST);
     Preconditions.checkNotNull(hostEntry);
@@ -361,7 +349,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   public GroupScan clone(int maxRecords) {
     MongoGroupScan clone = new MongoGroupScan(this);
-    clone.maxRecords = maxRecords;
+    clone.useAggregate = true;
+    clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords));
     return clone;
   }
 
@@ -409,7 +398,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       if (slots != null) {
         for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
           Integer slotIndex = slots.poll();
-          List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping
+          List<BaseMongoSubScanSpec> subScanSpecList = endpointFragmentMapping
               .get(slotIndex);
           subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo));
           slots.offer(slotIndex);
@@ -418,11 +407,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
       }
     }
 
-    PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<>(
+    PriorityQueue<List<BaseMongoSubScanSpec>> minHeap = new PriorityQueue<>(
         numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<>(
+    PriorityQueue<List<BaseMongoSubScanSpec>> maxHeap = new PriorityQueue<>(
         numSlots, LIST_SIZE_COMPARATOR_REV);
-    for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+    for (List<BaseMongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
         minHeap.offer(listOfScan);
       } else if (listOfScan.size() > minPerEndpointSlot) {
@@ -433,7 +422,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     if (chunksToAssignSet.size() > 0) {
       for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) {
         for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
-          List<MongoSubScanSpec> smallestList = minHeap.poll();
+          List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
           smallestList.add(buildSubScanSpecAndGet(chunkInfo));
           minHeap.offer(smallestList);
         }
@@ -441,8 +430,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
     }
 
     while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
-      List<MongoSubScanSpec> smallestList = minHeap.poll();
-      List<MongoSubScanSpec> largestList = maxHeap.poll();
+      List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
+      List<BaseMongoSubScanSpec> largestList = maxHeap.poll();
       smallestList.add(largestList.remove(largestList.size() - 1));
       if (largestList.size() > minPerEndpointSlot) {
         maxHeap.offer(largestList);
@@ -458,15 +447,23 @@ public class MongoGroupScan extends AbstractGroupScan implements
         endpointFragmentMapping.toString());
   }
 
-  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    return new MongoSubScanSpec()
+  private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    if (useAggregate) {
+      return MongoSubScanSpec.builder()
+          .setOperations(scanSpec.getOperations())
+          .setDbName(scanSpec.getDbName())
+          .setCollectionName(scanSpec.getCollectionName())
+          .setHosts(chunkInfo.getChunkLocList())
+          .build();
+    }
+    return ShardedMongoSubScanSpec.builder()
+        .setMinFilters(chunkInfo.getMinFilters())
+        .setMaxFilters(chunkInfo.getMaxFilters())
+        .setFilter(scanSpec.getFilters())
         .setDbName(scanSpec.getDbName())
         .setCollectionName(scanSpec.getCollectionName())
         .setHosts(chunkInfo.getChunkLocList())
-        .setMinFilters(chunkInfo.getMinFilters())
-        .setMaxFilters(chunkInfo.getMaxFilters())
-        .setMaxRecords(maxRecords)
-        .setFilter(scanSpec.getFilters());
+        .build();
   }
 
   @Override
@@ -487,18 +484,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public ScanStats getScanStats() {
-    long recordCount;
     try{
       MongoClient client = storagePlugin.getClient();
       MongoDatabase db = client.getDatabase(scanSpec.getDbName());
       MongoCollection<Document> collection = db.getCollection(scanSpec.getCollectionName());
-      long numDocs = collection.estimatedDocumentCount();
-
-      if (maxRecords > 0 && numDocs > 0) {
-        recordCount = Math.min(maxRecords, numDocs);
-      } else {
-        recordCount = numDocs;
-      }
+      long recordCount = collection.estimatedDocumentCount();
 
       float approxDiskCost = 0;
       if (recordCount != 0) {
@@ -563,9 +553,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public GroupScan applyLimit(int maxRecords) {
-    if (maxRecords == this.maxRecords) {
-      return null;
-    }
     return clone(maxRecords);
   }
 
@@ -585,26 +572,33 @@ public class MongoGroupScan extends AbstractGroupScan implements
     return storagePluginConfig;
   }
 
-  @JsonProperty("maxRecords")
-  public int getMaxRecords() { return maxRecords; }
-
   @JsonIgnore
   public MongoStoragePlugin getStoragePlugin() {
     return storagePlugin;
   }
 
+  @JsonProperty("useAggregate")
+  public void setUseAggregate(boolean useAggregate) {
+    this.useAggregate = useAggregate;
+  }
+
+  @JsonProperty("useAggregate")
+  public boolean isUseAggregate() {
+    return useAggregate;
+  }
+
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("MongoScanSpec", scanSpec)
       .field("columns", columns)
-      .field("maxRecords", maxRecords)
+      .field("useAggregate", useAggregate)
       .toString();
   }
 
   @VisibleForTesting
   MongoGroupScan() {
-    super((String)null);
+    super((String) null);
   }
 
   @JsonIgnore
@@ -621,7 +615,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @JsonIgnore
   @VisibleForTesting
-  void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
+  void setInverseChunksMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
     this.chunksInverseMapping = chunksInverseMapping;
   }
 }
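
With the maxRecords field gone, applyLimit() now always clones the scan, switches it to aggregate mode, and appends a $limit stage to the scan spec's operations. Schematically, with a placeholder filter stage:

    import org.bson.Document;
    import org.bson.conversions.Bson;

    import java.util.ArrayList;
    import java.util.List;

    public class LimitCloneSketch {
      public static void main(String[] args) {
        // The LIMIT rides along as a pipeline stage rather than a field:
        List<Bson> operations = new ArrayList<>();
        operations.add(new Document("$match", new Document("position", "DEVELOPER")));
        operations.add(new Document("$limit", 10)); // pushed-down LIMIT 10
        operations.forEach(op -> System.out.println(op.toBsonDocument().toJson()));
      }
    }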
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
deleted file mode 100644
index 5e57890..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo;
-
-import java.io.IOException;
-import java.util.Collections;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rex.RexNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
-  private static final Logger logger = LoggerFactory
-      .getLogger(MongoPushDownFilterForScan.class);
-  public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownFilterForScan();
-
-  private MongoPushDownFilterForScan() {
-    super(
-        RelOptHelper.some(Filter.class, RelOptHelper.any(DrillScanRelBase.class)),
-        "MongoPushDownFilterForScan");
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final DrillScanRelBase scan = call.rel(1);
-    final Filter filter = call.rel(0);
-    final RexNode condition = filter.getCondition();
-
-    MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
-    if (groupScan.isFilterPushedDown()) {
-      return;
-    }
-
-    LogicalExpression conditionExp = DrillOptiq.toDrill(
-        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
-    MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan,
-        conditionExp);
-    MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree();
-    if (newScanSpec == null) {
-      return; // no filter pushdown so nothing to apply.
-    }
-
-    MongoGroupScan newGroupsScan;
-    try {
-      newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-          newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
-    } catch (IOException e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-    newGroupsScan.setFilterPushedDown(true);
-
-    RelNode newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan);
-
-    if (mongoFilterBuilder.isAllExpressionsConverted()) {
-      /*
-       * Since we could convert the entire filter condition expression into an
-       * Mongo filter, we can eliminate the filter operator altogether.
-       */
-      call.transformTo(newScanPrel);
-    } else {
-      call.transformTo(filter.copy(filter.getTraitSet(),
-          Collections.singletonList(newScanPrel)));
-    }
-
-  }
-
-  @Override
-  public boolean matches(RelOptRuleCall call) {
-    DrillScanRelBase scan = call.rel(1);
-    return scan.getGroupScan() instanceof MongoGroupScan && super.matches(call);
-  }
-}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index b06fe36..a26a98a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.mongo;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.mongodb.client.MongoIterable;
+import com.mongodb.client.model.Aggregates;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -40,6 +44,7 @@ import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.bson.BsonDocument;
 import org.bson.BsonDocumentReader;
 import org.bson.Document;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +69,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   private VectorContainerWriter writer;
 
   private Document filters;
+  private List<Bson> operations;
   private final Document fields;
 
   private final FragmentContext fragmentContext;
@@ -75,9 +81,8 @@ public class MongoRecordReader extends AbstractRecordReader {
   private final boolean readNumbersAsDouble;
   private boolean unionEnabled;
   private final boolean isBsonRecordReader;
-  private final int maxRecords;
 
-  public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+  public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, MongoStoragePlugin plugin) {
 
     fields = new Document();
@@ -87,15 +92,19 @@ public class MongoRecordReader extends AbstractRecordReader {
     fragmentContext = context;
     this.plugin = plugin;
     filters = new Document();
-    Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
-        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+    if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
+      operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+    } else {
+      MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
+      Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+          shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
 
-    buildFilters(subScanSpec.getFilter(), mergedFilters);
+      buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+    }
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
     isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
-    maxRecords = subScanSpec.getMaxRecords();
     logger.debug("BsonRecordReader is enabled? " + isBsonRecordReader);
     init(subScanSpec);
   }
@@ -138,7 +147,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     }
   }
 
-  private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
+  private void init(BaseMongoSubScanSpec subScanSpec) {
     List<String> hosts = subScanSpec.getHosts();
     List<ServerAddress> addresses = Lists.newArrayList();
     for (String host : hosts) {
@@ -176,13 +185,15 @@ public class MongoRecordReader extends AbstractRecordReader {
       logger.debug("Filters Applied : " + filters);
       logger.debug("Fields Selected :" + fields);
 
-      // Add limit to Mongo query
-      if (maxRecords > 0) {
-        logger.debug("Limit applied: {}", maxRecords);
-        cursor = collection.find(filters).projection(fields).limit(maxRecords).batchSize(100).iterator();
+      MongoIterable<BsonDocument> projection;
+      if (CollectionUtils.isNotEmpty(operations)) {
+        List<Bson> operations = new ArrayList<>(this.operations);
+        operations.add(Aggregates.project(fields));
+        projection = collection.aggregate(operations);
       } else {
-        cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+        projection = collection.find(filters).projection(fields);
       }
+      cursor = projection.batchSize(plugin.getConfig().getBatchSize()).iterator();
     }
 
     writer.allocate();
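
The reader now takes one of two paths: an aggregation pipeline when pushed-down operations are present (with the column projection appended as a final $project stage), or the classic find()/projection() path otherwise; either way the batch size comes from plugin config rather than the old hard-coded 100. A self-contained sketch of the same branching, with placeholder connection details:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import com.mongodb.client.MongoIterable;
    import com.mongodb.client.model.Aggregates;
    import org.bson.BsonDocument;
    import org.bson.Document;
    import org.bson.conversions.Bson;

    import java.util.ArrayList;
    import java.util.List;

    public class ReaderBranchSketch {
      public static void main(String[] args) {
        int batchSize = 100; // now supplied by MongoStoragePluginConfig#getBatchSize()
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          MongoCollection<BsonDocument> collection = client
              .getDatabase("employee")                     // placeholder names
              .getCollection("empinfo", BsonDocument.class);
          List<Bson> operations = new ArrayList<>();       // pushed-down stages, may be empty
          Document filters = new Document();
          Document fields = new Document("_id", 1);
          MongoIterable<BsonDocument> docs;
          if (!operations.isEmpty()) {
            operations.add(Aggregates.project(fields));    // aggregate path
            docs = collection.aggregate(operations);
          } else {
            docs = collection.find(filters).projection(fields); // find path
          }
          try (MongoCursor<BsonDocument> cursor = docs.batchSize(batchSize).iterator()) {
            while (cursor.hasNext()) {
              System.out.println(cursor.next().toJson());
            }
          }
        }
      }
    }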
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index f97e771..350cb28 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -42,22 +42,20 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = new LinkedList<>();
-    List<SchemaPath> columns = null;
-    for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
-        .getChunkScanSpecList()) {
+    for (BaseMongoSubScanSpec scanSpec : subScan.getChunkScanSpecList()) {
       try {
-        if ((columns = subScan.getColumns()) == null) {
+        List<SchemaPath> columns = subScan.getColumns();
+        if (columns == null) {
           columns = GroupScan.ALL_COLUMNS;
         }
         readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin()));
       } catch (Exception e) {
-        logger.error("MongoRecordReader creation failed for subScan:  "
-            + subScan + ".");
+        logger.error("MongoRecordReader creation failed for subScan: {}.", subScan);
         logger.error(e.getMessage(), e);
         throw new ExecutionSetupException(e);
       }
     }
-    logger.info("Number of record readers initialized : " + readers.size());
+    logger.info("Number of record readers initialized : {}", readers.size());
     return new ScanBatch(subScan, context, readers);
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 5c56fcc..7459bfb 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -17,47 +17,33 @@
  */
 package org.apache.drill.exec.store.mongo;
 
-import org.bson.Document;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+import org.bson.Document;
+
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.List;
 
+@AllArgsConstructor
+@Getter
+@ToString
 public class MongoScanSpec {
-  private String dbName;
-  private String collectionName;
+  private final String dbName;
+  private final String collectionName;
 
   private Document filters;
 
+  private List<Bson> operations = new ArrayList<>();
+
   @JsonCreator
   public MongoScanSpec(@JsonProperty("dbName") String dbName,
       @JsonProperty("collectionName") String collectionName) {
     this.dbName = dbName;
     this.collectionName = collectionName;
   }
-
-  public MongoScanSpec(String dbName, String collectionName,
-      Document filters) {
-    this.dbName = dbName;
-    this.collectionName = collectionName;
-    this.filters = filters;
-  }
-
-  public String getDbName() {
-    return dbName;
-  }
-
-  public String getCollectionName() {
-    return collectionName;
-  }
-
-  public Document getFilters() {
-    return filters;
-  }
-
-  @Override
-  public String toString() {
-    return "MongoScanSpec [dbName=" + dbName + ", collectionName="
-        + collectionName + ", filters=" + filters + "]";
-  }
-
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index da55907..0bf1e1f 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -25,18 +25,23 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoClients;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.mongo.plan.MongoPluginImplementor;
 import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
 import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
 import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -45,13 +50,13 @@ import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
 import org.apache.drill.shaded.guava.com.google.common.cache.RemovalNotification;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URLEncoder;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -64,11 +69,12 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   private final MongoSchemaFactory schemaFactory;
   private final Cache<MongoCnxnKey, MongoClient> addressClientMap;
   private final ConnectionString clientURI;
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
 
   public MongoStoragePlugin(
       MongoStoragePluginConfig mongoConfig,
       DrillbitContext context,
-      String name) throws ExecutionSetupException {
+      String name) {
     super(context, name);
     this.mongoConfig = mongoConfig;
     String connection = addCredentialsFromCredentialsProvider(this.mongoConfig.getConnection(), name);
@@ -78,6 +84,21 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
         .removalListener(new AddressCloser())
         .build();
     this.schemaFactory = new MongoSchemaFactory(this, name);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name, mongoConfig);
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name, MongoStoragePluginConfig mongoConfig) {
+    Convention convention = new Convention.Impl("MONGO." + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, MongoPluginImplementor::new))
+      .supportsProjectPushdown(mongoConfig.getPluginOptimizations().isSupportsProjectPushdown())
+      .supportsSortPushdown(mongoConfig.getPluginOptimizations().isSupportsSortPushdown())
+      .supportsAggregatePushdown(mongoConfig.getPluginOptimizations().isSupportsAggregatePushdown())
+      .supportsFilterPushdown(mongoConfig.getPluginOptimizations().isSupportsFilterPushdown())
+      .supportsLimitPushdown(mongoConfig.getPluginOptimizations().isSupportsLimitPushdown())
+      .supportsUnionPushdown(mongoConfig.getPluginOptimizations().isSupportsUnionPushdown())
+      .convention(convention)
+      .build();
   }
 
   private String addCredentialsFromCredentialsProvider(String connection, String name) {
@@ -120,7 +141,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
     schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
@@ -133,14 +154,27 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {
     });
-    return new MongoGroupScan(userName, this, mongoScanSpec, null, -1);
+    return new MongoGroupScan(userName, this, mongoScanSpec, null, false);
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE);
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
   }
 
+  public Convention convention() {
+    return storagePluginRulesSupplier.convention();
+  }
 
   private static class AddressCloser implements
     RemovalListener<MongoCnxnKey, MongoClient> {
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index 2ceb0d8..be410ae 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -24,11 +24,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.ConnectionString;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 
 @JsonTypeName(MongoStoragePluginConfig.NAME)
+@EqualsAndHashCode(of = "connection", callSuper = false)
 public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
 
   public static final String NAME = "mongo";
@@ -38,29 +43,24 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
   @JsonIgnore
   private final ConnectionString clientURI;
 
+  private final MongoPluginOptimizations pluginOptimizations;
+
+  private final int batchSize;
+
   @JsonCreator
   public MongoStoragePluginConfig(@JsonProperty("connection") String connection,
-      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+    @JsonProperty("pluginOptimizations") MongoPluginOptimizations pluginOptimizations,
+    @JsonProperty("batchSize") Integer batchSize,
+    @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
     super(getCredentialsProvider(credentialsProvider), credentialsProvider == null);
     this.connection = connection;
     this.clientURI = new ConnectionString(connection);
+    this.pluginOptimizations = ObjectUtils.defaultIfNull(pluginOptimizations, new MongoPluginOptimizations());
+    this.batchSize = batchSize != null ? batchSize : 100;
   }
 
-  @Override
-  public boolean equals(Object that) {
-    if (this == that) {
-      return true;
-    } else if (that == null || getClass() != that.getClass()) {
-      return false;
-    }
-    MongoStoragePluginConfig thatConfig = (MongoStoragePluginConfig) that;
-    return this.connection.equals(thatConfig.connection);
-
-  }
-
-  @Override
-  public int hashCode() {
-    return this.connection != null ? this.connection.hashCode() : 0;
+  public MongoPluginOptimizations getPluginOptimizations() {
+    return pluginOptimizations;
   }
 
   @JsonIgnore
@@ -72,7 +72,28 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
     return connection;
   }
 
+  public int getBatchSize() {
+    return batchSize;
+  }
+
   private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
     return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
+
+  @Getter
+  @Setter
+  public static class MongoPluginOptimizations {
+
+    private boolean supportsProjectPushdown = true;
+
+    private boolean supportsFilterPushdown = true;
+
+    private boolean supportsAggregatePushdown = true;
+
+    private boolean supportsSortPushdown = true;
+
+    private boolean supportsUnionPushdown = true;
+
+    private boolean supportsLimitPushdown = true;
+  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index af13eb5..975dc85 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -23,7 +23,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.PlanStringBuilder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -32,7 +35,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.bson.Document;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -40,6 +42,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.bson.Document;
+import org.bson.conversions.Bson;
 
 @JsonTypeName("mongo-shard-read")
 public class MongoSubScan extends AbstractBase implements SubScan {
@@ -52,14 +56,14 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   private final MongoStoragePlugin mongoStoragePlugin;
   private final List<SchemaPath> columns;
 
-  private final List<MongoSubScanSpec> chunkScanSpecList;
+  private final List<BaseMongoSubScanSpec> chunkScanSpecList;
 
   @JsonCreator
   public MongoSubScan(
       @JacksonInject StoragePluginRegistry registry,
       @JsonProperty("userName") String userName,
       @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
-      @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
+      @JsonProperty("chunkScanSpecList") LinkedList<BaseMongoSubScanSpec> chunkScanSpecList,
       @JsonProperty("columns") List<SchemaPath> columns)
       throws ExecutionSetupException {
     super(userName);
@@ -72,7 +76,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
 
   public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
       MongoStoragePluginConfig storagePluginConfig,
-      List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+      List<BaseMongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
     super(userName);
     this.mongoStoragePlugin = storagePlugin;
     this.mongoPluginConfig = storagePluginConfig;
@@ -100,13 +104,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
-  public List<MongoSubScanSpec> getChunkScanSpecList() {
+  public List<BaseMongoSubScanSpec> getChunkScanSpecList() {
     return chunkScanSpecList;
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
-      throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
         chunkScanSpecList, columns);
@@ -122,112 +125,33 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     return Collections.emptyIterator();
   }
 
-  public static class MongoSubScanSpec {
-
-    protected String dbName;
-    protected String collectionName;
-    protected List<String> hosts;
-    protected Map<String, Object> minFilters;
-    protected Map<String, Object> maxFilters;
-    protected int maxRecords;
-
-    protected Document filter;
-
-    @JsonCreator
-    public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
-        @JsonProperty("collectionName") String collectionName,
-        @JsonProperty("hosts") List<String> hosts,
-        @JsonProperty("minFilters") Map<String, Object> minFilters,
-        @JsonProperty("maxFilters") Map<String, Object> maxFilters,
-        @JsonProperty("filters") Document filters,
-        @JsonProperty("maxRecords") int maxRecords) {
-      this.dbName = dbName;
-      this.collectionName = collectionName;
-      this.hosts = hosts;
-      this.minFilters = minFilters;
-      this.maxFilters = maxFilters;
-      this.filter = filters;
-      this.maxRecords = maxRecords;
-    }
-
-    MongoSubScanSpec() {
-
-    }
-
-    public String getDbName() {
-      return dbName;
-    }
-
-    public MongoSubScanSpec setDbName(String dbName) {
-      this.dbName = dbName;
-      return this;
-    }
-
-    public String getCollectionName() {
-      return collectionName;
-    }
-
-    public MongoSubScanSpec setCollectionName(String collectionName) {
-      this.collectionName = collectionName;
-      return this;
-    }
-
-    public List<String> getHosts() {
-      return hosts;
-    }
-
-    public MongoSubScanSpec setHosts(List<String> hosts) {
-      this.hosts = hosts;
-      return this;
-    }
-
-    public int getMaxRecords() { return maxRecords; }
-
-    public MongoSubScanSpec setMaxRecords (int maxRecords) {
-      this.maxRecords = maxRecords;
-      return this;
-    }
-
-    public Map<String, Object> getMinFilters() {
-      return minFilters;
-    }
-
-    public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
-      this.minFilters = minFilters;
-      return this;
-    }
-
-    public Map<String, Object> getMaxFilters() {
-      return maxFilters;
-    }
-
-    public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
-      this.maxFilters = maxFilters;
-      return this;
-    }
-
-    public Document getFilter() {
-      return filter;
-    }
-
-    public MongoSubScanSpec setFilter(Document filter) {
-      this.filter = filter;
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return new PlanStringBuilder(this)
-        .field("dbName", dbName)
-        .field("collectionName", collectionName)
-        .field("hosts", hosts)
-        .field("minFilters", minFilters)
-        .field("maxFilters", maxFilters)
-        .field("filter", filter)
-        .field("maxRecords", maxRecords)
-        .toString();
-
-    }
+  @JsonTypeName("ShardedMongoSubScanSpec")
+  @Getter
+  @ToString
+  @SuperBuilder(setterPrefix = "set")
+  @JsonDeserialize(builder = ShardedMongoSubScanSpec.ShardedMongoSubScanSpecBuilder.class)
+  public static class ShardedMongoSubScanSpec extends BaseMongoSubScanSpec {
+
+    @JsonProperty
+    private final Map<String, Object> minFilters;
+
+    @JsonProperty
+    private final Map<String, Object> maxFilters;
+
+    @JsonProperty
+    private final Document filter;
+
+  }
+
+  @JsonTypeName("MongoSubScanSpec")
+  @Getter
+  @ToString
+  @SuperBuilder(setterPrefix = "set")
+  @JsonDeserialize(builder = MongoSubScanSpec.MongoSubScanSpecBuilder.class)
+  public static class MongoSubScanSpec extends BaseMongoSubScanSpec {
+
+    @JsonProperty
+    private final List<Bson> operations;
 
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
similarity index 67%
copy from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
copy to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
index ef89bfb..55f8cc5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
@@ -17,14 +17,30 @@
  */
 package org.apache.drill.exec.store.mongo.common;
 
-public enum MongoCompareOp {
-  EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
-      "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
-      "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
-      "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
-  private String compareOp;
+public enum MongoOp {
+  EQUAL("$eq"),
+  NOT_EQUAL("$ne"),
+  GREATER_OR_EQUAL("$gte"),
+  GREATER("$gt"),
+  LESS_OR_EQUAL("$lte"),
+  LESS("$lt"),
+  IN("$in"),
+  AND("$and"),
+  OR("$or"),
+  NOT("$not"),
+  REGEX("$regex"),
+  OPTIONS("$options"),
+  PROJECT("$project"),
+  COND("$cond"),
+  IFNULL("$ifNull"),
+  IFNOTNULL("$ifNotNull"),
+  SUM("$sum"),
+  GROUP_BY("$group"),
+  EXISTS("$exists");
 
-  MongoCompareOp(String compareOp) {
+  private final String compareOp;
+
+  MongoOp(String compareOp) {
     this.compareOp = compareOp;
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
new file mode 100644
index 0000000..9176fe2
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
+import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
+import org.apache.drill.exec.store.mongo.MongoGroupScan;
+import org.apache.drill.exec.store.mongo.MongoScanSpec;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.exec.util.Utilities;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link PluginImplementor} for Mongo.
+ * This class tries to convert operators to use {@link com.mongodb.client.MongoCollection#find}
+ * if only simple project and filter expressions are present,
+ * otherwise {@link com.mongodb.client.MongoCollection#aggregate} is used to obtain data from Mongo.
+ */
+public class MongoPluginImplementor extends AbstractPluginImplementor {
+
+  private MongoGroupScan groupScan;
+
+  private List<Bson> operations;
+
+  private Document filters;
+
+  private List<SchemaPath> columns;
+
+  private boolean runAggregate;
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    runAggregate = true;
+    visitChild(aggregate.getInput());
+
+    operations.addAll(
+      MongoAggregateUtils.getAggregateOperations(aggregate, aggregate.getInput().getRowType()));
+    List<String> outNames = MongoAggregateUtils.mongoFieldNames(aggregate.getRowType());
+    columns = outNames.stream()
+      .map(SchemaPath::getSimplePath)
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      filter.getCondition());
+    MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(conditionExp);
+    if (runAggregate) {
+      Bson convertedFilterExpression = Aggregates.match(mongoFilterBuilder.parseTree()).toBsonDocument();
+      operations.add(convertedFilterExpression);
+    } else {
+      filters = mongoFilterBuilder.parseTree();
+    }
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    runAggregate = true;
+    visitChild(limit.getInput());
+
+    if (limit.getOffset() != null) {
+      operations.add(
+        Aggregates.skip(rexLiteralIntValue((RexLiteral) limit.getOffset())).toBsonDocument());
+    }
+    if (limit.getFetch() != null) {
+      operations.add(
+        Aggregates.limit(rexLiteralIntValue((RexLiteral) limit.getFetch())).toBsonDocument());
+    }
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    runAggregate = runAggregate || project.getProjects().stream()
+      .anyMatch(expression -> !expression.isA(SqlKind.INPUT_REF));
+
+    visitChild(project.getInput());
+
+    if (runAggregate) {
+      RexToMongoTranslator translator =
+        new RexToMongoTranslator(
+          (JavaTypeFactory) project.getCluster().getTypeFactory(),
+          MongoAggregateUtils.mongoFieldNames(project.getInput().getRowType()));
+      List<BsonElement> items = new ArrayList<>();
+      for (Pair<RexNode, String> pair : project.getNamedProjects()) {
+        String name = pair.right;
+        BsonValue expr = pair.left.accept(translator);
+        items.add(expr.equals(new BsonString("$" + name))
+          ? new BsonElement(MongoAggregateUtils.maybeQuote(name), new BsonInt32(1))
+          : new BsonElement(MongoAggregateUtils.maybeQuote(name), expr));
+      }
+      BsonDocument projection = Aggregates.project(new BsonDocument(items)).toBsonDocument();
+
+      operations.add(projection);
+      List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+      this.columns = outNames.stream()
+        .map(SchemaPath::getSimplePath)
+        .collect(Collectors.toList());
+    } else {
+      List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+      this.columns = outNames.stream()
+        .map(SchemaPath::getSimplePath)
+        .collect(Collectors.toList());
+    }
+  }
+
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    runAggregate = true;
+    visitChild(sort.getInput());
+
+    if (!sort.collation.getFieldCollations().isEmpty()) {
+      BsonDocument sortKeys = new BsonDocument();
+      List<RelDataTypeField> fields = sort.getRowType().getFieldList();
+      for (RelFieldCollation fieldCollation : sort.collation.getFieldCollations()) {
+        String name = fields.get(fieldCollation.getFieldIndex()).getName();
+        sortKeys.put(name, new BsonInt32(direction(fieldCollation)));
+      }
+
+      operations.add(Aggregates.sort(sortKeys).toBsonDocument());
+    }
+    if (sort.offset != null) {
+      operations.add(
+        Aggregates.skip(rexLiteralIntValue((RexLiteral) sort.offset)).toBsonDocument());
+    }
+    if (sort.fetch != null) {
+      operations.add(
+        Aggregates.limit(rexLiteralIntValue((RexLiteral) sort.fetch)).toBsonDocument());
+    }
+  }
+
+  private int rexLiteralIntValue(RexLiteral offset) {
+    return ((BigDecimal) offset.getValue()).intValue();
+  }
+
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    runAggregate = true;
+
+    MongoPluginImplementor childImplementor = new MongoPluginImplementor();
+    childImplementor.runAggregate = true;
+
+    boolean firstProcessed = false;
+    for (RelNode input : union.getInputs()) {
+      if (!firstProcessed) {
+        this.visitChild(input);
+        firstProcessed = true;
+      } else {
+        childImplementor.visitChild(input);
+        operations.add(
+          Aggregates.unionWith(childImplementor.groupScan.getScanSpec().getCollectionName(), childImplementor.operations).toBsonDocument()
+        );
+      }
+    }
+  }
+
+  @Override
+  public void implement(StoragePluginTableScan scan) throws IOException {
+    groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+    operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations());
+    filters = groupScan.getScanSpec().getFilters();
+    columns = groupScan.getColumns();
+  }
+
+  @Override
+  public boolean canImplement(Aggregate aggregate) {
+    return aggregate.getGroupType() == Aggregate.Group.SIMPLE
+      && aggregate.getAggCallList().stream()
+      .noneMatch(AggregateCall::isDistinct)
+      && aggregate.getAggCallList().stream()
+      .allMatch(MongoAggregateUtils::supportsAggregation);
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      filter.getCondition());
+    MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp);
+    filterBuilder.parseTree();
+    return filterBuilder.isAllExpressionsConverted();
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return project.getProjects().stream()
+      .allMatch(RexToMongoTranslator::supportsExpression);
+  }
+
+  @Override
+  public boolean canImplement(Sort sort) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Union union) {
+    // allow converting for union all only, since Drill adds extra aggregation for union distinct,
+    // so we will convert both union all and aggregation later
+    return union.all;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    return true;
+  }
+
+  @Override
+  public GroupScan getPhysicalOperator() throws IOException {
+    MongoScanSpec scanSpec = groupScan.getScanSpec();
+    MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+    return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+      newSpec, columns, runAggregate);
+  }
+
+  public static int direction(RelFieldCollation fieldCollation) {
+    switch (fieldCollation.getDirection()) {
+      case DESCENDING:
+      case STRICTLY_DESCENDING:
+        return -1;
+      case ASCENDING:
+      case STRICTLY_ASCENDING:
+      default:
+        return 1;
+    }
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
new file mode 100644
index 0000000..cc97c82
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Translator from {@link RexNode} to strings in MongoDB's expression language.
+ */
+class RexToMongoTranslator extends RexVisitorImpl<BsonValue> {
+
+  private final JavaTypeFactory typeFactory;
+
+  private final List<String> inFields;
+
+  private static final Map<SqlOperator, String> MONGO_OPERATORS = ImmutableMap.<SqlOperator, String>builder()
+    .put(SqlStdOperatorTable.DIVIDE, "$divide")
+    .put(SqlStdOperatorTable.MULTIPLY, "$multiply")
+    .put(SqlStdOperatorTable.ABS, "$abs")
+    .put(SqlStdOperatorTable.ACOS, "$acos")
+    .put(SqlStdOperatorTable.ASIN, "$asin")
+    .put(SqlStdOperatorTable.ATAN, "$atan")
+    .put(SqlStdOperatorTable.ATAN2, "$atan2")
+    .put(SqlStdOperatorTable.CEIL, "$ceil")
+    .put(SqlStdOperatorTable.CONCAT, "$concat")
+    .put(SqlStdOperatorTable.COS, "$cos")
+    .put(SqlStdOperatorTable.DAYOFMONTH, "$dayOfMonth")
+    .put(SqlStdOperatorTable.WEEK, "$isoWeek")
+    .put(SqlStdOperatorTable.YEAR, "$isoWeekYear")
+    .put(SqlStdOperatorTable.DAYOFWEEK, "$isoDayOfWeek")
+    .put(SqlStdOperatorTable.DAYOFYEAR, "$dayOfYear")
+    .put(SqlStdOperatorTable.RADIANS, "$degreesToRadians")
+    .put(SqlStdOperatorTable.DENSE_RANK, "$denseRank")
+    .put(SqlStdOperatorTable.EXP, "$exp")
+    .put(SqlStdOperatorTable.FLOOR, "$floor")
+    .put(SqlStdOperatorTable.HOUR, "$hour")
+    .put(SqlStdOperatorTable.LN, "$ln")
+    .put(SqlStdOperatorTable.LOG10, "$log10")
+    .put(SqlStdOperatorTable.MINUTE, "$minute")
+    .put(SqlStdOperatorTable.MOD, "$mod")
+    .put(SqlStdOperatorTable.MONTH, "$month")
+    .put(SqlStdOperatorTable.POWER, "$pow")
+    .put(SqlStdOperatorTable.DEGREES, "$radiansToDegrees")
+    .put(SqlStdOperatorTable.RAND, "$rand")
+    .put(SqlStdOperatorTable.REPLACE, "$replaceAll")
+    .put(SqlStdOperatorTable.ROUND, "$round")
+    .put(SqlStdOperatorTable.SECOND, "$second")
+    .put(SqlStdOperatorTable.SIN, "$sin")
+    .put(SqlStdOperatorTable.SQRT, "$sqrt")
+    .put(SqlStdOperatorTable.SUBSTRING, "$substr")
+    .put(SqlStdOperatorTable.PLUS, "$add")
+    .put(SqlStdOperatorTable.MINUS, "$subtract")
+    .put(SqlStdOperatorTable.TAN, "$tan")
+    .put(SqlStdOperatorTable.TRIM, "trim")
+    .put(SqlStdOperatorTable.TRUNCATE, "$trunc")
+    .put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp())
+    .put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp())
+    .put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp())
+    .put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp())
+    .put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp())
+    .put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp())
+    .put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp())
+    .put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp())
+    .put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp())
+    .build();
+
+
+  protected RexToMongoTranslator(JavaTypeFactory typeFactory,
+      List<String> inFields) {
+    super(true);
+    this.typeFactory = typeFactory;
+    this.inFields = inFields;
+  }
+
+  @Override
+  public BsonValue visitLiteral(RexLiteral literal) {
+    if (literal.getValue() == null) {
+      return BsonNull.VALUE;
+    }
+    return BsonDocument.parse(String.format("{$literal: %s}",
+        RexToLixTranslator.translateLiteral(literal, literal.getType(),
+            typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)));
+  }
+
+  @Override
+  public BsonValue visitInputRef(RexInputRef inputRef) {
+    return new BsonString("$" + inFields.get(inputRef.getIndex()));
+  }
+
+  @Override
+  public BsonValue visitCall(RexCall call) {
+    String name = isItem(call);
+    if (name != null) {
+      return new BsonString("'$" + name + "'");
+    }
+    List<BsonValue> strings = call.operands.stream()
+        .map(operand -> operand.accept(this))
+        .collect(Collectors.toList());
+
+    if (call.getKind() == SqlKind.CAST) {
+      return strings.get(0);
+    }
+    String stdOperator = MONGO_OPERATORS.get(call.getOperator());
+    if (stdOperator != null) {
+      return new BsonDocument(stdOperator, new BsonArray(strings));
+    }
+    if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+      RexNode op1 = call.operands.get(1);
+      if (op1 instanceof RexLiteral) {
+        if (op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+          return new BsonDocument("$arrayElemAt", new BsonArray(
+              Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) op1).getValueAs(Integer.class)))));
+        } else if (op1.getType().getSqlTypeName() == SqlTypeName.CHAR) {
+          return new BsonString(strings.get(0).asString().getValue() + "." + ((RexLiteral) op1).getValueAs(String.class));
+        }
+      }
+    }
+    if (call.getOperator() == SqlStdOperatorTable.CASE) {
+      // case(a, b, c)  -> $cond:[a, b, c]
+      // case(a, b, c, d) -> $cond:[a, b, $cond:[c, d, null]]
+      // case(a, b, c, d, e) -> $cond:[a, b, $cond:[c, d, e]]
+      BsonDocument result = new BsonDocument();
+      BsonArray args = new BsonArray();
+      result.put("$cond", args);
+      for (int i = 0; i < strings.size(); i += 2) {
+        args.add(strings.get(i));
+        args.add(strings.get(i + 1));
+        if (i == strings.size() - 3) {
+          args.add(strings.get(i + 2));
+          break;
+        }
+        if (i == strings.size() - 2) {
+          args.add(BsonNull.VALUE);
+          break;
+        }
+        BsonArray innerArgs = new BsonArray();
+        BsonDocument innerDocument = new BsonDocument();
+        innerDocument.put("$cond", innerArgs);
+        args.add(innerDocument);
+        args = innerArgs;
+      }
+      return result;
+    }
+    throw new IllegalArgumentException("Translation of " + call + " is not supported by MongoProject");
+  }
+
+
+  /**
+   * Returns 'string' if it is a call to item['string'], null otherwise.
+   */
+  public static String isItem(RexCall call) {
+    if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+      return null;
+    }
+    RexNode op0 = call.operands.get(0);
+    RexNode op1 = call.operands.get(1);
+    if (op0 instanceof RexInputRef
+        && ((RexInputRef) op0).getIndex() == 0
+        && op1 instanceof RexLiteral
+        && ((RexLiteral) op1).getValue2() instanceof String) {
+      return (String) ((RexLiteral) op1).getValue2();
+    }
+    return null;
+  }
+
+  public static boolean supportsExpression(RexNode expr) {
+    return expr.accept(new RexMongoChecker());
+  }
+
+  private static class RexMongoChecker extends RexVisitorImpl<Boolean> {
+
+    protected RexMongoChecker() {
+      super(true);
+    }
+
+    @Override
+    public Boolean visitLiteral(RexLiteral literal) {
+      return true;
+    }
+
+    @Override
+    public Boolean visitInputRef(RexInputRef inputRef) {
+      return true;
+    }
+
+    @Override
+    public Boolean visitCall(RexCall call) {
+      if (isItem(call) != null
+        || call.getKind() == SqlKind.CAST
+        || call.getOperator() == SqlStdOperatorTable.CASE
+        || MONGO_OPERATORS.get(call.getOperator()) != null) {
+        return true;
+      }
+
+      if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+        RexNode op = call.operands.get(1);
+        return op instanceof RexLiteral
+          && (op.getType().getSqlTypeName() == SqlTypeName.INTEGER
+          || op.getType().getSqlTypeName() == SqlTypeName.CHAR);
+      }
+
+      return false;
+    }
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
index da52984..4249725 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
@@ -32,8 +32,7 @@ import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory.MongoSchema;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class MongoDatabaseSchema extends AbstractSchema {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(MongoDatabaseSchema.class);
+
   private final MongoSchema mongoSchema;
   private final Set<String> tableNames;
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index cc071a4..29f14ce 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -29,13 +29,13 @@ import java.util.concurrent.TimeUnit;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
 import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -195,7 +195,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
 
     DrillTable getDrillTable(String dbName, String collectionName) {
       MongoScanSpec mongoScanSpec = new MongoScanSpec(schemaNameMap.get(dbName), collectionName);
-      return new DynamicDrillTable(plugin, getName(), null, mongoScanSpec);
+      return new PluginDrillTable(plugin, getName(), null, mongoScanSpec, plugin.convention());
     }
 
     @Override
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
index d9cff73..8224470 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
@@ -39,7 +39,7 @@ public class MongoTestBase extends ClusterTest implements MongoTestConstants {
 
   private static void initMongoStoragePlugin(String connectionURI) throws Exception {
     MongoStoragePluginConfig storagePluginConfig = new MongoStoragePluginConfig(connectionURI,
-        PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+        null, 100, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
     storagePluginConfig.setEnabled(true);
     pluginRegistry.put(MongoStoragePluginConfig.NAME, storagePluginConfig);
 
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
index e4c71b1..eb706d6 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
@@ -155,7 +155,7 @@ public class TestMongoChunkAssignment extends BaseTest {
 
     mongoGroupScan = new MongoGroupScan();
     mongoGroupScan.setChunksMapping(chunksMapping);
-    mongoGroupScan.setInverseChunsMapping(chunksInverseMapping);
+    mongoGroupScan.setInverseChunksMapping(chunksInverseMapping);
     MongoScanSpec scanSpec = new MongoScanSpec(dbName, collectionName);
     mongoGroupScan.setScanSpec(scanSpec);
   }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
index 569cf5c..2344df7 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
@@ -30,31 +30,32 @@ public class TestMongoLimitPushDown extends MongoTestBase {
   public void testLimit() throws Exception {
     String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4";
     queryBuilder()
-      .sql(sql)
-      .planMatcher()
-      .include("Limit", "maxRecords=4")
-      .match();
+        .sql(sql)
+        .planMatcher()
+        .exclude("Limit\\(")
+        .include("MongoGroupScan.*\"\\$limit\": 4")
+        .match();
   }
 
   @Test
   public void testLimitWithOrderBy() throws Exception {
-    // Limit should not be pushed down for this example due to the sort
     String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` ORDER BY employee_id LIMIT 4";
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=-1")
+      .exclude("Limit")
+      .include("MongoGroupScan.*\"\\$sort\": \\{\"employee_id\": 1}", "\"\\$limit\": 4")
       .match();
   }
 
   @Test
   public void testLimitWithOffset() throws Exception {
-    // Limit should be pushed down and include the offset
     String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4 OFFSET 5";
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=9")
+      .exclude("Limit")
+      .include("\"\\$skip\": 5", "\"\\$limit\": 4")
       .match();
   }
 
@@ -64,7 +65,8 @@ public class TestMongoLimitPushDown extends MongoTestBase {
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=4")
+      .exclude("Limit")
+      .include("\"\\$limit\": 4", "\"\\$eq\": 52\\.17")
       .match();
   }
 }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
index 48d13ce..a691443 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
@@ -23,21 +23,21 @@ import static org.apache.drill.test.TestBuilder.mapOf;
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ExecConstants;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Ignore("DRILL-3775")
 @Category({SlowTest.class, MongoStorageTest.class})
 public class TestMongoProjectPushDown extends MongoTestBase {
 
-  /**
-   *
-   * @throws Exception
-   */
   @Test
   public void testComplexProjectPushdown() throws Exception {
 
+    queryBuilder()
+      .sql("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from mongo.employee.schema_change t")
+      .planMatcher()
+      .include("MongoGroupScan.*\"\\$project\": \\{\"col_1\": \"\\$field_4.inner_3\", \"col_2\": \"\\$field_4\"\\}")
+      .match();
+
     try {
       testBuilder()
           .sqlQuery("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from mongo.employee.schema_change t")
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index 4b20ebc..2c79dd8 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -23,6 +23,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertEquals;
+
 @Category({SlowTest.class, MongoStorageTest.class})
 public class TestMongoQueries extends MongoTestBase {
 
@@ -42,6 +44,15 @@ public class TestMongoQueries extends MongoTestBase {
   }
 
   @Test
+  public void testSerDe() throws Exception {
+    String plan = queryBuilder()
+      .sql(String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE1, EMPLOYEE_DB, EMPINFO_COLLECTION))
+      .explainJson();
+
+    assertEquals(queryBuilder().physical(plan).run().recordCount(), 11);
+  }
+
+  @Test
   public void testWithANDOperator() throws Exception {
     testBuilder()
         .sqlQuery(String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE3, EMPLOYEE_DB, EMPINFO_COLLECTION))
@@ -105,4 +116,192 @@ public class TestMongoQueries extends MongoTestBase {
         .expectsNumRecords(5)
         .go();
   }
+
+  @Test
+  public void testCountColumnPushDown() throws Exception {
+    String query = "select count(t.name) as c from mongo.%s.`%s` t";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("Agg\\(")
+        .include("MongoGroupScan.*group")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(5)
+        .go();
+  }
+
+  @Test
+  public void testSumColumnPushDown() throws Exception {
+    String query = "select sum(t.sales) as s from mongo.%s.`%s` t";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("Agg\\(")
+        .include("MongoGroupScan.*group")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("s")
+        .baselineValues(1194)
+        .go();
+  }
+
+  @Test
+  public void testCountGroupByPushDown() throws Exception {
+    String query = "select count(t.id) as c, t.type from mongo.%s.`%s` t group by t.type";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("Agg\\(")
+        .include("MongoGroupScan.*group")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("c", "type")
+        .baselineValues(5, "donut")
+        .go();
+  }
+
+  @Test
+  public void testSumGroupByPushDown() throws Exception {
+    String query = "select sum(t.sales) s, t.type from mongo.%s.`%s` t group by t.type";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("Agg\\(")
+        .include("MongoGroupScan.*group")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("s", "type")
+        .baselineValues(1194, "donut")
+        .go();
+  }
+
+  @Test
+  public void testCountColumnPushDownWithFilter() throws Exception {
+    String query = "select count(t.id) as c from mongo.%s.`%s` t where t.name = 'Cake'";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("Agg\\(", "Filter")
+        .include("MongoGroupScan.*group")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(1)
+        .go();
+  }
+
+  @Test
+  public void testUnionAll() throws Exception {
+    String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union all " +
+        "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("UnionAll\\(")
+        .include("MongoGroupScan.*\\$unionWith")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("id", "name")
+        .baselineValues("0001", "Cake")
+        .baselineValues("0001", "Cake")
+        .baselineValues("0002", "Raised")
+        .baselineValues("0003", "Old Fashioned")
+        .baselineValues("0004", "Filled")
+        .baselineValues("0005", "Apple Fritter")
+        .go();
+  }
+
+  @Test
+  public void testUnionDistinct() throws Exception {
+    String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union " +
+        "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2 ";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("UnionAll\\(", "Agg\\(")
+        .include("MongoGroupScan.*\\$unionWith")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("id", "name")
+        .baselineValues("0001", "Cake")
+        .baselineValues("0002", "Raised")
+        .baselineValues("0003", "Old Fashioned")
+        .baselineValues("0004", "Filled")
+        .baselineValues("0005", "Apple Fritter")
+        .go();
+  }
+
+  @Test
+  public void testProjectPushDown() throws Exception {
+    String query = "select t.sales * t.sales as c, t.name from mongo.%s.`%s` t";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .include("MongoGroupScan.*project.*multiply")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("c", "name")
+        .baselineValues(196, "Filled")
+        .baselineValues(1225, "Cake")
+        .baselineValues(21025, "Raised")
+        .baselineValues(90000, "Old Fashioned")
+        .baselineValues(490000, "Apple Fritter")
+        .go();
+  }
+
+  @Test
+  public void testProjectPushDownWithCase() throws Exception {
+    String query = "select case when t.sales >= 700 then 2 when t.sales > 145 then 1 else 0 end as c, t.name from mongo.%s.`%s` t";
+
+    queryBuilder()
+      .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+      .planMatcher()
+      .include("MongoGroupScan.*project.*cond.*\\$gt")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+      .unOrdered()
+      .baselineColumns("c", "name")
+      .baselineValues(0, "Filled")
+      .baselineValues(0, "Cake")
+      .baselineValues(0, "Raised")
+      .baselineValues(1, "Old Fashioned")
+      .baselineValues(2, "Apple Fritter")
+      .go();
+  }
 }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
index d1fd63f..298744b 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
@@ -35,7 +35,7 @@ public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
 
   private void test(String expectedUserName, String expectedPassword, String connection, String name) throws ExecutionSetupException {
     MongoStoragePlugin plugin = new MongoStoragePlugin(
-        new MongoStoragePluginConfig(connection, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
+        new MongoStoragePluginConfig(connection, null, 100, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
         null, name);
     MongoClientImpl client = (MongoClientImpl) plugin.getClient();
     MongoCredential cred = client.getSettings().getCredential();
diff --git a/docs/dev/MongoStoragePluginCredentials.md b/docs/dev/MongoStoragePluginCredentials.md
new file mode 100644
index 0000000..9819a52
--- /dev/null
+++ b/docs/dev/MongoStoragePluginCredentials.md
@@ -0,0 +1,58 @@
+# Mongo storage plugin credentials
+
+Apache Drill can connect and submit queries to a MongoDB cluster
+that has authentication enabled.
+
+## Configuring storage plugin
+
+Drill provides several ways to supply the credentials
+used for connecting to MongoDB.
+
+### Providing username and password with the connection string
+
+The simplest way to provide credentials is to specify them in the Mongo connection string:
+
+```json
+{
+  "type": "mongo",
+  "connection": "mongodb://user1:user1Pass@mongoHost:27017/",
+  "batchSize": 100,
+  "enabled": true
+}
+```
+
+where
+ - `user1` - the username
+ - `user1Pass` - the user's password
+ - `mongoHost` - the Mongo host
+
+This way of providing username and password takes precedence over all other methods.
+
+### Providing username and password with Credentials Provider
+
+The Mongo storage plugin is integrated with the credentials provider framework, so credentials can be supplied through it.
+
+The credentials provider builds a connection string containing the supplied username and password,
+just as in the previous section, and Drill uses that connection string when connecting to MongoDB.
+
+Here is an example of using the Plain Credentials Provider with the Mongo storage plugin; any
+other credentials provider implementation (including custom ones) can be used in a similar manner:
+
+```json
+{
+  "type": "mongo",
+  "connection": "mongodb://mongoHost:27017/",
+  "batchSize": 100,
+  "credentialsProvider": {
+    "credentialsProviderType": "PlainCredentialsProvider",
+    "credentials": {
+      "username": "user1",
+      "password": "user1Pass"
+    }
+  },
+  "enabled": true
+}
+```
+
+Please refer to [Plugin credentials provider](https://github.com/apache/drill/blob/master/docs/dev/PluginCredentialsProvider.md#developer-notes)
+for more details on credentials providers.
diff --git a/docs/dev/StoragePluginPushdowns.md b/docs/dev/StoragePluginPushdowns.md
new file mode 100644
index 0000000..0b97317
--- /dev/null
+++ b/docs/dev/StoragePluginPushdowns.md
@@ -0,0 +1,31 @@
+# Generic storage plugin pushdown framework
+
+Storage plugins may support a specific set of operations that have corresponding Drill operators.
+The goal of this framework is to simplify the process of converting Drill operators to
+plugin-specific operations (pushdowns).
+
+## How it works
+
+A plugin that uses this framework defines the list of pushdowns it supports.
+Based on this list, the framework registers all required rules with the planner,
+so whenever the planner finds that a specific optimization is possible,
+the corresponding part of the plan is converted to a plugin operation.
+For example, with the Mongo plugin a `LIMIT 4` clause is planned into the scan itself: the resulting
+plan contains no separate `Limit` operator, and the `MongoGroupScan` carries a `"$limit": 4` pipeline stage instead.
+
+## How to use it
+
+A storage plugin should define the logic for converting specific operators to plugin expressions by implementing
+the `org.apache.drill.exec.store.plan.PluginImplementor` interface
+(or extending the `org.apache.drill.exec.store.plan.AbstractPluginImplementor` class).
+To support a specific pushdown, this implementation should provide both the `canImplement(RelNode)`
+and `implement(PluginRel)` methods. The first checks whether the operation can be converted to
+a plugin-specific expression, and the second is responsible for the conversion itself.
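+
+As a minimal sketch (the `DummyPluginImplementor` class is hypothetical, its method bodies are
+placeholders, and it assumes `StoragePluginTableScan` exposes its group scan via `getGroupScan()`
+inherited from `DrillScanRelBase`), an implementor that supports only filter pushdown could look like this:
+
+```java
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+
+public class DummyPluginImplementor extends AbstractPluginImplementor {
+  // Group scan assembled while the plugin nodes are implemented.
+  private GroupScan groupScan;
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    // Check whether the filter condition can be expressed
+    // in the plugin's native query language.
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    // A real implementor would check that the scan belongs to this plugin.
+    return true;
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    // Implement the child nodes first, then translate
+    // filter.getCondition() into a plugin-specific expression.
+    visitChild(filter.getInput());
+  }
+
+  @Override
+  public void implement(StoragePluginTableScan scan) throws IOException {
+    // The leaf of the plugin subtree: remember its group scan.
+    groupScan = scan.getGroupScan();
+  }
+
+  @Override
+  public GroupScan getPhysicalOperator() {
+    return groupScan;
+  }
+}
+```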
+
+The storage plugin should instantiate `org.apache.drill.exec.store.StoragePluginRulesSupplier` and specify
+the list of optimizations it wants to enable (this may be made configurable), the previously defined 
+implementation of `org.apache.drill.exec.store.plan.PluginImplementor`, and an `org.apache.calcite.plan.Convention`
+that includes the plugin name.
+This `org.apache.drill.exec.store.StoragePluginRulesSupplier` should then be used in the overridden
+`AbstractStoragePlugin.getOptimizerRules()` method by calling
+`org.apache.drill.exec.store.StoragePluginRulesSupplier.getOptimizerRules()`, as sketched below.
+
+Please use `MongoStoragePlugin` as an example; it supports almost all of the available optimizations.
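+
+Inside the storage plugin class, the wiring may look like the following sketch (a class fragment,
+not a complete plugin: the `DummyPluginImplementor` class and the `DUMMY` convention name are
+hypothetical, and the set of enabled pushdowns is arbitrary):
+
+```java
+private static final Convention CONVENTION = new Convention.Impl("DUMMY", PluginRel.class);
+
+// Enable only the pushdowns this plugin can actually translate.
+private final StoragePluginRulesSupplier rulesSupplier = StoragePluginRulesSupplier.builder()
+    .supportsProjectPushdown(true)
+    .supportsFilterPushdown(true)
+    .supportsLimitPushdown(true)
+    .rulesProvider(new PluginRulesProviderImpl(CONVENTION, DummyPluginImplementor::new))
+    .convention(CONVENTION)
+    .build();
+
+@Override
+public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+  switch (phase) {
+    case LOGICAL:
+    case PHYSICAL:
+      // Register the pushdown rules during logical and physical planning.
+      return rulesSupplier.getOptimizerRules();
+    default:
+      return Collections.emptySet();
+  }
+}
+```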
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index cc549b1..6779d0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.exec.metastore.analyze.AnalyzeInfoProvider;
 import org.apache.drill.metastore.metadata.TableMetadata;
 import org.apache.drill.metastore.metadata.TableMetadataProvider;
@@ -92,6 +93,11 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
     return getScanStats();
   }
 
+  @Override
+  public ScanStats getScanStats(RelMetadataQuery mq) {
+    return getScanStats();
+  }
+
   @JsonIgnore
   public ScanStats getScanStats() {
     throw new UnsupportedOperationException("This should be implemented.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 9eaa043..2c6ff4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.base;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.Collection;
 import java.util.List;
+
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -88,6 +90,9 @@ public interface GroupScan extends Scan, HasAffinity {
   @JsonIgnore
   ScanStats getScanStats(PlannerSettings settings);
 
+  @JsonIgnore
+  ScanStats getScanStats(RelMetadataQuery mq);
+
   /**
    * Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index fe67709..a307f93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.planner.common;
 
 import java.io.IOException;
 import java.util.List;
+
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -87,5 +89,5 @@ public abstract class DrillScanRelBase extends TableScan implements DrillRelNode
     return planner.getCostFactory().makeCost(dRows, dCpu, dIo);
   }
 
-  public abstract DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan);
+  public abstract DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
new file mode 100644
index 0000000..75b0c1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Base class for logical and physical Sort implemented in Drill.
+ */
+public abstract class DrillSortRelBase extends Sort implements DrillRelNode, OrderedRel {
+
+  protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
+    super(cluster, traits, input, collation);
+  }
+
+  protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traits, input, collation, offset, fetch);
+  }
+
+  @Override
+  public RexNode getOffset() {
+    return offset;
+  }
+
+  @Override
+  public RexNode getFetch() {
+    return fetch;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 26ef4ea..bcd9792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -193,7 +193,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
   }
 
   @Override
-  public DrillScanRel copy(RelTraitSet traitSet, GroupScan scan) {
-    return new DrillScanRel(getCluster(), getTraitSet(), getTable(), scan, getRowType(), getColumns(), partitionFilterPushdown());
+  public DrillScanRel copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+    return new DrillScanRel(getCluster(), getTraitSet(), getTable(), scan, rowType, getColumns(), partitionFilterPushdown());
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
index 1e380cf..0fb7b8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -25,13 +25,12 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
 import org.apache.drill.exec.planner.torel.ConversionContext;
-import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
@@ -42,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 /**
  * Sort implemented in Drill.
  */
-public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
+public class DrillSortRel extends DrillSortRelBase implements DrillRel {
 
   /** Creates a DrillSortRel. */
   public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
@@ -100,16 +99,6 @@ public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
   }
 
   @Override
-  public RexNode getOffset() {
-    return offset;
-  }
-
-  @Override
-  public RexNode getFetch() {
-    return fetch;
-  }
-
-  @Override
   public boolean canBeDropped() {
     return true;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 50996b9..1e0bdf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -60,8 +60,8 @@ public class ScanPrel extends DrillScanRelBase implements LeafPrel, HasDistribut
   }
 
   @Override
-  public ScanPrel copy(RelTraitSet traitSet, GroupScan scan) {
-    return new ScanPrel(getCluster(), traitSet, scan, getRowType(), getTable());
+  public ScanPrel copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+    return new ScanPrel(getCluster(), traitSet, scan, rowType, getTable());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 7e415b2..3e68d7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -21,13 +21,13 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -41,7 +41,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
 
-public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel,Prel {
+public class SortPrel extends DrillSortRelBase implements Prel {
   private final boolean isRemovable;
 
   /** Creates a DrillSortRel. */
@@ -154,16 +154,6 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Ordere
   }
 
   @Override
-  public RexNode getOffset() {
-    return offset;
-  }
-
-  @Override
-  public RexNode getFetch() {
-    return fetch;
-  }
-
-  @Override
   public boolean canBeDropped() {
     return isRemovable;
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
similarity index 58%
copy from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
index ef89bfb..635b7c6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
@@ -15,20 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.mongo.common;
-
-public enum MongoCompareOp {
-  EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
-      "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
-      "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
-      "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
-  private String compareOp;
-
-  MongoCompareOp(String compareOp) {
-    this.compareOp = compareOp;
-  }
-
-  public String getCompareOp() {
-    return compareOp;
-  }
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.RelOptRule;
+
+import java.util.List;
+
+/**
+ * Provides the rules required to add support for pushing down specific operators to a storage plugin.
+ */
+public interface PluginRulesProvider {
+
+  List<RelOptRule> sortRules();
+
+  List<RelOptRule> limitRules();
+
+  List<RelOptRule> filterRules();
+
+  List<RelOptRule> projectRules();
+
+  List<RelOptRule> aggregateRules();
+
+  List<RelOptRule> unionRules();
+
+  List<RelOptRule> joinRules();
+
+  RelOptRule vertexRule();
+
+  RelOptRule prelConverterRule();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
new file mode 100644
index 0000000..b91b47e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginJoinRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class PluginRulesProviderImpl implements PluginRulesProvider {
+
+  private final Supplier<PluginImplementor> implementorSupplier;
+  private final PluginImplementor pluginImplementor;
+  private final Convention convention;
+
+  public PluginRulesProviderImpl(Convention convention,
+      Supplier<PluginImplementor> implementorSupplier) {
+    this.convention = convention;
+    this.implementorSupplier = implementorSupplier;
+    this.pluginImplementor = implementorSupplier.get();
+  }
+
+  @Override
+  public List<RelOptRule> sortRules() {
+    return Arrays.asList(
+        new PluginSortRule(Convention.NONE, convention, pluginImplementor),
+        new PluginSortRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> limitRules() {
+    return Arrays.asList(
+        new PluginLimitRule(Convention.NONE, convention, pluginImplementor),
+        new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> filterRules() {
+    return Arrays.asList(
+        new PluginFilterRule(Convention.NONE, convention, pluginImplementor),
+        new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> projectRules() {
+    return Arrays.asList(
+        new PluginProjectRule(Convention.NONE, convention, pluginImplementor),
+        new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> aggregateRules() {
+    return Arrays.asList(
+        new PluginAggregateRule(Convention.NONE, convention, pluginImplementor),
+        new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> unionRules() {
+    return Arrays.asList(
+        new PluginUnionRule(Convention.NONE, convention, pluginImplementor),
+        new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public List<RelOptRule> joinRules() {
+    return Arrays.asList(
+      new PluginJoinRule(Convention.NONE, convention, pluginImplementor),
+      new PluginJoinRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+    );
+  }
+
+  @Override
+  public RelOptRule vertexRule() {
+    return new VertexDrelConverterRule(convention);
+  }
+
+  @Override
+  public RelOptRule prelConverterRule() {
+    return new PluginIntermediatePrelConverterRule(convention, implementorSupplier);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRulesSupplier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRulesSupplier.java
new file mode 100644
index 0000000..2dc14ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRulesSupplier.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Helper class that can be used to obtain the rules required for pushing down operators
+ * that a specific plugin supports, configured using {@link StoragePluginRulesSupplierBuilder}.
+ */
+public class StoragePluginRulesSupplier {
+
+  private final StoragePluginRulesSupplierBuilder storagePluginRulesSupplierBuilder;
+
+  private StoragePluginRulesSupplier(StoragePluginRulesSupplierBuilder storagePluginRulesSupplierBuilder) {
+    this.storagePluginRulesSupplierBuilder = storagePluginRulesSupplierBuilder;
+  }
+
+  public Set<? extends RelOptRule> getOptimizerRules() {
+    ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
+    PluginRulesProvider rulesProvider = storagePluginRulesSupplierBuilder.rulesProvider();
+    if (storagePluginRulesSupplierBuilder.supportsProjectPushdown()) {
+      builder.addAll(rulesProvider.projectRules());
+    }
+    if (storagePluginRulesSupplierBuilder.supportsFilterPushdown()) {
+      builder.addAll(rulesProvider.filterRules());
+    }
+    if (storagePluginRulesSupplierBuilder.supportsSortPushdown()) {
+      builder.addAll(rulesProvider.sortRules());
+    }
+    if (storagePluginRulesSupplierBuilder.supportsUnionPushdown()) {
+      builder.addAll(rulesProvider.unionRules());
+    }
+    if (storagePluginRulesSupplierBuilder.supportsJoinPushdown()) {
+      builder.addAll(rulesProvider.joinRules());
+    }
+    if (storagePluginRulesSupplierBuilder.supportsAggregatePushdown()) {
+      builder.addAll(rulesProvider.aggregateRules());
+    }
+    if (storagePluginRulesSupplierBuilder.supportsLimitPushdown()) {
+      builder.addAll(rulesProvider.limitRules());
+    }
+    builder.add(rulesProvider.vertexRule());
+    builder.add(rulesProvider.prelConverterRule());
+    return builder.build();
+  }
+
+  public Convention convention() {
+    return storagePluginRulesSupplierBuilder.convention();
+  }
+
+  public static StoragePluginRulesSupplierBuilder builder() {
+    return new StoragePluginRulesSupplierBuilder();
+  }
+
+  public static class StoragePluginRulesSupplierBuilder {
+
+    private boolean supportsProjectPushdown;
+
+    private boolean supportsFilterPushdown;
+
+    private boolean supportsAggregatePushdown;
+
+    private boolean supportsSortPushdown;
+
+    private boolean supportsUnionPushdown;
+
+    private boolean supportsJoinPushdown;
+
+    private boolean supportsLimitPushdown;
+
+    private PluginRulesProvider rulesProvider;
+
+    private Convention convention;
+
+    public boolean supportsProjectPushdown() {
+      return supportsProjectPushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsProjectPushdown(boolean supportsProjectPushdown) {
+      this.supportsProjectPushdown = supportsProjectPushdown;
+      return this;
+    }
+
+    public boolean supportsFilterPushdown() {
+      return supportsFilterPushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsFilterPushdown(boolean supportsFilterPushdown) {
+      this.supportsFilterPushdown = supportsFilterPushdown;
+      return this;
+    }
+
+    public boolean supportsAggregatePushdown() {
+      return supportsAggregatePushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsAggregatePushdown(boolean supportsAggregatePushdown) {
+      this.supportsAggregatePushdown = supportsAggregatePushdown;
+      return this;
+    }
+
+    public boolean supportsSortPushdown() {
+      return supportsSortPushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsSortPushdown(boolean supportsSortPushdown) {
+      this.supportsSortPushdown = supportsSortPushdown;
+      return this;
+    }
+
+    public boolean supportsUnionPushdown() {
+      return supportsUnionPushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsUnionPushdown(boolean supportsUnionPushdown) {
+      this.supportsUnionPushdown = supportsUnionPushdown;
+      return this;
+    }
+
+    public boolean supportsJoinPushdown() {
+      return supportsJoinPushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsJoinPushdown(boolean supportsJoinPushdown) {
+      this.supportsJoinPushdown = supportsJoinPushdown;
+      return this;
+    }
+
+    public boolean supportsLimitPushdown() {
+      return supportsLimitPushdown;
+    }
+
+    public StoragePluginRulesSupplierBuilder supportsLimitPushdown(boolean supportsLimitPushdown) {
+      this.supportsLimitPushdown = supportsLimitPushdown;
+      return this;
+    }
+
+    public PluginRulesProvider rulesProvider() {
+      return rulesProvider;
+    }
+
+    public StoragePluginRulesSupplierBuilder rulesProvider(PluginRulesProvider rulesProvider) {
+      this.rulesProvider = rulesProvider;
+      return this;
+    }
+
+    public Convention convention() {
+      return convention;
+    }
+
+    public StoragePluginRulesSupplierBuilder convention(Convention convention) {
+      this.convention = convention;
+      return this;
+    }
+
+    public StoragePluginRulesSupplier build() {
+      return new StoragePluginRulesSupplier(this);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
new file mode 100644
index 0000000..7c3639b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Abstract base implementation of {@link PluginImplementor} that can be used by
+ * plugin implementors that support only a subset of all provided operations.
+ */
+public abstract class AbstractPluginImplementor implements PluginImplementor {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractPluginImplementor.class);
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    throw getUnsupported("aggregate");
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    throw getUnsupported("filter");
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    throw getUnsupported("limit");
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    throw getUnsupported("project");
+  }
+
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    throw getUnsupported("sort");
+  }
+
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    throw getUnsupported("union");
+  }
+
+  @Override
+  public void implement(PluginJoinRel join) throws IOException {
+    throw getUnsupported("join");
+  }
+
+  @Override
+  public void implement(StoragePluginTableScan scan) throws IOException {
+    throw getUnsupported("scan");
+  }
+
+  @Override
+  public boolean canImplement(Aggregate aggregate) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Sort sort) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Union union) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    return false;
+  }
+
+  @Override
+  public boolean canImplement(Join join) {
+    return false;
+  }
+
+  private UserException getUnsupported(String rel) {
+    return UserException.unsupportedError()
+        .message("Plugin implementor doesn't support push down for %s", rel)
+        .build(logger);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
new file mode 100644
index 0000000..db46467
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+
+/**
+ * Callback for the implementation process that checks whether a specific operator
+ * can be converted and converts a tree of {@link PluginRel} nodes into expressions
+ * that can be consumed by the storage plugin.
+ */
+public interface PluginImplementor {
+
+  void implement(PluginAggregateRel aggregate) throws IOException;
+
+  void implement(PluginFilterRel filter) throws IOException;
+
+  void implement(PluginLimitRel limit) throws IOException;
+
+  void implement(PluginProjectRel project) throws IOException;
+
+  void implement(PluginSortRel sort) throws IOException;
+
+  void implement(PluginUnionRel union) throws IOException;
+
+  void implement(PluginJoinRel join) throws IOException;
+
+  void implement(StoragePluginTableScan scan) throws IOException;
+
+  boolean canImplement(Aggregate aggregate);
+
+  boolean canImplement(Filter filter);
+
+  boolean canImplement(DrillLimitRelBase limit);
+
+  boolean canImplement(Project project);
+
+  boolean canImplement(Sort sort);
+
+  boolean canImplement(Union union);
+
+  boolean canImplement(Join join);
+
+  boolean canImplement(TableScan scan);
+
+  GroupScan getPhysicalOperator() throws IOException;
+
+  default void visitChild(RelNode input) throws IOException {
+    ((PluginRel) input).implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
new file mode 100644
index 0000000..02885e9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Aggregate implementation for Drill plugins.
+ */
+public class PluginAggregateRel extends DrillAggregateRelBase implements PluginRel {
+
+  public PluginAggregateRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
+      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet,
+      List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    return new PluginAggregateRel(getCluster(), traitSet, input, groupSet, groupSets, aggCalls);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
new file mode 100644
index 0000000..7528d99
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.Utilities;
+
+import java.io.IOException;
+
+/**
+ * Table implementation based on {@link DynamicDrillTable} to be used by Drill plugins.
+ */
+public class PluginDrillTable extends DynamicDrillTable implements TranslatableTable {
+  private final Convention convention;
+
+  public PluginDrillTable(StoragePlugin plugin, String storageEngineName,
+      String userName, Object selection, Convention convention) {
+    super(plugin, storageEngineName, userName, selection);
+    this.convention = convention;
+  }
+
+  @Override
+  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
+    DrillTable drillTable = Utilities.getDrillTable(table);
+    try {
+      return new StoragePluginTableScan(context.getCluster(),
+          context.getCluster().traitSetOf(convention),
+          drillTable.getGroupScan(),
+          table,
+          table.getRowType());
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
new file mode 100644
index 0000000..cb63571
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Filter implementation for Drill plugins.
+ */
+public class PluginFilterRel extends DrillFilterRelBase implements PluginRel {
+
+  public PluginFilterRel(Convention convention, RelOptCluster cluster,
+      RelTraitSet traitSet, RelNode child, RexNode condition) {
+    super(convention, cluster, traitSet, child, condition);
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override public PluginFilterRel copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new PluginFilterRel(getConvention(), getCluster(), traitSet, input, condition);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrel.java
new file mode 100644
index 0000000..8c9f9bb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.SinglePrel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Prel used to represent a plugin conversion within an expression tree.
+ * This Prel will be replaced with a full {@link PluginPrel} before execution can happen.
+ */
+public class PluginIntermediatePrel extends SinglePrel implements PrelFinalizable {
+
+  private final Supplier<PluginImplementor> implementorFactory;
+
+  public PluginIntermediatePrel(RelOptCluster cluster, RelTraitSet traits,
+      RelNode child, Supplier<PluginImplementor> implementorFactory) {
+    super(cluster, traits, child);
+    this.implementorFactory = implementorFactory;
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new PluginIntermediatePrel(getCluster(), traitSet, getInput(), implementorFactory);
+  }
+
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return copy(getTraitSet(), getInputs());
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public Prel finalizeRel() {
+    return new PluginPrel(getCluster(), this);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) {
+    throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  public PluginImplementor getPluginImplementor() {
+    return implementorFactory.get();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
new file mode 100644
index 0000000..4bf767f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Join implementation for Drill plugins.
+ */
+public class PluginJoinRel extends DrillJoinRelBase implements PluginRel {
+
+  public PluginJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+      RexNode condition, JoinRelType joinType) {
+    super(cluster, traits, left, right, condition, joinType);
+  }
+
+  @Override
+  public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right,
+      JoinRelType joinType, boolean semiJoinDone) {
+    return new PluginJoinRel(getCluster(), traitSet, left, right, conditionExpr, joinType);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
new file mode 100644
index 0000000..6902ce2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Limit implementation for Drill plugins.
+ */
+public class PluginLimitRel extends DrillLimitRelBase implements PluginRel {
+
+  public PluginLimitRel(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, offset, fetch);
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  @Override
+  public PluginLimitRel copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new PluginLimitRel(getCluster(), traitSet, inputs.get(0), offset, fetch);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
new file mode 100644
index 0000000..cc96426
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Represents a plugin-specific plan once child nodes have been pushed down into the group scan.
+ */
+public class PluginPrel extends AbstractRelNode implements Prel {
+  private final GroupScan groupScan;
+  private final RelDataType rowType;
+
+  public PluginPrel(RelOptCluster cluster, PluginIntermediatePrel intermediatePrel) {
+    super(cluster, intermediatePrel.getTraitSet());
+    this.rowType = intermediatePrel.getRowType();
+    PluginRel input = (PluginRel) intermediatePrel.getInput().accept(SubsetRemover.INSTANCE);
+    try {
+      PluginImplementor implementor = intermediatePrel.getPluginImplementor();
+      input.implement(implementor);
+      this.groupScan = implementor.getPhysicalOperator();
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+    return creator.addMetadata(this, groupScan);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("groupScan", groupScan);
+  }
+
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return groupScan.getScanStats(mq).getRecordCount();
+  }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    return rowType;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
new file mode 100644
index 0000000..b646ef7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Project implementation for Drill plugins.
+ */
+public class PluginProjectRel extends DrillProjectRelBase implements PluginRel {
+
+  public PluginProjectRel(Convention convention, RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(convention, cluster, traitSet, input, projects, rowType);
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    return new PluginProjectRel(getConvention(), getCluster(), traitSet, input, projects, rowType);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
similarity index 58%
rename from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
index ef89bfb..55b54f7 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
@@ -15,20 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.mongo.common;
+package org.apache.drill.exec.store.plan.rel;
 
-public enum MongoCompareOp {
-  EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
-      "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
-      "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
-      "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
-  private String compareOp;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
 
-  MongoCompareOp(String compareOp) {
-    this.compareOp = compareOp;
-  }
+import java.io.IOException;
 
-  public String getCompareOp() {
-    return compareOp;
-  }
+/**
+ * Relational expression that uses a plugin-specific calling convention.
+ */
+public interface PluginRel extends RelNode {
+
+  void implement(PluginImplementor implementor) throws IOException;
+
+  boolean canImplement(PluginImplementor implementor);
 }
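
Taken with PluginImplementor, PluginRel forms a double dispatch: each rel node
forwards itself to the implementor, which accumulates the plugin-specific
pipeline and finally exposes it as a group scan. A minimal sketch of the
consuming side (the helper class below is illustrative only, not part of this
commit) mirrors the PluginIntermediatePrel constructor shown at the top of
this diff:

    import java.io.IOException;

    import org.apache.drill.exec.physical.base.GroupScan;
    import org.apache.drill.exec.store.plan.PluginImplementor;
    import org.apache.drill.exec.store.plan.rel.PluginRel;

    final class PluginRelSketch {
      // Walk the plugin subtree, then read back the physical operator
      // the implementor built while visiting each node.
      static GroupScan toGroupScan(PluginRel root, PluginImplementor implementor)
          throws IOException {
        root.implement(implementor);
        return implementor.getPhysicalOperator();
      }
    }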
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
new file mode 100644
index 0000000..3e2968a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Sort implementation for Drill plugins.
+ */
+public class PluginSortRel extends DrillSortRelBase implements PluginRel {
+  public PluginSortRel(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, collation, offset, fetch);
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override
+  public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public PluginSortRel copy(RelTraitSet traitSet, RelNode input,
+      RelCollation newCollation, RexNode offset, RexNode fetch) {
+    return new PluginSortRel(getCluster(), traitSet, input, newCollation, offset, fetch);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canBeDropped() {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
new file mode 100644
index 0000000..3d61ea5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Union implementation for Drill plugins.
+ */
+public class PluginUnionRel extends DrillUnionRelBase implements PluginRel {
+
+  public PluginUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all, boolean checkCompatibility) throws InvalidRelException {
+    super(cluster, traits, inputs, all, checkCompatibility);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    try {
+      return new PluginUnionRel(getCluster(), traitSet, inputs, all, true);
+    } catch (InvalidRelException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
new file mode 100644
index 0000000..68e4b1f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Storage plugin table scan rel implementation.
+ */
+public class StoragePluginTableScan extends DrillScanRelBase implements PluginRel {
+
+  private final RelDataType rowType;
+
+  public StoragePluginTableScan(RelOptCluster cluster, RelTraitSet traits, GroupScan grpScan,
+      RelOptTable table, RelDataType rowType) {
+    super(cluster, traits, grpScan, table);
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+    return new StoragePluginTableScan(getCluster(), traitSet, scan, getTable(), rowType);
+  }
+
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return getGroupScan().getScanStats(mq).getRecordCount();
+  }
+
+  @Override
+  public RelDataType deriveRowType() {
+    return this.rowType;
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("groupScan", getGroupScan().getDigest());
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
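
StoragePluginTableScan is the leaf of every pushed-down pipeline: it is the one
PluginRel that already carries a GroupScan, so an implementor typically starts
from it and extends the scan as projects, filters, sorts and limits are
visited. As an illustrative fragment (not part of this commit):

    import org.apache.drill.exec.physical.base.GroupScan;
    import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;

    final class ScanLeafSketch {
      // The group scan attached to the scan rel is the seed that the
      // implementor decorates while walking the rest of the subtree.
      static GroupScan baseScan(StoragePluginTableScan scan) {
        return scan.getGroupScan();
      }
    }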
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
new file mode 100644
index 0000000..2ebf66e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+
+/**
+ * The rule that converts the provided aggregate operator to a plugin-specific implementation.
+ */
+public class PluginAggregateRule extends PluginConverterRule {
+
+  public PluginAggregateRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Aggregate.class, in, out, "PluginAggregateRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Aggregate agg = (Aggregate) rel;
+    return new PluginAggregateRel(
+        rel.getCluster(),
+        agg.getTraitSet().replace(getOutConvention()),
+        convert(agg.getInput(), agg.getTraitSet().replace(getOutConvention()).simplify()),
+        agg.getGroupSet(),
+        agg.getGroupSets(),
+        agg.getAggCallList());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
new file mode 100644
index 0000000..377eba3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.util.function.Predicate;
+
+/**
+ * Abstract base class for a rule that converts the provided operator to a plugin-specific implementation.
+ */
+public abstract class PluginConverterRule extends ConverterRule {
+
+  private final PluginImplementor pluginImplementor;
+
+  protected PluginConverterRule(Class<? extends RelNode> clazz,
+      RelTrait in, Convention out, String description, PluginImplementor pluginImplementor) {
+    super(clazz, (Predicate<RelNode>) input -> true, in, out, DrillRelFactories.LOGICAL_BUILDER, description);
+    this.pluginImplementor = pluginImplementor;
+  }
+
+  public PluginImplementor getPluginImplementor() {
+    return pluginImplementor;
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    RelNode rel = call.rel(0);
+    boolean canImplement = false;
+    // cannot use visitor pattern here, since RelShuttle supports only logical rel implementations
+    if (rel instanceof Aggregate) {
+      canImplement = pluginImplementor.canImplement(((Aggregate) rel));
+    } else if (rel instanceof Filter) {
+      canImplement = pluginImplementor.canImplement(((Filter) rel));
+    } else if (rel instanceof DrillLimitRelBase) {
+      canImplement = pluginImplementor.canImplement(((DrillLimitRelBase) rel));
+    } else if (rel instanceof Project) {
+      canImplement = pluginImplementor.canImplement(((Project) rel));
+    } else if (rel instanceof Sort) {
+      canImplement = pluginImplementor.canImplement(((Sort) rel));
+    } else if (rel instanceof Union) {
+      canImplement = pluginImplementor.canImplement(((Union) rel));
+    } else if (rel instanceof Join) {
+      canImplement = pluginImplementor.canImplement(((Join) rel));
+    }
+    return canImplement && super.matches(call);
+  }
+}
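
The concrete rules in this commit (aggregate above; filter, join, limit,
project, sort and union below) all extend this class, so a plugin only has to
supply its calling convention and a PluginImplementor to obtain a full rule
set. A minimal sketch of such wiring (the helper class is illustrative; the
commit performs the real per-plugin wiring elsewhere):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.calcite.plan.Convention;
    import org.apache.calcite.plan.RelOptRule;
    import org.apache.calcite.plan.RelTrait;
    import org.apache.drill.exec.store.plan.PluginImplementor;
    import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
    import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
    import org.apache.drill.exec.store.plan.rule.PluginJoinRule;
    import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
    import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
    import org.apache.drill.exec.store.plan.rule.PluginSortRule;
    import org.apache.drill.exec.store.plan.rule.PluginUnionRule;

    final class PluginRuleSetSketch {
      // "in" is the trait the rules convert from (e.g. Drill logical),
      // "out" is the plugin's calling convention.
      static List<RelOptRule> rulesFor(RelTrait in, Convention out,
          PluginImplementor implementor) {
        return Arrays.asList(
            new PluginAggregateRule(in, out, implementor),
            new PluginFilterRule(in, out, implementor),
            new PluginJoinRule(in, out, implementor),
            new PluginLimitRule(in, out, implementor),
            new PluginProjectRule(in, out, implementor),
            new PluginSortRule(in, out, implementor),
            new PluginUnionRule(in, out, implementor));
      }
    }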
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
new file mode 100644
index 0000000..e5e02e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+
+/**
+ * The rule that converts the provided filter operator to a plugin-specific implementation.
+ */
+public class PluginFilterRule extends PluginConverterRule {
+
+  public PluginFilterRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Filter.class, in, out, "PluginFilterRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Filter filter = (Filter) rel;
+    return new PluginFilterRel(
+        getOutConvention(),
+        rel.getCluster(),
+        filter.getTraitSet().replace(getOutConvention()),
+        convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
+        filter.getCondition());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
new file mode 100644
index 0000000..279241e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginIntermediatePrel;
+
+import java.util.function.Supplier;
+
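+/**
+ * The rule that converts a logical subtree already in the plugin convention,
+ * wrapped in a {@link VertexDrel}, into a {@link PluginIntermediatePrel}.
+ */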
+public class PluginIntermediatePrelConverterRule extends RelOptRule {
+  private final Supplier<PluginImplementor> implementorFactory;
+  private final RelTrait inTrait;
+  private final RelTrait outTrait;
+
+  public PluginIntermediatePrelConverterRule(Convention convention,
+      Supplier<PluginImplementor> implementorFactory) {
+    super(
+        RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
+            RelOptHelper.any(RelNode.class, convention)),
+        DrillRelFactories.LOGICAL_BUILDER, "EnumerableIntermediatePrelConverterRule" + convention);
+    this.implementorFactory = implementorFactory;
+    this.inTrait = DrillRel.DRILL_LOGICAL;
+    this.outTrait = Prel.DRILL_PHYSICAL;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    VertexDrel in = call.rel(0);
+    RelNode intermediatePrel = new PluginIntermediatePrel(
+        in.getCluster(),
+        in.getTraitSet().replace(outTrait),
+        in.getInput(0),
+        implementorFactory);
+    call.transformTo(intermediatePrel);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return super.matches(call) && call.rel(0).getTraitSet().contains(inTrait);
+  }
+}
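
Note the Supplier: a PluginImplementor is stateful, accumulating the group
scan while the plugin subtree is walked, so every matched subtree needs a
fresh instance rather than a shared one. A plugin with a hypothetical
convention FOO_CONVENTION and implementor FooPluginImplementor (both names
illustrative) would register the rule as:

    RelOptRule rule = new PluginIntermediatePrelConverterRule(
        FOO_CONVENTION, FooPluginImplementor::new);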
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
new file mode 100644
index 0000000..a71060b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+
+/**
+ * The rule that converts the provided join operator to a plugin-specific implementation.
+ */
+public class PluginJoinRule extends PluginConverterRule {
+
+  public PluginJoinRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Join.class, in, out, "PluginProjectRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+    RelTraitSet traits = join.getTraitSet().replace(getOutConvention());
+    return new PluginJoinRel(
+        join.getCluster(),
+        traits,
+        convert(join.getLeft(), traits),
+        convert(join.getRight(), traits),
+        join.getCondition(),
+        join.getJoinType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
new file mode 100644
index 0000000..96a9fcc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+
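+/**
+ * The rule that converts the provided limit operator to a plugin-specific implementation.
+ */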
+public class PluginLimitRule extends PluginConverterRule {
+
+  public PluginLimitRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(DrillLimitRelBase.class, in, out, "PluginLimitRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    DrillLimitRelBase limit = (DrillLimitRelBase) rel;
+    RelNode input = convert(limit.getInput(), limit.getInput().getTraitSet().replace(getOutConvention()).simplify());
+    return new PluginLimitRel(
+        rel.getCluster(),
+        limit.getTraitSet().replace(getOutConvention()),
+        input,
+        limit.getOffset(),
+        limit.getFetch());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
new file mode 100644
index 0000000..6968cc6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+
+/**
+ * The rule that converts the provided project operator to a plugin-specific implementation.
+ */
+public class PluginProjectRule extends PluginConverterRule {
+
+  public PluginProjectRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Project.class, in, out, "PluginProjectRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Project project = (Project) rel;
+    return new PluginProjectRel(
+        getOutConvention(),
+        project.getCluster(),
+        project.getTraitSet().replace(getOutConvention()),
+        convert(project.getInput(), project.getTraitSet().replace(getOutConvention())),
+        project.getProjects(),
+        project.getRowType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
new file mode 100644
index 0000000..c9d9823
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+
+/**
+ * The rule that converts the provided sort operator to a plugin-specific implementation.
+ */
+public class PluginSortRule extends PluginConverterRule {
+
+  public PluginSortRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Sort.class, in, out, "PluginSortRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Sort sort = (Sort) rel;
+    RelNode input = convert(sort.getInput(), sort.getInput().getTraitSet().replace(getOutConvention()).simplify());
+    return new PluginSortRel(
+        rel.getCluster(),
+        sort.getTraitSet().replace(getOutConvention()).replace(sort.getCollation()),
+        input,
+        sort.getCollation(),
+        sort.offset,
+        sort.fetch);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
new file mode 100644
index 0000000..fa1722a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The rule that converts the provided union operator to a plugin-specific implementation.
+ */
+public class PluginUnionRule extends PluginConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(PluginUnionRule.class);
+
+  public PluginUnionRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Union.class, in, out, "PluginUnionRule", pluginImplementor);
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Union union = (Union) rel;
+    try {
+      return new PluginUnionRel(
+          rel.getCluster(),
+          union.getTraitSet().replace(getOutConvention()),
+          convertList(union.getInputs(), getOutConvention()),
+          union.all,
+          true);
+    } catch (InvalidRelException e) {
+      logger.warn("Cannot convert Union to a plugin-specific implementation: {}", e.getMessage());
+      return null;
+    }
+  }
+}
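
Taken together, these rules let the planner rewrite a logical plan such as

    Sort -> Project -> Filter -> Scan

into a chain of plugin rels

    PluginSortRel -> PluginProjectRel -> PluginFilterRel -> StoragePluginTableScan

which PluginIntermediatePrel then collapses into a single group scan through
the implementor walk. Because every PluginRel discounts its cost to a tenth
of the default, the planner prefers plans that push more operators into the
plugin whenever the implementor reports that it can handle them.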