You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2021/08/05 17:23:15 UTC

[drill] 02/13: DRILL-7971: Intermediate commit

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch mongo
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 32d04143847783ce2061be2368b625a170c41543
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Jul 13 20:34:03 2021 +0300

    DRILL-7971: Intermediate commit
---
 .../exec/store/mongo/BaseMongoSubScanSpec.java     |  64 ++
 .../exec/store/mongo/MongoAggregateUtils.java      | 235 ++++++++
 .../drill/exec/store/mongo/MongoFilterBuilder.java |  54 +-
 .../drill/exec/store/mongo/MongoGroupScan.java     | 120 ++--
 .../store/mongo/MongoPushDownAggregateForScan.java | 301 ----------
 .../store/mongo/MongoPushDownFilterForScan.java    |   9 +-
 .../drill/exec/store/mongo/MongoRecordReader.java  |  44 +-
 .../exec/store/mongo/MongoScanBatchCreator.java    |  12 +-
 .../drill/exec/store/mongo/MongoScanSpec.java      |  30 +-
 .../drill/exec/store/mongo/MongoStoragePlugin.java |  56 +-
 .../drill/exec/store/mongo/MongoSubScan.java       | 121 ++--
 .../store/mongo/plan/MongoPluginImplementor.java   | 206 +++++++
 .../store/mongo/plan/MongoPluginRulesProvider.java |  76 +++
 .../drill/exec/store/mongo/plan/MongoRules.java    | 649 +++++++++++++++++++++
 .../drill/exec/store/mongo/plan/MongoTable.java    | 197 +++++++
 .../store/mongo/schema/MongoDatabaseSchema.java    |   3 +-
 .../store/mongo/schema/MongoSchemaFactory.java     |   4 +-
 .../exec/store/mongo/TestMongoChunkAssignment.java |   2 +-
 .../exec/store/mongo/TestMongoLimitPushDown.java   |  20 +-
 .../drill/exec/store/mongo/TestMongoQueries.java   |  53 +-
 .../exec/physical/base/AbstractGroupScan.java      |   6 +
 .../apache/drill/exec/physical/base/GroupScan.java |   5 +
 .../exec/planner/common/DrillSortRelBase.java      |  29 +
 .../drill/exec/planner/logical/DrillSortRel.java   |  15 +-
 .../drill/exec/planner/physical/SortPrel.java      |  14 +-
 .../drill/exec/store/PlannableStoragePlugin.java   | 189 ++++++
 .../drill/exec/store/PluginRulesProvider.java      |  16 +
 .../drill/exec/store/plan/PluginImplementor.java   |  39 ++
 .../exec/store/plan/rel/PluginAggregateRel.java    |  87 +++
 .../exec/store/plan/rel/PluginDrillTable.java      |  37 ++
 .../drill/exec/store/plan/rel/PluginFilterRel.java |  59 ++
 .../store/plan/rel/PluginIntermediatePrelRel.java  |  83 +++
 .../drill/exec/store/plan/rel/PluginLimitRel.java  |  54 ++
 .../drill/exec/store/plan/rel/PluginPrel.java      |  83 +++
 .../exec/store/plan/rel/PluginProjectRel.java      |  59 ++
 .../drill/exec/store/plan/rel/PluginRel.java       |  11 +
 .../drill/exec/store/plan/rel/PluginSortRel.java   |  61 ++
 .../drill/exec/store/plan/rel/PluginUnionRel.java  |  42 ++
 .../store/plan/rel/StoragePluginTableScan.java     |  74 +++
 .../exec/store/plan/rule/PluginAggregateRule.java  |  38 ++
 .../exec/store/plan/rule/PluginConverterRule.java  |  16 +
 .../exec/store/plan/rule/PluginFilterRule.java     |  30 +
 .../rule/PluginIntermediatePrelConverterRule.java  |  50 ++
 .../exec/store/plan/rule/PluginLimitRule.java      |  25 +
 .../exec/store/plan/rule/PluginProjectRule.java    |  27 +
 .../drill/exec/store/plan/rule/PluginSortRule.java |  28 +
 .../exec/store/plan/rule/PluginUnionRule.java      |  42 ++
 47 files changed, 2886 insertions(+), 589 deletions(-)

diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
new file mode 100644
index 0000000..fd05500
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
@@ -0,0 +1,64 @@
+package org.apache.drill.exec.store.mongo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+
+import java.util.List;
+
+/** Database, collection and hosts shared by Mongo sub-scan specs; setters return this spec for chaining. */
+public class BaseMongoSubScanSpec {
+
+  protected String dbName;
+  protected String collectionName;
+  protected List<String> hosts;
+
+  /** Creates an empty spec whose fields are populated afterwards through the setters. */
+  BaseMongoSubScanSpec() {
+  }
+
+  /** Creates a populated spec; the annotations name the JSON property bound to each argument. */
+  @JsonCreator
+  public BaseMongoSubScanSpec(
+      @JsonProperty("dbName") String dbName,
+      @JsonProperty("collectionName") String collectionName,
+      @JsonProperty("hosts") List<String> hosts) {
+    this.hosts = hosts;
+    this.collectionName = collectionName;
+    this.dbName = dbName;
+  }
+
+  public String getDbName() {
+    return this.dbName;
+  }
+
+  public BaseMongoSubScanSpec setDbName(String dbName) {
+    this.dbName = dbName;
+    return this;
+  }
+
+  public String getCollectionName() {
+    return this.collectionName;
+  }
+
+  public BaseMongoSubScanSpec setCollectionName(String collectionName) {
+    this.collectionName = collectionName;
+    return this;
+  }
+
+  /** Returns the host list held by this spec; no copy is made. */
+  public List<String> getHosts() {
+    return this.hosts;
+  }
+
+  public BaseMongoSubScanSpec setHosts(List<String> hosts) {
+    this.hosts = hosts;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    PlanStringBuilder text = new PlanStringBuilder(this);
+    return text.field("dbName", dbName).field("collectionName", collectionName).field("hosts", hosts).toString();
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
new file mode 100644
index 0000000..a38c494
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -0,0 +1,235 @@
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.client.model.Accumulators;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Util;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * Helpers that translate a Calcite {@link Aggregate} into MongoDB aggregation pipeline stages.
+ *
+ * <p>The {@code _id} key of a multi-column {@code $group} and the {@code $project} stage are
+ * assembled as JSON text and parsed, so {@link #quote} and {@link #maybeQuote} apply to them.
+ * Accumulators go through the driver's {@link Accumulators} API, which takes field names and
+ * expressions verbatim, so nothing handed to it may be quoted.
+ */
+public class MongoAggregateUtils {
+
+  /**
+   * Returns the field names of {@code rowType} adjusted for MongoDB and made unique.
+   * A name starting with {@code $} loses its first two characters and gets a {@code _} prefix
+   * (presumably aimed at Calcite's generated {@code $fN} names, e.g. {@code $f0} gives {@code _0}).
+   *
+   * @param rowType row type whose field names are converted
+   * @return unique field names, in field order
+   */
+  public static List<String> mongoFieldNames(RelDataType rowType) {
+    return SqlValidatorUtil.uniquify(
+        new AbstractList<String>() {
+          @Override public String get(int index) {
+            final String name = rowType.getFieldList().get(index).getName();
+            // Math.min keeps a bare "$" from failing with StringIndexOutOfBoundsException.
+            return name.startsWith("$") ? "_" + name.substring(Math.min(2, name.length())) : name;
+          }
+
+          @Override public int size() {
+            return rowType.getFieldCount();
+          }
+        },
+        SqlValidatorUtil.EXPR_SUGGESTER, true);
+  }
+
+  /**
+   * Quotes {@code s} for use in JSON text when it needs quoting.
+   *
+   * @param s name or expression
+   * @return {@code s} in single quotes if it contains {@code $} or a character that is not a Java
+   *     identifier part, otherwise {@code s} itself
+   */
+  static String maybeQuote(String s) {
+    return needsQuote(s) ? quote(s) : s;
+  }
+
+  /** Wraps {@code s} in single quotes for use in JSON text. */
+  static String quote(String s) {
+    return "'" + s + "'"; // TODO: handle embedded quotes
+  }
+
+  /** Tells whether {@code s} contains {@code $} or a character that is not a Java identifier part. */
+  private static boolean needsQuote(String s) {
+    for (int i = 0, n = s.length(); i < n; i++) {
+      char c = s.charAt(i);
+      if (!Character.isJavaIdentifierPart(c)
+          || c == '$') {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Builds the pipeline for {@code aggregate}: the operations already held by the scan spec of
+   * {@code groupScan}, then a {@code $group} stage and, when there are grouping columns, a
+   * {@code $project} stage that maps {@code _id} back to the output column names.
+   *
+   * @param aggregate aggregate to translate
+   * @param rowType row type of the aggregate input
+   * @param groupScan scan whose existing operations are copied in front of the new stages
+   * @return a new list; the list held by the scan spec is not modified
+   * @throws UnsupportedOperationException if an aggregate call has no MongoDB counterpart
+   */
+  public static List<Bson> getAggregateOperations(Aggregate aggregate, RelDataType rowType, MongoGroupScan groupScan) {
+    List<String> inNames = mongoFieldNames(rowType);
+    List<String> outNames = mongoFieldNames(aggregate.getRowType());
+    int groupCount = aggregate.getGroupSet().cardinality();
+
+    // $group key: a plain field reference for one grouping column, a sub-document otherwise.
+    Object id;
+    if (groupCount == 1) {
+      id = "$" + inNames.get(aggregate.getGroupSet().nth(0));
+    } else {
+      List<String> keys = new ArrayList<>();
+      for (int group : aggregate.getGroupSet()) {
+        String inName = inNames.get(group);
+        keys.add(inName + ": " + quote("$" + inName));
+      }
+      id = BsonDocument.parse(Util.toString(keys, "{", ", ", "}"));
+    }
+
+    // Aggregate calls follow the grouping columns in the output row type.
+    List<BsonField> accumulators = new ArrayList<>();
+    int outIndex = groupCount;
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      accumulators.add(bsonAggregate(inNames, outNames.get(outIndex++), aggCall));
+    }
+
+    List<Bson> allOperations = new ArrayList<>(groupScan.getScanSpec().getOperations());
+    allOperations.add(Aggregates.group(id, accumulators).toBsonDocument());
+    if (!aggregate.getGroupSet().isEmpty()) {
+      List<String> fixups = projectFixups(aggregate, outNames);
+      // Util.toString already emits the closing brace; nothing is appended after it.
+      allOperations.add(new Document("$project", BsonDocument.parse(Util.toString(fixups, "{", ", ", "}"))));
+    }
+    return allOperations;
+  }
+
+  /**
+   * Builds the {@code name: expression} entries of the {@code $project} stage as JSON text.
+   *
+   * @param aggregate aggregate being translated
+   * @param outNames field names of the aggregate output
+   * @return entries to be joined into the {@code $project} document
+   */
+  private static List<String> projectFixups(Aggregate aggregate, List<String> outNames) {
+    List<String> fixups = new ArrayList<>();
+    int groupCount = aggregate.getGroupSet().cardinality();
+    if (groupCount == 1) {
+      // Single grouping column: it comes first in the output and its value is the whole _id.
+      for (int index = 0; index < outNames.size(); index++) {
+        String outName = outNames.get(index);
+        fixups.add(maybeQuote(outName) + ": " + maybeQuote("$" + (index == 0 ? "_id" : outName)));
+      }
+      return fixups;
+    }
+    // Several grouping columns: hide _id and lift its members to the top level.
+    fixups.add("_id: 0");
+    for (int group : aggregate.getGroupSet()) {
+      // NOTE(review): outNames is indexed by the input ordinal here, while the $group key is
+      // built from the input names - confirm both always agree.
+      fixups.add(maybeQuote(outNames.get(group)) + ": " + maybeQuote("$_id." + outNames.get(group)));
+    }
+    // Accumulator results keep their own names.
+    for (int index = groupCount; index < groupCount + aggregate.getAggCallList().size(); index++) {
+      String outName = outNames.get(index);
+      fixups.add(maybeQuote(outName) + ": " + maybeQuote("$" + outName));
+    }
+    return fixups;
+  }
+
+  /**
+   * Converts one aggregate call into a {@code $group} accumulator named {@code outName}.
+   * COUNT is expressed through {@code $sum}; the other functions use the matching driver accumulator.
+   *
+   * @param inNames field names of the aggregate input, indexed by argument ordinal
+   * @param outName name of the output field, used as is
+   * @param aggCall aggregate call to convert
+   * @return accumulator for the call
+   * @throws UnsupportedOperationException if the aggregate function is not supported
+   */
+  private static BsonField bsonAggregate(List<String> inNames, String outName, AggregateCall aggCall) {
+    String aggregationName = aggCall.getAggregation().getName();
+    List<Integer> args = aggCall.getArgList();
+    if (aggregationName.equals(SqlStdOperatorTable.COUNT.getName())) {
+      if (args.isEmpty()) {
+        // COUNT(*): every document adds one.
+        return Accumulators.sum(outName, 1);
+      }
+      assert args.size() == 1;
+      // COUNT(col): add 0 when the field equals null and 1 otherwise. The "$" prefix makes it a
+      // field reference; a quoted name would be a string literal, which never equals null.
+      // NOTE(review): fields that are absent from a document may need $ifNull - confirm.
+      BsonString field = new BsonString("$" + inNames.get(args.get(0)));
+      return Accumulators.sum(outName,
+          new BsonDocument("$cond",
+              new BsonArray(Arrays.asList(
+                  new Document("$eq", new BsonArray(Arrays.asList(field, BsonNull.VALUE))).toBsonDocument(),
+                  new BsonInt32(0),
+                  new BsonInt32(1)))));
+    }
+    BiFunction<String, Object, BsonField> mongoAccumulator = mongoAccumulator(aggregationName);
+    if (mongoAccumulator == null || args.size() != 1) {
+      // Fail here with a clear message instead of handing a null accumulator to Aggregates.group.
+      throw new UnsupportedOperationException("Unsupported aggregate call: " + aggCall);
+    }
+    // Aggregate the argument column of the call, not the first input column.
+    return mongoAccumulator.apply(outName, "$" + inNames.get(args.get(0)));
+  }
+
+  /**
+   * Looks up the driver accumulator factory for a SQL aggregate function name.
+   * Handles the SUM, SUM0, MIN, MAX, AVG, FIRST, LAST, STDDEV, STDDEV_SAMP and STDDEV_POP operators.
+   *
+   * @param aggregationName name of the SQL aggregate function
+   * @return factory taking the output field name and the expression, or {@code null} if unsupported
+   */
+  private static <T> BiFunction<String, T, BsonField> mongoAccumulator(String aggregationName) {
+    if (aggregationName.equals(SqlStdOperatorTable.SUM.getName())
+        || aggregationName.equals(SqlStdOperatorTable.SUM0.getName())) {
+      return Accumulators::sum;
+    } else if (aggregationName.equals(SqlStdOperatorTable.MIN.getName())) {
+      return Accumulators::min;
+    } else if (aggregationName.equals(SqlStdOperatorTable.MAX.getName())) {
+      return Accumulators::max;
+    } else if (aggregationName.equals(SqlStdOperatorTable.AVG.getName())) {
+      return Accumulators::avg;
+    } else if (aggregationName.equals(SqlStdOperatorTable.FIRST.getName())) {
+      return Accumulators::first;
+    } else if (aggregationName.equals(SqlStdOperatorTable.LAST.getName())) {
+      return Accumulators::last;
+    } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV.getName())
+      || aggregationName.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())) {
+      return Accumulators::stdDevSamp;
+    } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV_POP.getName())) {
+      return Accumulators::stdDevPop;
+    }
+    return null;
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 10720e4..8eecf00 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MongoFilterBuilder extends
-    AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
+    AbstractExprVisitor<Document, Void, RuntimeException> implements
     DrillMongoConstants {
   private static final Logger logger = LoggerFactory
       .getLogger(MongoFilterBuilder.class);
@@ -46,37 +46,36 @@ public class MongoFilterBuilder extends
     this.le = conditionExp;
   }
 
-  public MongoScanSpec parseTree() {
-    MongoScanSpec parsedSpec = le.accept(this, null);
+  public Document parseTree() {
+    Document parsedSpec = le.accept(this, null);
     if (parsedSpec != null) {
-      parsedSpec = mergeScanSpecs(FunctionNames.AND, this.groupScan.getScanSpec(),
+      parsedSpec = mergeScanSpecs(FunctionNames.AND, null,
           parsedSpec);
     }
     return parsedSpec;
   }
 
-  private MongoScanSpec mergeScanSpecs(String functionName,
-      MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
+  private Document mergeScanSpecs(String functionName,
+      Document leftScanSpec, Document rightScanSpec) {
     Document newFilter = new Document();
 
     switch (functionName) {
     case FunctionNames.AND:
-      if (leftScanSpec.getFilters() != null
-          && rightScanSpec.getFilters() != null) {
-        newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
-            rightScanSpec.getFilters());
-      } else if (leftScanSpec.getFilters() != null) {
-        newFilter = leftScanSpec.getFilters();
+      if (leftScanSpec != null
+          && rightScanSpec != null) {
+        newFilter = MongoUtils.andFilterAtIndex(leftScanSpec,
+            rightScanSpec);
+      } else if (leftScanSpec != null) {
+        newFilter = leftScanSpec;
       } else {
-        newFilter = rightScanSpec.getFilters();
+        newFilter = rightScanSpec;
       }
       break;
     case FunctionNames.OR:
-      newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(),
-          rightScanSpec.getFilters());
+      newFilter = MongoUtils.orFilterAtIndex(leftScanSpec,
+          rightScanSpec);
     }
-    return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
-        .getScanSpec().getCollectionName(), newFilter);
+    return newFilter;
   }
 
   public boolean isAllExpressionsConverted() {
@@ -84,16 +83,16 @@ public class MongoFilterBuilder extends
   }
 
   @Override
-  public MongoScanSpec visitUnknown(LogicalExpression e, Void value)
+  public Document visitUnknown(LogicalExpression e, Void value)
       throws RuntimeException {
     allExpressionsConverted = false;
     return null;
   }
 
   @Override
-  public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+  public Document visitBooleanOperator(BooleanOperator op, Void value) {
     List<LogicalExpression> args = op.args();
-    MongoScanSpec nodeScanSpec = null;
+    Document nodeScanSpec = null;
     String functionName = op.getName();
     for (int i = 0; i < args.size(); ++i) {
       switch (functionName) {
@@ -102,7 +101,7 @@ public class MongoFilterBuilder extends
         if (nodeScanSpec == null) {
           nodeScanSpec = args.get(i).accept(this, null);
         } else {
-          MongoScanSpec scanSpec = args.get(i).accept(this, null);
+          Document scanSpec = args.get(i).accept(this, null);
           if (scanSpec != null) {
             nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
           } else {
@@ -116,9 +115,9 @@ public class MongoFilterBuilder extends
   }
 
   @Override
-  public MongoScanSpec visitFunctionCall(FunctionCall call, Void value)
+  public Document visitFunctionCall(FunctionCall call, Void value)
       throws RuntimeException {
-    MongoScanSpec nodeScanSpec = null;
+    Document nodeScanSpec = null;
     String functionName = call.getName();
     List<LogicalExpression> args = call.args();
 
@@ -138,8 +137,8 @@ public class MongoFilterBuilder extends
       switch (functionName) {
       case FunctionNames.AND:
       case FunctionNames.OR:
-        MongoScanSpec leftScanSpec = args.get(0).accept(this, null);
-        MongoScanSpec rightScanSpec = args.get(1).accept(this, null);
+        Document leftScanSpec = args.get(0).accept(this, null);
+        Document rightScanSpec = args.get(1).accept(this, null);
         if (leftScanSpec != null && rightScanSpec != null) {
           nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
               rightScanSpec);
@@ -160,7 +159,7 @@ public class MongoFilterBuilder extends
     return nodeScanSpec;
   }
 
-  private MongoScanSpec createMongoScanSpec(String functionName,
+  private Document createMongoScanSpec(String functionName,
       SchemaPath field, Object fieldValue) throws ClassNotFoundException,
       IOException {
     // extract the field name
@@ -209,8 +208,7 @@ public class MongoFilterBuilder extends
         queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
             fieldValue));
       }
-      return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
-          .getScanSpec().getCollectionName(), queryFilter);
+      return queryFilter;
     }
     return null;
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 6662e8c..bc3817e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -50,7 +50,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
+import org.apache.drill.exec.store.mongo.MongoSubScan.ShardedMongoSubScanSpec;
 import org.apache.drill.exec.store.mongo.common.ChunkInfo;
 import org.bson.Document;
 import org.bson.codecs.BsonTypeClassMap;
@@ -79,13 +79,13 @@ import com.mongodb.client.MongoDatabase;
 public class MongoGroupScan extends AbstractGroupScan implements
     DrillMongoConstants {
 
-  private static final Integer select = 1;
+  private static final int SELECT = 1;
 
   private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
 
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
+  private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
 
-  private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+  private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
 
   private MongoStoragePlugin storagePlugin;
 
@@ -95,9 +95,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private List<SchemaPath> columns;
 
-  private int maxRecords;
-
-  private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
+  private Map<Integer, List<BaseMongoSubScanSpec>> endpointFragmentMapping;
 
   // Sharding with replica sets contains all the replica server addresses for
   // each chunk.
@@ -107,7 +105,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private final Stopwatch watch = Stopwatch.createUnstarted();
 
-  private boolean filterPushedDown = false;
+  private boolean useAggregate;
 
   @JsonCreator
   public MongoGroupScan(
@@ -115,21 +113,21 @@ public class MongoGroupScan extends AbstractGroupScan implements
       @JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JacksonInject StoragePluginRegistry pluginRegistry,
-      @JsonProperty("maxRecords") int maxRecords) throws IOException {
+      @JsonProperty("useAggregate") boolean useAggregate,
+      @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
     this(userName,
         pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
-        scanSpec, columns, maxRecords);
+        scanSpec, columns, useAggregate);
   }
 
   public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
-      MongoScanSpec scanSpec, List<SchemaPath> columns, int maxRecords) throws IOException {
+      MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) throws IOException {
     super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.scanSpec = scanSpec;
     this.columns = columns;
-    this.maxRecords = maxRecords;
+    this.useAggregate = useAggregate;
     init();
   }
 
@@ -148,18 +146,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     this.chunksMapping = that.chunksMapping;
     this.chunksInverseMapping = that.chunksInverseMapping;
     this.endpointFragmentMapping = that.endpointFragmentMapping;
-    this.filterPushedDown = that.filterPushedDown;
-    this.maxRecords = that.maxRecords;
-  }
-
-  @JsonIgnore
-  public boolean isFilterPushedDown() {
-    return filterPushedDown;
-  }
-
-  @JsonIgnore
-  public void setFilterPushedDown(boolean filterPushedDown) {
-    this.filterPushedDown = filterPushedDown;
+    this.useAggregate = that.useAggregate;
   }
 
   private boolean isShardedCluster(MongoClient client) {
@@ -179,7 +166,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
     MongoClient client = storagePlugin.getClient();
     chunksMapping = Maps.newHashMap();
     chunksInverseMapping = Maps.newLinkedHashMap();
-    if (isShardedCluster(client)) {
+    if (useAggregate && isShardedCluster(client)) {
+      handleUnshardedCollection(getPrimaryShardInfo());
+    } else if (isShardedCluster(client)) {
       MongoDatabase db = client.getDatabase(CONFIG);
       MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
       Document filter = new Document();
@@ -190,9 +179,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
                   + this.scanSpec.getCollectionName());
 
       Document projection = new Document();
-      projection.put(SHARD, select);
-      projection.put(MIN, select);
-      projection.put(MAX, select);
+      projection.put(SHARD, SELECT);
+      projection.put(MIN, SELECT);
+      projection.put(MAX, SELECT);
 
       FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
       MongoCursor<Document> iterator = chunkCursor.iterator();
@@ -200,7 +189,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
 
       projection = new Document();
-      projection.put(HOST, select);
+      projection.put(HOST, SELECT);
 
       boolean hasChunks = false;
       while (iterator.hasNext()) {
@@ -292,7 +281,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     //Identify the primary shard of the queried database.
     MongoCollection<Document> collection = database.getCollection(DATABASES);
     Bson filter = new Document(ID, this.scanSpec.getDbName());
-    Bson projection = new Document(PRIMARY, select);
+    Bson projection = new Document(PRIMARY, SELECT);
     Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());
     String shardName = document.getString(PRIMARY);
     Preconditions.checkNotNull(shardName);
@@ -300,7 +289,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     //Identify the host(s) on which this shard resides.
     MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
     filter = new Document(ID, shardName);
-    projection = new Document(HOST, select);
+    projection = new Document(HOST, SELECT);
     Document hostInfo = Objects.requireNonNull(shardsCol.find(filter).projection(projection).first());
     String hostEntry = hostInfo.getString(HOST);
     Preconditions.checkNotNull(hostEntry);
@@ -361,7 +350,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   public GroupScan clone(int maxRecords) {
     MongoGroupScan clone = new MongoGroupScan(this);
-    clone.maxRecords = maxRecords;
+    clone.useAggregate = true;
+    clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords));
     return clone;
   }
 
@@ -409,7 +399,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       if (slots != null) {
         for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
           Integer slotIndex = slots.poll();
-          List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping
+          List<BaseMongoSubScanSpec> subScanSpecList = endpointFragmentMapping
               .get(slotIndex);
           subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo));
           slots.offer(slotIndex);
@@ -418,11 +408,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
       }
     }
 
-    PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<>(
+    PriorityQueue<List<BaseMongoSubScanSpec>> minHeap = new PriorityQueue<>(
         numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<>(
+    PriorityQueue<List<BaseMongoSubScanSpec>> maxHeap = new PriorityQueue<>(
         numSlots, LIST_SIZE_COMPARATOR_REV);
-    for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+    for (List<BaseMongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
         minHeap.offer(listOfScan);
       } else if (listOfScan.size() > minPerEndpointSlot) {
@@ -433,7 +423,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     if (chunksToAssignSet.size() > 0) {
       for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) {
         for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
-          List<MongoSubScanSpec> smallestList = minHeap.poll();
+          List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
           smallestList.add(buildSubScanSpecAndGet(chunkInfo));
           minHeap.offer(smallestList);
         }
@@ -441,8 +431,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
     }
 
     while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
-      List<MongoSubScanSpec> smallestList = minHeap.poll();
-      List<MongoSubScanSpec> largestList = maxHeap.poll();
+      List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
+      List<BaseMongoSubScanSpec> largestList = maxHeap.poll();
       smallestList.add(largestList.remove(largestList.size() - 1));
       if (largestList.size() > minPerEndpointSlot) {
         maxHeap.offer(largestList);
@@ -458,16 +448,21 @@ public class MongoGroupScan extends AbstractGroupScan implements
         endpointFragmentMapping.toString());
   }
 
-  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    return new MongoSubScanSpec()
-        .setDbName(scanSpec.getDbName())
-        .setCollectionName(scanSpec.getCollectionName())
-        .setHosts(chunkInfo.getChunkLocList())
+  private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    if (useAggregate) {
+      return new MongoSubScan.MongoSubScanSpec()
+          .setOperations(scanSpec.getOperations())
+          .setDbName(scanSpec.getDbName())
+          .setCollectionName(scanSpec.getCollectionName())
+          .setHosts(chunkInfo.getChunkLocList());
+    }
+    return new ShardedMongoSubScanSpec()
         .setMinFilters(chunkInfo.getMinFilters())
         .setMaxFilters(chunkInfo.getMaxFilters())
-        .setMaxRecords(maxRecords)
         .setFilter(scanSpec.getFilters())
-        .setAggregates(scanSpec.getAggregates());
+        .setDbName(scanSpec.getDbName())
+        .setCollectionName(scanSpec.getCollectionName())
+        .setHosts(chunkInfo.getChunkLocList());
   }
 
   @Override
@@ -488,18 +483,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public ScanStats getScanStats() {
-    long recordCount;
     try{
       MongoClient client = storagePlugin.getClient();
       MongoDatabase db = client.getDatabase(scanSpec.getDbName());
       MongoCollection<Document> collection = db.getCollection(scanSpec.getCollectionName());
-      long numDocs = collection.estimatedDocumentCount();
-
-      if (maxRecords > 0 && numDocs > 0) {
-        recordCount = Math.min(maxRecords, numDocs);
-      } else {
-        recordCount = numDocs;
-      }
+      long recordCount = collection.estimatedDocumentCount();
 
       float approxDiskCost = 0;
       if (recordCount != 0) {
@@ -564,9 +552,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @Override
   public GroupScan applyLimit(int maxRecords) {
-    if (maxRecords == this.maxRecords) {
-      return null;
-    }
     return clone(maxRecords);
   }
 
@@ -586,26 +571,33 @@ public class MongoGroupScan extends AbstractGroupScan implements
     return storagePluginConfig;
   }
 
-  @JsonProperty("maxRecords")
-  public int getMaxRecords() { return maxRecords; }
-
   @JsonIgnore
   public MongoStoragePlugin getStoragePlugin() {
     return storagePlugin;
   }
 
+  @JsonProperty("useAggregate")
+  public void setUseAggregate(boolean useAggregate) {
+    this.useAggregate = useAggregate;
+  }
+
+  @JsonProperty("useAggregate")
+  public boolean isUseAggregate() {
+    return useAggregate;
+  }
+
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("MongoScanSpec", scanSpec)
       .field("columns", columns)
-      .field("maxRecords", maxRecords)
+      .field("useAggregate", useAggregate)
       .toString();
   }
 
   @VisibleForTesting
   MongoGroupScan() {
-    super((String)null);
+    super((String) null);
   }
 
   @JsonIgnore
@@ -622,7 +614,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @JsonIgnore
   @VisibleForTesting
-  void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
+  void setInverseChunksMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
     this.chunksInverseMapping = chunksInverseMapping;
   }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownAggregateForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownAggregateForScan.java
deleted file mode 100644
index f7e1a00..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownAggregateForScan.java
+++ /dev/null
@@ -1,301 +0,0 @@
-package org.apache.drill.exec.store.mongo;
-
-import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.Primitive;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.bson.BsonDocument;
-import org.bson.BsonString;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-import java.io.IOException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class MongoPushDownAggregateForScan extends StoragePluginOptimizerRule {
-  public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownAggregateForScan(RelOptHelper.some(Aggregate.class, RelOptHelper.any(DrillScanRelBase.class)), "MongoPushDownAggregateForScan");
-  public static final StoragePluginOptimizerRule PROJ_INSTANCE = new MongoPushDownAggregateForScan(RelOptHelper.some(Aggregate.class, RelOptHelper.some(Project.class, RelOptHelper.any(DrillScanRelBase.class))), "MongoPushDownAggregateForScan_project");
-
-  public MongoPushDownAggregateForScan(RelOptRuleOperand operand, String desc) {
-    super(operand, desc);
-  }
-
-  static List<String> mongoFieldNames(final RelDataType rowType) {
-    return SqlValidatorUtil.uniquify(
-        new AbstractList<String>() {
-          @Override public String get(int index) {
-            final String name = rowType.getFieldList().get(index).getName();
-            return name.startsWith("$") ? "_" + name.substring(2) : name;
-          }
-
-          @Override public int size() {
-            return rowType.getFieldCount();
-          }
-        },
-        SqlValidatorUtil.EXPR_SUGGESTER, true);
-  }
-
-  static String maybeQuote(String s) {
-    if (!needsQuote(s)) {
-      return s;
-    }
-    return quote(s);
-  }
-
-  static String quote(String s) {
-    return "'" + s + "'"; // TODO: handle embedded quotes
-  }
-
-  private static boolean needsQuote(String s) {
-    for (int i = 0, n = s.length(); i < n; i++) {
-      char c = s.charAt(i);
-      if (!Character.isJavaIdentifierPart(c)
-          || c == '$') {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    Aggregate aggregate = call.rel(0);
-    DrillScanRelBase scan = call.rel(1);
-
-    MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
-
-
-//    implementor.visitChild(0, getInput());
-    List<String> list = new ArrayList<>();
-//    List<BsonDocument> docList = new ArrayList<>();
-    final List<String> inNames =
-        mongoFieldNames(scan.getRowType());
-    final List<String> outNames = mongoFieldNames(aggregate.getRowType());
-    int i = 0;
-    if (aggregate.getGroupSet().cardinality() == 1) {
-      final String inName = inNames.get(aggregate.getGroupSet().nth(0));
-      list.add("_id: " + maybeQuote("$" + inName));
-//      docList.add(new BsonDocument("_id", new BsonString(maybeQuote("$" + inName))));
-      ++i;
-    } else {
-      List<String> keys = new ArrayList<>();
-      for (int group : aggregate.getGroupSet()) {
-        final String inName = inNames.get(group);
-        keys.add(inName + ": " + quote("$" + inName));
-        ++i;
-      }
-      list.add("_id: " + Util.toString(keys, "{", ", ", "}"));
-//      docList.add(new BsonDocument("_id", new BsonString(Util.toString(keys, "{", ", ", "}"))));
-    }
-    for (AggregateCall aggCall : aggregate.getAggCallList()) {
-      list.add(
-          maybeQuote(outNames.get(i++)) + ": "
-              + toMongo(aggCall.getAggregation(), inNames, aggCall.getArgList()));
-    }
-    List<Pair<String, String>> aggsList = new ArrayList<>();
-    aggsList.add(Pair.of(null, "{$group: " + Util.toString(list, "{", ", ", "}") + "}"));
-    final List<String> fixups;
-    if (aggregate.getGroupSet().cardinality() == 1) {
-      fixups = new AbstractList<String>() {
-        @Override public String get(int index) {
-          final String outName = outNames.get(index);
-          return maybeQuote(outName) + ": "
-              + maybeQuote("$" + (index == 0 ? "_id" : outName));
-        }
-
-        @Override public int size() {
-          return outNames.size();
-        }
-      };
-    } else {
-      fixups = new ArrayList<>();
-      fixups.add("_id: 0");
-      i = 0;
-      for (int group : aggregate.getGroupSet()) {
-        fixups.add(
-            maybeQuote(outNames.get(group))
-                + ": "
-                + maybeQuote("$_id." + outNames.get(group)));
-        ++i;
-      }
-      for (AggregateCall ignored : aggregate.getAggCallList()) {
-        final String outName = outNames.get(i++);
-        fixups.add(
-            maybeQuote(outName) + ": " + maybeQuote(
-                "$" + outName));
-      }
-    }
-    if (!aggregate.getGroupSet().isEmpty()) {
-      aggsList.add(Pair.of(null, "{$project: " + Util.toString(fixups, "{", ", ", "}") + "}"));
-    }
-
-    MongoScanSpec mongoScanSpec = aggregate(groupScan.getScanSpec(), Pair.right(aggsList));
-    try {
-      List<SchemaPath> columns = outNames.stream()
-          .map(SchemaPath::getSimplePath)
-          .collect(Collectors.toList());
-      MongoGroupScan mongoScanSpec123 = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-          mongoScanSpec, columns, groupScan.getMaxRecords());
-      call.transformTo(scan.copy(aggregate.getTraitSet(), mongoScanSpec123, aggregate.getRowType()));
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  private static String toMongo(SqlAggFunction aggregation, List<String> inNames,
-      List<Integer> args) {
-    if (aggregation.getName().equals(SqlStdOperatorTable.COUNT.getName())) {
-      if (args.size() == 0) {
-//        Aggregates.count()
-        return "{$sum: 1}";
-      } else {
-        assert args.size() == 1;
-//        Arrays.asList(
-//            Aggregates.match(Filters.eq("languages.name", "English")),
-//            Aggregates.count())
-        final String inName = inNames.get(args.get(0));
-        return "{$sum: {$cond: [ {$eq: ["
-            + quote(inName)
-            + ", null]}, 0, 1]}}";
-      }
-    } else if (aggregation instanceof SqlSumAggFunction
-        || aggregation instanceof SqlSumEmptyIsZeroAggFunction) {
-      assert args.size() == 1;
-      final String inName = inNames.get(args.get(0));
-      return "{$sum: " + maybeQuote("$" + inName) + "}";
-    } else if (aggregation.getName().equals(SqlStdOperatorTable.MIN.getName())) {
-      assert args.size() == 1;
-      final String inName = inNames.get(args.get(0));
-      return "{$min: " + maybeQuote("$" + inName) + "}";
-    } else if (aggregation.getName().equals(SqlStdOperatorTable.MAX.getName())) {
-      assert args.size() == 1;
-      final String inName = inNames.get(args.get(0));
-      return "{$max: " + maybeQuote("$" + inName) + "}";
-    } else if (aggregation.getName().equals(SqlStdOperatorTable.AVG.getName())) {
-      assert args.size() == 1;
-      final String inName = inNames.get(args.get(0));
-      return "{$avg: " + maybeQuote("$" + inName) + "}";
-    } else {
-      throw new AssertionError("unknown aggregate " + aggregation);
-    }
-  }
-
-  private MongoScanSpec aggregate(MongoScanSpec scanSpec,
-      final List<String> operations) {
-    final List<Bson> list = new ArrayList<>();
-    for (String operation : operations) {
-      list.add(BsonDocument.parse(operation));
-    }
-    return new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(),
-        scanSpec.getFilters(), list);
-//    final Function1<Document, Object> getter =
-//        getter(fields);
-//    return new AbstractEnumerable<Object>() {
-//      @Override public Enumerator<Object> enumerator() {
-//        final Iterator<Document> resultIterator;
-//        try {
-//          resultIterator = mongoDb.getCollection(scanSpec.getCollectionName())
-//              .aggregate(list).iterator();
-//        } catch (Exception e) {
-//          throw new RuntimeException("While running MongoDB query "
-//              + Util.toString(operations, "[", ",\n", "]"), e);
-//        }
-//        return new MongoEnumerator(resultIterator, getter);
-//      }
-//    };
-  }
-
-  static Function1<Document, Map> mapGetter() {
-    return a0 -> (Map) a0;
-  }
-
-  /** Returns a function that projects a single field. */
-  static Function1<Document, Object> singletonGetter(final String fieldName,
-      final Class fieldClass) {
-    return a0 -> convert(a0.get(fieldName), fieldClass);
-  }
-
-  /** Returns a function that projects fields.
-   *
-   * @param fields List of fields to project; or null to return map
-   */
-  static Function1<Document, Object[]> listGetter(
-      final List<Map.Entry<String, Class>> fields) {
-    return a0 -> {
-      Object[] objects = new Object[fields.size()];
-      for (int i = 0; i < fields.size(); i++) {
-        final Map.Entry<String, Class> field = fields.get(i);
-        final String name = field.getKey();
-        objects[i] = convert(a0.get(name), field.getValue());
-      }
-      return objects;
-    };
-  }
-
-  static Function1<Document, Object> getter(
-      List<Map.Entry<String, Class>> fields) {
-    //noinspection unchecked
-    return fields == null
-        ? (Function1) mapGetter()
-        : fields.size() == 1
-        ? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
-        : (Function1) listGetter(fields);
-  }
-
-  @SuppressWarnings("JavaUtilDate")
-  private static Object convert(Object o, Class clazz) {
-    if (o == null) {
-      return null;
-    }
-    Primitive primitive = Primitive.of(clazz);
-    if (primitive != null) {
-      clazz = primitive.boxClass;
-    } else {
-      primitive = Primitive.ofBox(clazz);
-    }
-    if (clazz.isInstance(o)) {
-      return o;
-    }
-    if (o instanceof Date && primitive != null) {
-      o = ((Date) o).getTime() / DateTimeUtils.MILLIS_PER_DAY;
-    }
-    if (o instanceof Number && primitive != null) {
-      return primitive.number((Number) o);
-    }
-    return o;
-  }
-
-  //$addToSet
-  //$avg
-  //$first
-  //$last
-  //$max
-  //$min
-  //$mergeObjects
-  //$push
-  //$stdDevPop
-  //$stdDevSamp
-  //$sum
-}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index b1c06e7..8dcd9d0 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rex.RexNode;
+import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,15 +54,12 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
     final RexNode condition = filter.getCondition();
 
     MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
-    if (groupScan.isFilterPushedDown()) {
-      return;
-    }
 
     LogicalExpression conditionExp = DrillOptiq.toDrill(
         new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
     MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan,
         conditionExp);
-    MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree();
+    Document newScanSpec = mongoFilterBuilder.parseTree();
     if (newScanSpec == null) {
       return; // no filter pushdown so nothing to apply.
     }
@@ -69,12 +67,11 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
     MongoGroupScan newGroupsScan;
     try {
       newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-          newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
+          null, groupScan.getColumns(), groupScan.isUseAggregate());
     } catch (IOException e) {
       logger.error(e.getMessage(), e);
       throw new DrillRuntimeException(e.getMessage(), e);
     }
-    newGroupsScan.setFilterPushedDown(true);
 
     RelNode newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 7c4f3f2..5cb007f 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoIterable;
 import com.mongodb.client.model.Aggregates;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -69,7 +69,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   private VectorContainerWriter writer;
 
   private Document filters;
-  private List<Bson> aggregates;
+  private List<Bson> operations;
   private final Document fields;
 
   private final FragmentContext fragmentContext;
@@ -81,9 +81,8 @@ public class MongoRecordReader extends AbstractRecordReader {
   private final boolean readNumbersAsDouble;
   private boolean unionEnabled;
   private final boolean isBsonRecordReader;
-  private final int maxRecords;
 
-  public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+  public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, MongoStoragePlugin plugin) {
 
     fields = new Document();
@@ -93,16 +92,19 @@ public class MongoRecordReader extends AbstractRecordReader {
     fragmentContext = context;
     this.plugin = plugin;
     filters = new Document();
-    aggregates = subScanSpec.aggregates;
-    Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
-        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+    if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
+      operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+    } else {
+      MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
+      Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+          shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
 
-    buildFilters(subScanSpec.getFilter(), mergedFilters);
+      buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+    }
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
     isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
-    maxRecords = subScanSpec.getMaxRecords();
     logger.debug("BsonRecordReader is enabled? " + isBsonRecordReader);
     init(subScanSpec);
   }
@@ -145,7 +147,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     }
   }
 
-  private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
+  private void init(BaseMongoSubScanSpec subScanSpec) {
     List<String> hosts = subScanSpec.getHosts();
     List<ServerAddress> addresses = Lists.newArrayList();
     for (String host : hosts) {
@@ -183,25 +185,15 @@ public class MongoRecordReader extends AbstractRecordReader {
       logger.debug("Filters Applied : " + filters);
       logger.debug("Fields Selected :" + fields);
 
-      if (CollectionUtils.isNotEmpty(aggregates)) {
-        List<Bson> operations = new ArrayList<>();
-        operations.add(Aggregates.match(filters));
-        operations.addAll(aggregates);
+      MongoIterable<BsonDocument> projection;
+      if (CollectionUtils.isNotEmpty(operations)) {
+        List<Bson> operations = new ArrayList<>(this.operations);
         operations.add(Aggregates.project(fields));
-        if (maxRecords > 0) {
-          operations.add(Aggregates.limit(maxRecords));
-        }
-        cursor = collection.aggregate(operations).batchSize(100).iterator();
+        projection = collection.aggregate(operations);
       } else {
-        // Add limit to Mongo query
-        FindIterable<BsonDocument> projection = collection.find(filters).projection(fields);
-        if (maxRecords > 0) {
-          logger.debug("Limit applied: {}", maxRecords);
-          projection = projection.limit(maxRecords);
-        }
-
-        cursor = projection.batchSize(100).iterator();
+        projection = collection.find(filters).projection(fields);
       }
+      cursor = projection.batchSize(100).iterator();
     }
 
     writer.allocate();
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index f97e771..350cb28 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -42,22 +42,20 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = new LinkedList<>();
-    List<SchemaPath> columns = null;
-    for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
-        .getChunkScanSpecList()) {
+    for (BaseMongoSubScanSpec scanSpec : subScan.getChunkScanSpecList()) {
       try {
-        if ((columns = subScan.getColumns()) == null) {
+        List<SchemaPath> columns = subScan.getColumns();
+        if (columns == null) {
           columns = GroupScan.ALL_COLUMNS;
         }
         readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin()));
       } catch (Exception e) {
-        logger.error("MongoRecordReader creation failed for subScan:  "
-            + subScan + ".");
+        logger.error("MongoRecordReader creation failed for subScan: {}.", subScan);
         logger.error(e.getMessage(), e);
         throw new ExecutionSetupException(e);
       }
     }
-    logger.info("Number of record readers initialized : " + readers.size());
+    logger.info("Number of record readers initialized : {}", readers.size());
     return new ScanBatch(subScan, context, readers);
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 7ec1210..17b548b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -17,14 +17,15 @@
  */
 package org.apache.drill.exec.store.mongo;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.bson.Document;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.bson.conversions.Bson;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.StringJoiner;
 
 public class MongoScanSpec {
   private final String dbName;
@@ -32,7 +33,7 @@ public class MongoScanSpec {
 
   private Document filters;
 
-  private List<Bson> aggregates;
+  private List<Bson> operations = new ArrayList<>();
 
   @JsonCreator
   public MongoScanSpec(@JsonProperty("dbName") String dbName,
@@ -42,18 +43,11 @@ public class MongoScanSpec {
   }
 
   public MongoScanSpec(String dbName, String collectionName,
-      Document filters) {
+      Document filters, List<Bson> operations) {
     this.dbName = dbName;
     this.collectionName = collectionName;
     this.filters = filters;
-  }
-
-  public MongoScanSpec(String dbName, String collectionName,
-      Document filters, List<Bson> aggregates) {
-    this.dbName = dbName;
-    this.collectionName = collectionName;
-    this.filters = filters;
-    this.aggregates = aggregates;
+    this.operations = operations;
   }
 
   public String getDbName() {
@@ -68,17 +62,17 @@ public class MongoScanSpec {
     return filters;
   }
 
-  public List<Bson> getAggregates() {
-    return aggregates;
+  public List<Bson> getOperations() {
+    return operations;
   }
 
   @Override
   public String toString() {
-    return new StringJoiner(", ", MongoScanSpec.class.getSimpleName() + "[", "]")
-        .add("dbName='" + dbName + "'")
-        .add("collectionName='" + collectionName + "'")
-        .add("filters=" + filters)
-        .add("aggregates=" + aggregates)
+    return new PlanStringBuilder(this)
+        .field("dbName", dbName)
+        .field("collectionName", collectionName)
+        .field("filters", filters)
+        .field("operations", operations)
         .toString();
   }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index f6c3ac2..6262711 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -21,44 +21,42 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.mongodb.ConnectionString;
 import com.mongodb.MongoClientSettings;
-import com.mongodb.client.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
-import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.Convention;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.PlannableStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.mongo.plan.MongoPluginRulesProvider;
 import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
-import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
 import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
 import org.apache.drill.shaded.guava.com.google.common.cache.RemovalNotification;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URLEncoder;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-public class MongoStoragePlugin extends AbstractStoragePlugin {
+public class MongoStoragePlugin extends PlannableStoragePlugin implements StoragePlugin {
   private static final Logger logger = LoggerFactory.getLogger(MongoStoragePlugin.class);
 
   private final MongoStoragePluginConfig mongoConfig;
@@ -70,7 +68,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
       MongoStoragePluginConfig mongoConfig,
       DrillbitContext context,
       String name) {
-    super(context, name);
+    super(mongoStoragePluginBuilder(name).context(context));
     this.mongoConfig = mongoConfig;
     String connection = addCredentialsFromCredentialsProvider(this.mongoConfig.getConnection(), name);
     this.clientURI = new ConnectionString(connection);
@@ -81,6 +79,20 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
     this.schemaFactory = new MongoSchemaFactory(this, name);
   }
 
+  private static MongoStoragePluginConfigs mongoStoragePluginBuilder(String name) {
+    Convention convention = new Convention.Impl("MONGO." + name, PluginRel.class);
+    return new MongoStoragePluginConfigs()
+        .rulesProvider(new MongoPluginRulesProvider(convention))
+        .supportsProjectPushdown(true)
+        .supportsSortPushdown(true)
+        .supportsAggregatePushdown(true)
+        .supportsFilterPushdown(true)
+        .supportsLimitPushdown(true)
+        .supportsUnionPushdown(true)
+        .convention(convention)
+        .name(name);
+  }
+
   private String addCredentialsFromCredentialsProvider(String connection, String name) {
     ConnectionString parsed = new ConnectionString(connection);
     if (parsed.getCredential() == null) {
@@ -134,26 +146,16 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {
     });
-    return new MongoGroupScan(userName, this, mongoScanSpec, null, -1);
+    return new MongoGroupScan(userName, this, mongoScanSpec, null, false);
   }
 
-  @Override
-  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
-    switch (phase) {
-      case PHYSICAL:
-      case LOGICAL:
-        return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE,
-            MongoPushDownAggregateForScan.INSTANCE);
-      case LOGICAL_PRUNE_AND_JOIN:
-      case LOGICAL_PRUNE:
-      case PARTITION_PRUNING:
-      case JOIN_PLANNING:
-      default:
-        return Collections.emptySet();
+  private static class MongoStoragePluginConfigs extends PlannableStoragePluginConfigs<MongoStoragePluginConfigs> {
+    @Override
+    public MongoStoragePluginConfigs self() {
+      return this;
     }
   }
 
-
   private static class AddressCloser implements
     RemovalListener<MongoCnxnKey, MongoClient> {
     @Override
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index a32336d..692a939 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.bson.Document;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -40,6 +39,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.bson.Document;
 import org.bson.conversions.Bson;
 
 @JsonTypeName("mongo-shard-read")
@@ -53,14 +53,14 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   private final MongoStoragePlugin mongoStoragePlugin;
   private final List<SchemaPath> columns;
 
-  private final List<MongoSubScanSpec> chunkScanSpecList;
+  private final List<BaseMongoSubScanSpec> chunkScanSpecList;
 
   @JsonCreator
   public MongoSubScan(
       @JacksonInject StoragePluginRegistry registry,
       @JsonProperty("userName") String userName,
       @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
-      @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
+      @JsonProperty("chunkScanSpecList") LinkedList<BaseMongoSubScanSpec> chunkScanSpecList,
       @JsonProperty("columns") List<SchemaPath> columns)
       throws ExecutionSetupException {
     super(userName);
@@ -73,7 +73,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
 
   public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
       MongoStoragePluginConfig storagePluginConfig,
-      List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+      List<BaseMongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
     super(userName);
     this.mongoStoragePlugin = storagePlugin;
     this.mongoPluginConfig = storagePluginConfig;
@@ -101,13 +101,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
-  public List<MongoSubScanSpec> getChunkScanSpecList() {
+  public List<BaseMongoSubScanSpec> getChunkScanSpecList() {
     return chunkScanSpecList;
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
-      throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
         chunkScanSpecList, columns);
@@ -123,80 +122,34 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     return Collections.emptyIterator();
   }
 
-  public static class MongoSubScanSpec {
+  @JsonTypeName("ShardedMongoSubScanSpec")
+  public static class ShardedMongoSubScanSpec extends BaseMongoSubScanSpec {
 
-    protected String dbName;
-    protected String collectionName;
-    protected List<String> hosts;
     protected Map<String, Object> minFilters;
     protected Map<String, Object> maxFilters;
-    protected int maxRecords;
-
     protected Document filter;
-    protected List<Bson> aggregates;
 
     @JsonCreator
-    public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+    public ShardedMongoSubScanSpec(@JsonProperty("dbName") String dbName,
         @JsonProperty("collectionName") String collectionName,
         @JsonProperty("hosts") List<String> hosts,
         @JsonProperty("minFilters") Map<String, Object> minFilters,
         @JsonProperty("maxFilters") Map<String, Object> maxFilters,
-        @JsonProperty("filters") Document filters,
-        @JsonProperty("aggregates") List<Bson> aggregates,
-        @JsonProperty("maxRecords") int maxRecords) {
-      this.dbName = dbName;
-      this.collectionName = collectionName;
-      this.hosts = hosts;
+        @JsonProperty("filters") Document filters) {
+      super(dbName, collectionName, hosts);
       this.minFilters = minFilters;
       this.maxFilters = maxFilters;
       this.filter = filters;
-      this.aggregates = aggregates;
-      this.maxRecords = maxRecords;
-    }
-
-    MongoSubScanSpec() {
-
-    }
-
-    public String getDbName() {
-      return dbName;
-    }
-
-    public MongoSubScanSpec setDbName(String dbName) {
-      this.dbName = dbName;
-      return this;
-    }
-
-    public String getCollectionName() {
-      return collectionName;
     }
 
-    public MongoSubScanSpec setCollectionName(String collectionName) {
-      this.collectionName = collectionName;
-      return this;
-    }
-
-    public List<String> getHosts() {
-      return hosts;
-    }
-
-    public MongoSubScanSpec setHosts(List<String> hosts) {
-      this.hosts = hosts;
-      return this;
-    }
-
-    public int getMaxRecords() { return maxRecords; }
-
-    public MongoSubScanSpec setMaxRecords (int maxRecords) {
-      this.maxRecords = maxRecords;
-      return this;
+    ShardedMongoSubScanSpec() {
     }
 
     public Map<String, Object> getMinFilters() {
       return minFilters;
     }
 
-    public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
+    public ShardedMongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
       this.minFilters = minFilters;
       return this;
     }
@@ -205,7 +158,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
       return maxFilters;
     }
 
-    public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
+    public ShardedMongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
       this.maxFilters = maxFilters;
       return this;
     }
@@ -214,16 +167,11 @@ public class MongoSubScan extends AbstractBase implements SubScan {
       return filter;
     }
 
-    public MongoSubScanSpec setFilter(Document filter) {
+    public ShardedMongoSubScanSpec setFilter(Document filter) {
       this.filter = filter;
       return this;
     }
 
-    public MongoSubScanSpec setAggregates(List<Bson> aggregates) {
-      this.aggregates = aggregates;
-      return this;
-    }
-
     @Override
     public String toString() {
       return new PlanStringBuilder(this)
@@ -233,10 +181,45 @@ public class MongoSubScan extends AbstractBase implements SubScan {
         .field("minFilters", minFilters)
         .field("maxFilters", maxFilters)
         .field("filter", filter)
-        .field("aggregates", aggregates)
-        .field("maxRecords", maxRecords)
         .toString();
+    }
+
+  }
+
+  @JsonTypeName("MongoSubScanSpec")
+  public static class MongoSubScanSpec extends BaseMongoSubScanSpec {
 
+    protected List<Bson> operations;
+
+    @JsonCreator
+    public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+        @JsonProperty("collectionName") String collectionName,
+        @JsonProperty("hosts") List<String> hosts,
+        @JsonProperty("operations") List<Bson> operations) {
+      super(dbName, collectionName, hosts);
+      this.operations = operations;
+    }
+
+    MongoSubScanSpec() {
+    }
+
+    public List<Bson> getOperations() {
+      return operations;
+    }
+
+    public MongoSubScanSpec setOperations(List<Bson> operations) {
+      this.operations = operations;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return new PlanStringBuilder(this)
+          .field("dbName", dbName)
+          .field("collectionName", collectionName)
+          .field("hosts", hosts)
+          .field("operations", operations)
+          .toString();
     }
 
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
new file mode 100644
index 0000000..2247683
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -0,0 +1,206 @@
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
+import org.apache.drill.exec.store.mongo.MongoGroupScan;
+import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
+import org.apache.drill.exec.store.mongo.MongoScanSpec;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.exec.util.Utilities;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Collects the MongoDB filter, aggregation pipeline stages and output columns
+ * for a tree of plugin rel nodes and turns them into a {@link MongoGroupScan}.
+ *
+ * <p>All state is populated when the {@link StoragePluginTableScan} leaf is
+ * visited, so every {@code implement} method visits its input first and only
+ * then appends its own stage.
+ */
+public class MongoPluginImplementor implements PluginImplementor {
+  private MongoGroupScan groupScan;
+  private List<Bson> operations;
+  private Document filters;
+  private List<SchemaPath> columns;
+
+  // Set by operators that are emitted as pipeline stages; handed to the new group scan.
+  private boolean runAggregate;
+
+  /** Appends the stages produced by {@link MongoAggregateUtils} and exposes the aggregate's fields. */
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    runAggregate = true;
+    visitChild(aggregate.getInput());
+
+    operations.addAll(
+        MongoAggregateUtils.getAggregateOperations(aggregate, aggregate.getRowType(), groupScan));
+    columns = toSchemaPaths(MongoAggregateUtils.mongoFieldNames(aggregate.getRowType()));
+  }
+
+  /**
+   * Converts the filter condition with {@link MongoFilterBuilder}. In aggregate mode the
+   * result becomes a {@code $match} stage, otherwise it replaces the filter document.
+   */
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+        filter.getInput(), filter.getCondition());
+    MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan, conditionExp);
+    if (runAggregate) {
+      operations.add(Aggregates.match(mongoFilterBuilder.parseTree()).toBsonDocument());
+    } else {
+      filters = mongoFilterBuilder.parseTree();
+    }
+  }
+
+  /** Emits {@code $skip} / {@code $limit} stages for the offset and fetch of the limit. */
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    runAggregate = true;
+    visitChild(limit.getInput());
+
+    addSkipAndLimit((RexLiteral) limit.getOffset(), (RexLiteral) limit.getFetch());
+  }
+
+  /**
+   * Only narrows the output columns to the project's field names; project expressions
+   * are not translated into a {@code $project} stage here.
+   */
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    visitChild(project.getInput());
+
+    columns = toSchemaPaths(MongoAggregateUtils.mongoFieldNames(project.getRowType()));
+  }
+
+  /**
+   * Emits a {@code $sort} stage for the collation followed by {@code $skip} / {@code $limit}
+   * stages for the sort's offset and fetch. NULLS FIRST / NULLS LAST is not translated.
+   */
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    runAggregate = true;
+    visitChild(sort.getInput());
+
+    List<RelFieldCollation> fieldCollations = sort.collation.getFieldCollations();
+    if (!fieldCollations.isEmpty()) {
+      List<RelDataTypeField> fields = sort.getRowType().getFieldList();
+      // Built as a document (not parsed from concatenated JSON) so any field name is a valid key.
+      Document sortKeys = new Document();
+      for (RelFieldCollation fieldCollation : fieldCollations) {
+        sortKeys.append(fields.get(fieldCollation.getFieldIndex()).getName(), direction(fieldCollation));
+      }
+      operations.add(Aggregates.sort(sortKeys).toBsonDocument());
+    }
+    addSkipAndLimit((RexLiteral) sort.offset, (RexLiteral) sort.fetch);
+  }
+
+  /**
+   * Implements the first input with this implementor and every other input with a
+   * separate copy whose pipeline is attached through a {@code $unionWith} stage.
+   */
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    runAggregate = true;
+
+    MongoPluginImplementor childImplementor = copy();
+    childImplementor.runAggregate = true;
+
+    boolean firstProcessed = false;
+    for (RelNode input : union.getInputs()) {
+      if (!firstProcessed) {
+        visitChild(input);
+        firstProcessed = true;
+      } else {
+        childImplementor.visitChild(input);
+        String collection = childImplementor.groupScan.getScanSpec().getCollectionName();
+        operations.add(Aggregates.unionWith(collection, childImplementor.operations).toBsonDocument());
+      }
+    }
+  }
+
+  /** Initializes the implementor state from the group scan of the scanned table. */
+  @Override
+  public void implement(StoragePluginTableScan scan) throws IOException {
+    groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+    // copy, so stages added here do not end up in the scan spec's own list
+    operations = new ArrayList<>(groupScan.getScanSpec().getOperations());
+    filters = groupScan.getScanSpec().getFilters();
+    columns = groupScan.getColumns();
+  }
+
+  @Override
+  public MongoPluginImplementor copy() {
+    return new MongoPluginImplementor();
+  }
+
+  /** Builds a new group scan from the collected filter, pipeline stages and columns. */
+  @Override
+  public GroupScan getPhysicalOperator() throws IOException {
+    MongoScanSpec scanSpec = groupScan.getScanSpec();
+    MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+    return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+        newSpec, columns, runAggregate);
+  }
+
+  /** Returns {@code -1} for descending collations and {@code 1} for all others. */
+  public static int direction(RelFieldCollation fieldCollation) {
+    switch (fieldCollation.getDirection()) {
+      case DESCENDING:
+      case STRICTLY_DESCENDING:
+        return -1;
+      case ASCENDING:
+      case STRICTLY_ASCENDING:
+      default:
+        return 1;
+    }
+  }
+
+  /** Appends {@code $skip} and {@code $limit} stages for the non-null arguments. */
+  private void addSkipAndLimit(RexLiteral offset, RexLiteral fetch) {
+    if (offset != null) {
+      operations.add(Aggregates.skip(intValue(offset)).toBsonDocument());
+    }
+    if (fetch != null) {
+      operations.add(Aggregates.limit(intValue(fetch)).toBsonDocument());
+    }
+  }
+
+  /** Reads the literal's {@link BigDecimal} value as an {@code int}. */
+  private static int intValue(RexLiteral literal) {
+    return ((BigDecimal) literal.getValue()).intValue();
+  }
+
+  private static List<SchemaPath> toSchemaPaths(List<String> fieldNames) {
+    return fieldNames.stream().map(SchemaPath::getSimplePath).collect(Collectors.toList());
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
new file mode 100644
index 0000000..06319b6
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
@@ -0,0 +1,76 @@
+package org.apache.drill.exec.store.mongo.plan;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.PluginRulesProvider;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Creates the planner rules of the Mongo plugin. The convention-based rules are all
+ * built with the convention passed to the constructor.
+ */
+public class MongoPluginRulesProvider implements PluginRulesProvider {
+  private final Convention convention;
+
+  public MongoPluginRulesProvider(Convention convention) {
+    this.convention = convention;
+  }
+
+  /** Sort rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}. */
+  public List<RelOptRule> sortRules() {
+    RelOptRule fromNone = new PluginSortRule(Convention.NONE, convention);
+    RelOptRule fromLogical = new PluginSortRule(DrillRel.DRILL_LOGICAL, convention);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Limit rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}. */
+  public List<RelOptRule> limitRules() {
+    RelOptRule fromNone = new PluginLimitRule(Convention.NONE, convention);
+    RelOptRule fromLogical = new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Filter rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}. */
+  public List<RelOptRule> filterRules() {
+    RelOptRule fromNone = new PluginFilterRule(Convention.NONE, convention);
+    RelOptRule fromLogical = new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Project rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}. */
+  public List<RelOptRule> projectRules() {
+    RelOptRule fromNone = new PluginProjectRule(Convention.NONE, convention);
+    RelOptRule fromLogical = new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Aggregate rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}. */
+  public List<RelOptRule> aggregateRules() {
+    RelOptRule fromNone = new PluginAggregateRule(Convention.NONE, convention);
+    RelOptRule fromLogical = new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Union rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}. */
+  public List<RelOptRule> unionRules() {
+    RelOptRule fromNone = new PluginUnionRule(Convention.NONE, convention);
+    RelOptRule fromLogical = new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Creates a {@link VertexDrelConverterRule} for this provider's convention. */
+  public RelOptRule vertexRule() {
+    return new VertexDrelConverterRule(convention);
+  }
+
+  /** Creates the prel converter rule with a {@code MongoPluginImplementor::new} constructor reference. */
+  @Override
+  public RelOptRule prelConverterRule() {
+    return new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
new file mode 100644
index 0000000..fddfa7c
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.Util;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MongoRules {
+
+  /** Sort rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}, each with {@code out}. */
+  public List<RelOptRule> sortRules(Convention out) {
+    RelOptRule fromNone = new PluginSortRule(Convention.NONE, out);
+    RelOptRule fromLogical = new PluginSortRule(DrillRel.DRILL_LOGICAL, out);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Limit rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}, each with {@code out}. */
+  public List<RelOptRule> limitRules(Convention out) {
+    RelOptRule fromNone = new PluginLimitRule(Convention.NONE, out);
+    RelOptRule fromLogical = new PluginLimitRule(DrillRel.DRILL_LOGICAL, out);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Filter rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}, each with {@code out}. */
+  public List<RelOptRule> filterRules(Convention out) {
+    RelOptRule fromNone = new PluginFilterRule(Convention.NONE, out);
+    RelOptRule fromLogical = new PluginFilterRule(DrillRel.DRILL_LOGICAL, out);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Project rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}, each with {@code out}. */
+  public List<RelOptRule> projectRules(Convention out) {
+    RelOptRule fromNone = new PluginProjectRule(Convention.NONE, out);
+    RelOptRule fromLogical = new PluginProjectRule(DrillRel.DRILL_LOGICAL, out);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Aggregate rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}, each with {@code out}. */
+  public List<RelOptRule> aggregateRules(Convention out) {
+    RelOptRule fromNone = new PluginAggregateRule(Convention.NONE, out);
+    RelOptRule fromLogical = new PluginAggregateRule(DrillRel.DRILL_LOGICAL, out);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Union rules created for {@code Convention.NONE} and {@code DrillRel.DRILL_LOGICAL}, each with {@code out}. */
+  public List<RelOptRule> unionRules(Convention out) {
+    RelOptRule fromNone = new PluginUnionRule(Convention.NONE, out);
+    RelOptRule fromLogical = new PluginUnionRule(DrillRel.DRILL_LOGICAL, out);
+    return Arrays.asList(fromNone, fromLogical);
+  }
+
+  /** Creates a {@link VertexDrelConverterRule} for the given convention. */
+  public RelOptRule vertexRule(Convention out) {
+    return new VertexDrelConverterRule(out);
+  }
+
+  /** Shared prel converter rule, created with a {@code MongoPluginImplementor::new} constructor reference. */
+  public static final RelOptRule PREL_CONVERTER_INSTANCE = new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
+
+  /**
+   * Extracts the key of an {@code ITEM} call made on input field 0 with a string
+   * literal, i.e. returns 'string' for item['string'], and {@code null} for any other call.
+   */
+  public static String isItem(RexCall call) {
+    if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+      return null;
+    }
+    final RexNode target = call.operands.get(0);
+    final RexNode key = call.operands.get(1);
+    final boolean onFirstInput =
+        target instanceof RexInputRef && ((RexInputRef) target).getIndex() == 0;
+    if (!onFirstInput || !(key instanceof RexLiteral)) {
+      return null;
+    }
+    final Object keyValue = ((RexLiteral) key).getValue2();
+    return keyValue instanceof String ? (String) keyValue : null;
+  }
+
+  /**
+   * Lists the field names of {@code rowType}: a name starting with {@code $} becomes
+   * {@code _} followed by the name without its first two characters, and the resulting
+   * names are passed through {@link SqlValidatorUtil#uniquify}.
+   */
+  static List<String> mongoFieldNames(final RelDataType rowType) {
+    // lazy view over the row type; names are computed when uniquify reads them
+    final List<String> rawNames = new AbstractList<String>() {
+      @Override public String get(int index) {
+        final String fieldName = rowType.getFieldList().get(index).getName();
+        if (fieldName.startsWith("$")) {
+          return "_" + fieldName.substring(2);
+        }
+        return fieldName;
+      }
+
+      @Override public int size() {
+        return rowType.getFieldCount();
+      }
+    };
+    return SqlValidatorUtil.uniquify(rawNames, SqlValidatorUtil.EXPR_SUGGESTER, true);
+  }
+
+  /** Returns {@code s} quoted via {@link #quote(String)} when it needs quoting, otherwise unchanged. */
+  static String maybeQuote(String s) {
+    return needsQuote(s) ? quote(s) : s;
+  }
+
+  /**
+   * Wraps {@code s} in single quotes, backslash-escaping embedded backslashes and
+   * single quotes so that the result stays one well-formed quoted string.
+   */
+  static String quote(String s) {
+    // escape backslashes first, otherwise the backslash added for a quote would be doubled
+    return "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'";
+  }
+
+  /**
+   * Tells whether {@code s} must be quoted: true as soon as one character is a dollar
+   * sign or is not a Java identifier part.
+   */
+  private static boolean needsQuote(String s) {
+    for (char c : s.toCharArray()) {
+      if (c == '$' || !Character.isJavaIdentifierPart(c)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Translator from {@link RexNode} to strings in MongoDB's expression
+   * language.
+   *
+   * <p>Input references are rendered as {@code $}-prefixed names taken from
+   * {@code inFields}; calls are rendered from their translated operands.
+   */
+  static class RexToMongoTranslator extends RexVisitorImpl<String> {
+    private final JavaTypeFactory typeFactory;
+    private final List<String> inFields;
+
+    private static final Map<SqlOperator, String> MONGO_OPERATORS =
+        new HashMap<>();
+
+    static {
+      // Arithmetic
+      MONGO_OPERATORS.put(SqlStdOperatorTable.DIVIDE, "$divide");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.MULTIPLY, "$multiply");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.MOD, "$mod");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract");
+      // Boolean
+      MONGO_OPERATORS.put(SqlStdOperatorTable.AND, "$and");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.OR, "$or");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, "$not");
+      // Comparison
+      MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, "$eq");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, "$ne");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, "$gt");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, "$gte");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, "$lt");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, "$lte");
+    }
+
+    protected RexToMongoTranslator(JavaTypeFactory typeFactory,
+        List<String> inFields) {
+      super(true);
+      this.typeFactory = typeFactory;
+      this.inFields = inFields;
+    }
+
+    /** Renders a literal as {@code {$literal: ...}}, or as {@code null} when it has no value. */
+    @Override public String visitLiteral(RexLiteral literal) {
+      if (literal.getValue() == null) {
+        return "null";
+      }
+      return "{$literal: "
+          + RexToLixTranslator.translateLiteral(literal, literal.getType(),
+              typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
+          + "}";
+    }
+
+    /** Renders an input reference as the {@code $}-prefixed name of the referenced input field. */
+    @Override public String visitInputRef(RexInputRef inputRef) {
+      return maybeQuote(
+          "$" + inFields.get(inputRef.getIndex()));
+    }
+
+    /**
+     * Renders a call from the translations of its operands.
+     *
+     * @throws IllegalArgumentException if the call has no translation
+     */
+    @Override public String visitCall(RexCall call) {
+      String name = isItem(call);
+      if (name != null) {
+        return "'$" + name + "'";
+      }
+      // Translate every operand up front; all branches below work on these strings.
+      final List<String> strings = new ArrayList<>(call.operands.size());
+      for (RexNode operand : call.operands) {
+        strings.add(operand.accept(this));
+      }
+      if (call.getKind() == SqlKind.CAST) {
+        // the cast itself is not rendered, only its operand
+        return strings.get(0);
+      }
+      String stdOperator = MONGO_OPERATORS.get(call.getOperator());
+      if (stdOperator != null) {
+        return "{" + stdOperator + ": [" + Util.commaList(strings) + "]}";
+      }
+      if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+        final RexNode op1 = call.operands.get(1);
+        if (op1 instanceof RexLiteral
+            && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+          if (!Bug.CALCITE_194_FIXED) {
+            return "'" + stripQuotes(strings.get(0)) + "["
+                + ((RexLiteral) op1).getValue2() + "]'";
+          }
+          return strings.get(0) + "[" + strings.get(1) + "]";
+        }
+      }
+      if (call.getOperator() == SqlStdOperatorTable.CASE) {
+        StringBuilder sb = new StringBuilder();
+        StringBuilder finish = new StringBuilder();
+        // case(a, b, c)  -> $cond:[a, b, c]
+        // case(a, b, c, d) -> $cond:[a, b, $cond:[c, d, null]]
+        // case(a, b, c, d, e) -> $cond:[a, b, $cond:[c, d, e]]
+        for (int i = 0; i < strings.size(); i += 2) {
+          sb.append("{$cond:[");
+          finish.append("]}");
+
+          sb.append(strings.get(i));
+          sb.append(',');
+          sb.append(strings.get(i + 1));
+          sb.append(',');
+          if (i == strings.size() - 3) {
+            sb.append(strings.get(i + 2));
+            break;
+          }
+          if (i == strings.size() - 2) {
+            sb.append("null");
+            break;
+          }
+        }
+        sb.append(finish);
+        return sb.toString();
+      }
+      throw new IllegalArgumentException("Translation of " + call
+          + " is not supported by MongoProject");
+    }
+
+    /** Removes one pair of enclosing single quotes from {@code s}, if present. */
+    private static String stripQuotes(String s) {
+      return s.startsWith("'") && s.endsWith("'")
+          ? s.substring(1, s.length() - 1)
+          : s;
+    }
+  }
+
+  /*
+
+  /**
+   * Rule to convert a {@link LogicalCalc} to an
+   * {@link MongoCalcRel}.
+   o/
+  public static class MongoCalcRule
+      extends MongoConverterRule {
+    private MongoCalcRule(MongoConvention out) {
+      super(
+          LogicalCalc.class,
+          Convention.NONE,
+          out,
+          "MongoCalcRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalCalc calc = (LogicalCalc) rel;
+
+      // If there's a multiset, let FarragoMultisetSplitter work on it
+      // first.
+      if (RexMultisetUtil.containsMultiset(calc.getProgram())) {
+        return null;
+      }
+
+      return new MongoCalcRel(
+          rel.getCluster(),
+          rel.getTraitSet().replace(out),
+          convert(
+              calc.getChild(),
+              calc.getTraitSet().replace(out)),
+          calc.getProgram(),
+          Project.Flags.Boxed);
+    }
+  }
+
+  public static class MongoCalcRel extends SingleRel implements MongoRel {
+    private final RexProgram program;
+
+    /**
+     * Values defined in {@link org.apache.calcite.rel.core.Project.Flags}.
+     o/
+    protected int flags;
+
+    public MongoCalcRel(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode child,
+        RexProgram program,
+        int flags) {
+      super(cluster, traitSet, child);
+      assert getConvention() instanceof MongoConvention;
+      this.flags = flags;
+      this.program = program;
+      this.rowType = program.getOutputRowType();
+    }
+
+    public RelOptPlanWriter explainTerms(RelOptPlanWriter pw) {
+      return program.explainCalc(super.explainTerms(pw));
+    }
+
+    public double getRows() {
+      return LogicalFilter.estimateFilteredRows(
+          getChild(), program);
+    }
+
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+      double dRows = RelMetadataQuery.getRowCount(this);
+      double dCpu =
+          RelMetadataQuery.getRowCount(getChild())
+              * program.getExprCount();
+      double dIo = 0;
+      return planner.makeCost(dRows, dCpu, dIo);
+    }
+
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+      return new MongoCalcRel(
+          getCluster(),
+          traitSet,
+          sole(inputs),
+          program.copy(),
+          getFlags());
+    }
+
+    public int getFlags() {
+      return flags;
+    }
+
+    public RexProgram getProgram() {
+      return program;
+    }
+
+    public SqlString implement(MongoImplementor implementor) {
+      final SqlBuilder buf = new SqlBuilder(implementor.dialect);
+      buf.append("SELECT ");
+      if (isStar(program)) {
+        buf.append("*");
+      } else {
+        for (Ord<RexLocalRef> ref : Ord.zip(program.getProjectList())) {
+          buf.append(ref.i == 0 ? "" : ", ");
+          expr(buf, program, ref.e);
+          alias(buf, null, getRowType().getFieldNames().get(ref.i));
+        }
+      }
+      implementor.newline(buf)
+          .append("FROM ");
+      implementor.subQuery(buf, 0, getChild(), "t");
+      if (program.getCondition() != null) {
+        implementor.newline(buf);
+        buf.append("WHERE ");
+        expr(buf, program, program.getCondition());
+      }
+      return buf.toSqlString();
+    }
+
+    private static boolean isStar(RexProgram program) {
+      int i = 0;
+      for (RexLocalRef ref : program.getProjectList()) {
+        if (ref.getIndex() != i++) {
+          return false;
+        }
+      }
+      return i == program.getInputRowType().getFieldCount();
+    }
+
+    private static void expr(
+        SqlBuilder buf, RexProgram program, RexNode rex) {
+      if (rex instanceof RexLocalRef) {
+        final int index = ((RexLocalRef) rex).getIndex();
+        expr(buf, program, program.getExprList().get(index));
+      } else if (rex instanceof RexInputRef) {
+        buf.identifier(
+            program.getInputRowType().getFieldNames().get(
+                ((RexInputRef) rex).getIndex()));
+      } else if (rex instanceof RexLiteral) {
+        toSql(buf, (RexLiteral) rex);
+      } else if (rex instanceof RexCall) {
+        final RexCall call = (RexCall) rex;
+        switch (call.getOperator().getSyntax()) {
+        case Binary:
+          expr(buf, program, call.getOperands().get(0));
+          buf.append(' ')
+              .append(call.getOperator().toString())
+              .append(' ');
+          expr(buf, program, call.getOperands().get(1));
+          break;
+        default:
+          throw new AssertionError(call.getOperator());
+        }
+      } else {
+        throw new AssertionError(rex);
+      }
+    }
+  }
+
+  private static SqlBuilder toSql(SqlBuilder buf, RexLiteral rex) {
+    switch (rex.getTypeName()) {
+    case CHAR:
+    case VARCHAR:
+      return buf.append(
+          new NlsString(rex.getValue2().toString(), null, null)
+              .asSql(false, false));
+    default:
+      return buf.append(rex.getValue2().toString());
+    }
+  }
+
+*/
+
+
+//  /**
+//   * Rule to convert an {@link org.apache.calcite.rel.logical.Union} to a
+//   * {@link MongoUnionRel}.
+//   */
+//  public static class MongoUnionRule
+//      extends MongoConverterRule {
+//    private MongoUnionRule(MongoConvention out) {
+//      super(
+//          Union.class,
+//          Convention.NONE,
+//          out,
+//          "MongoUnionRule");
+//    }
+//
+//    public RelNode convert(RelNode rel) {
+//      final Union union = (Union) rel;
+//      final RelTraitSet traitSet =
+//          union.getTraitSet().replace(out);
+//      return new MongoUnionRel(
+//          rel.getCluster(),
+//          traitSet,
+//          convertList(union.getInputs(), traitSet),
+//          union.all);
+//    }
+//  }
+//
+//  public static class MongoUnionRel
+//      extends Union
+//      implements MongoRel {
+//    public MongoUnionRel(
+//        RelOptCluster cluster,
+//        RelTraitSet traitSet,
+//        List<RelNode> inputs,
+//        boolean all) {
+//      super(cluster, traitSet, inputs, all);
+//    }
+//
+//    public MongoUnionRel copy(
+//        RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+//      return new MongoUnionRel(getCluster(), traitSet, inputs, all);
+//    }
+//
+//    @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
+//      return super.computeSelfCost(planner).multiplyBy(.1);
+//    }
+//
+//    public SqlString implement(MongoImplementor implementor) {
+//      return setOpSql(this, implementor, "UNION");
+//    }
+//  }
+//
+//  private static SqlString setOpSql(
+//      SetOp setOpRel, MongoImplementor implementor, String op) {
+//    final SqlBuilder buf = new SqlBuilder(implementor.dialect);
+//    for (Ord<RelNode> input : Ord.zip(setOpRel.getInputs())) {
+//      if (input.i > 0) {
+//        implementor.newline(buf)
+//            .append(op + (setOpRel.all ? " ALL " : ""));
+//        implementor.newline(buf);
+//      }
+//      buf.append(implementor.visitChild(input.i, input.e));
+//    }
+//    return buf.toSqlString();
+//  }
+
+  /*
+  /**
+   * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect}
+   * to an {@link MongoIntersectRel}.
+   o/
+  public static class MongoIntersectRule
+      extends MongoConverterRule {
+    private MongoIntersectRule(MongoConvention out) {
+      super(
+          LogicalIntersect.class,
+          Convention.NONE,
+          out,
+          "MongoIntersectRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalIntersect intersect = (LogicalIntersect) rel;
+      if (intersect.all) {
+        return null; // INTERSECT ALL not implemented
+      }
+      final RelTraitSet traitSet =
+          intersect.getTraitSet().replace(out);
+      return new MongoIntersectRel(
+          rel.getCluster(),
+          traitSet,
+          convertList(intersect.getInputs(), traitSet),
+          intersect.all);
+    }
+  }
+
+  public static class MongoIntersectRel
+      extends Intersect
+      implements MongoRel {
+    public MongoIntersectRel(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        List<RelNode> inputs,
+        boolean all) {
+      super(cluster, traitSet, inputs, all);
+      assert !all;
+    }
+
+    public MongoIntersectRel copy(
+        RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+      return new MongoIntersectRel(getCluster(), traitSet, inputs, all);
+    }
+
+    public SqlString implement(MongoImplementor implementor) {
+      return setOpSql(this, implementor, " intersect ");
+    }
+  }
+
+  /**
+   * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalMinus}
+   * to an {@link MongoMinusRel}.
+   o/
+  public static class MongoMinusRule
+      extends MongoConverterRule {
+    private MongoMinusRule(MongoConvention out) {
+      super(
+          LogicalMinus.class,
+          Convention.NONE,
+          out,
+          "MongoMinusRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalMinus minus = (LogicalMinus) rel;
+      if (minus.all) {
+        return null; // EXCEPT ALL not implemented
+      }
+      final RelTraitSet traitSet =
+          rel.getTraitSet().replace(out);
+      return new MongoMinusRel(
+          rel.getCluster(),
+          traitSet,
+          convertList(minus.getInputs(), traitSet),
+          minus.all);
+    }
+  }
+
+  public static class MongoMinusRel
+      extends Minus
+      implements MongoRel {
+    public MongoMinusRel(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        List<RelNode> inputs,
+        boolean all) {
+      super(cluster, traitSet, inputs, all);
+      assert !all;
+    }
+
+    public MongoMinusRel copy(
+        RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+      return new MongoMinusRel(getCluster(), traitSet, inputs, all);
+    }
+
+    public SqlString implement(MongoImplementor implementor) {
+      return setOpSql(this, implementor, " minus ");
+    }
+  }
+
+  public static class MongoValuesRule extends MongoConverterRule {
+    private MongoValuesRule(MongoConvention out) {
+      super(
+          LogicalValues.class,
+          Convention.NONE,
+          out,
+          "MongoValuesRule");
+    }
+
+    @Override public RelNode convert(RelNode rel) {
+      LogicalValues valuesRel = (LogicalValues) rel;
+      return new MongoValuesRel(
+          valuesRel.getCluster(),
+          valuesRel.getRowType(),
+          valuesRel.getTuples(),
+          valuesRel.getTraitSet().plus(out));
+    }
+  }
+
+  public static class MongoValuesRel
+      extends Values
+      implements MongoRel {
+    MongoValuesRel(
+        RelOptCluster cluster,
+        RelDataType rowType,
+        List<List<RexLiteral>> tuples,
+        RelTraitSet traitSet) {
+      super(cluster, rowType, tuples, traitSet);
+    }
+
+    @Override public RelNode copy(
+        RelTraitSet traitSet, List<RelNode> inputs) {
+      assert inputs.isEmpty();
+      return new MongoValuesRel(
+          getCluster(), rowType, tuples, traitSet);
+    }
+
+    public SqlString implement(MongoImplementor implementor) {
+      throw new AssertionError(); // TODO:
+    }
+  }
+*/
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
new file mode 100644
index 0000000..68e01e9
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table based on a MongoDB collection.
+ */
+public class MongoTable extends AbstractQueryableTable
+    implements TranslatableTable {
+  private final String collectionName;
+
+  /** Creates a MongoTable. */
+  MongoTable(String collectionName) {
+    super(Object[].class);
+    this.collectionName = collectionName;
+  }
+
+  @Override public String toString() {
+    return "MongoTable {" + collectionName + "}";
+  }
+
+  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    final RelDataType mapType =
+        typeFactory.createMapType(
+            typeFactory.createSqlType(SqlTypeName.VARCHAR),
+            typeFactory.createTypeWithNullability(
+                typeFactory.createSqlType(SqlTypeName.ANY), true));
+    return typeFactory.builder().add("_MAP", mapType).build();
+  }
+
+  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    return new MongoQueryable<>(queryProvider, schema, this, tableName);
+  }
+
+  @Override public RelNode toRel(
+      RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    // TODO: build a MongoTableScan (args: cluster, traits, grpScan, table); returns null until then
+//    return new MongoTableScan(cluster, cluster.traitSetOf(MongoRel.CONVENTION),
+//        relOptTable, relOptTable.getRowType(), null);
+    return null;
+  }
+
+  /** Executes a "find" operation on the underlying collection.
+   *
+   * <p>For example,
+   * <code>zipsTable.find("{state: 'OR'}", "{city: 1, zipcode: 1}")</code></p>
+   *
+   * @param mongoDb MongoDB connection
+   * @param filterJson Filter JSON string, or null
+   * @param projectJson Project JSON string, or null
+   * @param fields List of fields to project; or null to return map
+   * @return Enumerable of results (currently always {@code null}: the query code is commented out)
+   */
+  private Enumerable<Object> find(MongoDatabase mongoDb, String filterJson,
+      String projectJson, List<Map.Entry<String, Class>> fields) {
+    final MongoCollection collection =
+        mongoDb.getCollection(collectionName);
+    final Bson filter =
+        filterJson == null ? null : BsonDocument.parse(filterJson);
+    final Bson project =
+        projectJson == null ? null : BsonDocument.parse(projectJson);
+//    final Function1<Document, Object> getter = MongoEnumerator.getter(fields);
+//    return new AbstractEnumerable<Object>() {
+//      @Override public Enumerator<Object> enumerator() {
+//        @SuppressWarnings("unchecked") final FindIterable<Document> cursor =
+//            collection.find(filter).projection(project);
+//        return new MongoEnumerator(cursor.iterator(), getter);
+//      }
+//    };
+    return null;
+  }
+
+  /** Executes an "aggregate" operation on the underlying collection.
+   *
+   * <p>For example:
+   * <code>zipsTable.aggregate(
+   * "{$match: {state: 'OR'}}",
+   * "{$group: {_id: '$city', c: {$sum: 1}, p: {$sum: '$pop'}}}")
+   * </code></p>
+   *
+   * @param mongoDb MongoDB connection
+   * @param fields List of fields to project; or null to return map
+   * @param operations One or more JSON strings
+   * @return Enumerable of results (currently always {@code null}: the query code is commented out)
+   */
+  private Enumerable<Object> aggregate(final MongoDatabase mongoDb,
+      List<Map.Entry<String, Class>> fields,
+      List<String> operations) {
+    List<Bson> list = new ArrayList<>();
+    for (String operation : operations) {
+      list.add(BsonDocument.parse(operation));
+    }
+//    Function1<Document, Object> getter =
+//        MongoEnumerator.getter(fields);
+//    return new AbstractEnumerable<Object>() {
+//      @Override public Enumerator<Object> enumerator() {
+//        final Iterator<Document> resultIterator;
+//        try {
+//          resultIterator = mongoDb.getCollection(collectionName)
+//              .aggregate(list).iterator();
+//        } catch (Exception e) {
+//          throw new RuntimeException("While running MongoDB query "
+//              + Util.toString(operations, "[", ",\n", "]"), e);
+//        }
+//        return new MongoEnumerator(resultIterator, getter);
+//      }
+//    };
+    return null;
+  }
+
+  /** Implementation of {@link Queryable} based on
+   * a {@link MongoTable}.
+   *
+   * @param <T> element type */
+  public static class MongoQueryable<T> extends AbstractTableQueryable<T> {
+    MongoQueryable(QueryProvider queryProvider, SchemaPlus schema,
+        MongoTable table, String tableName) {
+      super(queryProvider, schema, table, tableName);
+    }
+
+    @Override public Enumerator<T> enumerator() {
+      //noinspection unchecked
+      final Enumerable<T> enumerable =
+          (Enumerable<T>) getTable().find(getMongoDb(), null, null, null);
+      return enumerable.enumerator();
+    }
+
+    private MongoDatabase getMongoDb() {
+//      return schema.unwrap(MongoSchemaFactory.MongoSchema.class).mongoDb;
+      return null;
+    }
+
+    private MongoTable getTable() {
+      return (MongoTable) table;
+    }
+
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> aggregate(List<Map.Entry<String, Class>> fields,
+        List<String> operations) {
+      return getTable().aggregate(getMongoDb(), fields, operations);
+    }
+
+    /** Called via code-generation.
+     *
+     * @param filterJson Filter document
+     * @param projectJson Projection document
+     * @param fields List of expected fields (and their types)
+     * @return result of mongo query
+     *
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> find(String filterJson,
+        String projectJson, List<Map.Entry<String, Class>> fields) {
+      return getTable().find(getMongoDb(), filterJson, projectJson, fields);
+    }
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
index da52984..4249725 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
@@ -32,8 +32,7 @@ import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory.MongoSchema;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class MongoDatabaseSchema extends AbstractSchema {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(MongoDatabaseSchema.class);
+
   private final MongoSchema mongoSchema;
   private final Set<String> tableNames;
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index cc071a4..29f14ce 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -29,13 +29,13 @@ import java.util.concurrent.TimeUnit;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
 import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -195,7 +195,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
 
     DrillTable getDrillTable(String dbName, String collectionName) {
       MongoScanSpec mongoScanSpec = new MongoScanSpec(schemaNameMap.get(dbName), collectionName);
-      return new DynamicDrillTable(plugin, getName(), null, mongoScanSpec);
+      return new PluginDrillTable(plugin, getName(), null, mongoScanSpec, plugin.convention());
     }
 
     @Override
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
index e4c71b1..eb706d6 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
@@ -155,7 +155,7 @@ public class TestMongoChunkAssignment extends BaseTest {
 
     mongoGroupScan = new MongoGroupScan();
     mongoGroupScan.setChunksMapping(chunksMapping);
-    mongoGroupScan.setInverseChunsMapping(chunksInverseMapping);
+    mongoGroupScan.setInverseChunksMapping(chunksInverseMapping);
     MongoScanSpec scanSpec = new MongoScanSpec(dbName, collectionName);
     mongoGroupScan.setScanSpec(scanSpec);
   }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
index 569cf5c..bb660a0 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
@@ -30,31 +30,32 @@ public class TestMongoLimitPushDown extends MongoTestBase {
   public void testLimit() throws Exception {
     String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4";
     queryBuilder()
-      .sql(sql)
-      .planMatcher()
-      .include("Limit", "maxRecords=4")
-      .match();
+        .sql(sql)
+        .planMatcher()
+        .exclude("Limit\\(")
+        .include("MongoGroupScan.*limit=4")
+        .match();
   }
 
   @Test
   public void testLimitWithOrderBy() throws Exception {
-    // Limit should not be pushed down for this example due to the sort
     String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` ORDER BY employee_id LIMIT 4";
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=-1")
+      .exclude("Limit")
+      .include("sort=\\{employee_id", "limit=4")
       .match();
   }
 
   @Test
   public void testLimitWithOffset() throws Exception {
-    // Limit should be pushed down and include the offset
     String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4 OFFSET 5";
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=9")
+      .exclude("Limit")
+      .include("skip=5", "limit=4")
       .match();
   }
 
@@ -64,7 +65,8 @@ public class TestMongoLimitPushDown extends MongoTestBase {
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=4")
+      .exclude("Limit")
+      .include("limit=4", "eq=52.17")
       .match();
   }
 }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index d5316f9..18f1ad5 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -113,7 +113,7 @@ public class TestMongoQueries extends MongoTestBase {
     queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
         .planMatcher()
         .exclude("Agg\\(")
-        .include("Scan\\(.*aggregates")
+        .include("MongoGroupScan.*group")
         .match();
 
     testBuilder()
@@ -131,7 +131,7 @@ public class TestMongoQueries extends MongoTestBase {
     queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
         .planMatcher()
         .exclude("Agg\\(")
-        .include("Scan\\(.*aggregates")
+        .include("MongoGroupScan.*group")
         .match();
 
     testBuilder()
@@ -149,7 +149,7 @@ public class TestMongoQueries extends MongoTestBase {
     queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
         .planMatcher()
         .exclude("Agg\\(", "Filter")
-        .include("Scan\\(.*aggregates")
+        .include("MongoGroupScan.*group")
         .match();
 
     testBuilder()
@@ -158,7 +158,52 @@ public class TestMongoQueries extends MongoTestBase {
         .baselineColumns("c")
         .baselineValues(1)
         .go();
+  }
+
+  @Test
+  public void testUnionAll() throws Exception {
+    String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union all " +
+        "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2";
+
+    queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("UnionAll\\(")
+        .include("MongoGroupScan.*\\$unionWith")
+        .match();
 
-//    queryBuilder().sql("select * from mongo.%s.`%s` t", DONUTS_DB, DONUTS_COLLECTION).printCsv();
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("id", "name")
+        .baselineValues("0001", "Cake")
+        .baselineValues("0001", "Cake")
+        .baselineValues("0002", "Raised")
+        .baselineValues("0003", "Old Fashioned")
+        .baselineValues("0004", "Filled")
+        .baselineValues("0005", "Apple Fritter")
+        .go();
+  }
+
+  @Test
+  public void testUnionDistinct() throws Exception {
+    String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union " +
+        "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2 ";
+
+    queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .exclude("UnionAll\\(", "Agg\\(")
+        .include("MongoGroupScan.*\\$unionWith")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("id", "name")
+        .baselineValues("0001", "Cake")
+        .baselineValues("0002", "Raised")
+        .baselineValues("0003", "Old Fashioned")
+        .baselineValues("0004", "Filled")
+        .baselineValues("0005", "Apple Fritter")
+        .go();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index cc549b1..6779d0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.exec.metastore.analyze.AnalyzeInfoProvider;
 import org.apache.drill.metastore.metadata.TableMetadata;
 import org.apache.drill.metastore.metadata.TableMetadataProvider;
@@ -92,6 +93,11 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
     return getScanStats();
   }
 
+  @Override
+  public ScanStats getScanStats(RelMetadataQuery mq) {
+    return getScanStats();
+  }
+
   @JsonIgnore
   public ScanStats getScanStats() {
     throw new UnsupportedOperationException("This should be implemented.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 9eaa043..2c6ff4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.base;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.Collection;
 import java.util.List;
+
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -88,6 +90,9 @@ public interface GroupScan extends Scan, HasAffinity {
   @JsonIgnore
   ScanStats getScanStats(PlannerSettings settings);
 
+  @JsonIgnore
+  ScanStats getScanStats(RelMetadataQuery mq);
+
   /**
    * Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
new file mode 100644
index 0000000..c13effc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
@@ -0,0 +1,29 @@
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+
+public abstract class DrillSortRelBase extends Sort implements DrillRelNode, OrderedRel {
+
+  protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
+    super(cluster, traits, input, collation);
+  }
+
+  protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traits, input, collation, offset, fetch);
+  }
+
+  @Override
+  public RexNode getOffset() {
+    return offset;
+  }
+
+  @Override
+  public RexNode getFetch() {
+    return fetch;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
index 1e380cf..0fb7b8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -25,13 +25,12 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
 import org.apache.drill.exec.planner.torel.ConversionContext;
-import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
@@ -42,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 /**
  * Sort implemented in Drill.
  */
-public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
+public class DrillSortRel extends DrillSortRelBase implements DrillRel {
 
   /** Creates a DrillSortRel. */
   public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
@@ -100,16 +99,6 @@ public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
   }
 
   @Override
-  public RexNode getOffset() {
-    return offset;
-  }
-
-  @Override
-  public RexNode getFetch() {
-    return fetch;
-  }
-
-  @Override
   public boolean canBeDropped() {
     return true;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 7e415b2..3e68d7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -21,13 +21,13 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -41,7 +41,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
 
-public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel,Prel {
+public class SortPrel extends DrillSortRelBase implements Prel {
   private final boolean isRemovable;
 
   /** Creates a DrillSortRel. */
@@ -154,16 +154,6 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Ordere
   }
 
   @Override
-  public RexNode getOffset() {
-    return offset;
-  }
-
-  @Override
-  public RexNode getFetch() {
-    return fetch;
-  }
-
-  @Override
   public boolean canBeDropped() {
     return isRemovable;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
new file mode 100644
index 0000000..c9d4b8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base class for storage plugins that expose plugin-specific planner rules for operator pushdown.
+ * Which rule groups are registered is controlled by the flags in {@link PlannableStoragePluginConfigs}.
+ */
+public abstract class PlannableStoragePlugin extends AbstractStoragePlugin {
+
+  private final PlannableStoragePluginConfigs<?> plannableStoragePluginConfigs;
+
+  protected PlannableStoragePlugin(PlannableStoragePluginConfigs<? extends PlannableStoragePluginConfigs<?>> plannableStoragePluginConfigs) {
+    super(plannableStoragePluginConfigs.context(), plannableStoragePluginConfigs.name());
+    this.plannableStoragePluginConfigs = plannableStoragePluginConfigs;
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
+        PluginRulesProvider rulesProvider = plannableStoragePluginConfigs.rulesProvider;
+        if (plannableStoragePluginConfigs.supportsProjectPushdown) {
+          builder.addAll(rulesProvider.projectRules());
+        }
+        if (plannableStoragePluginConfigs.supportsFilterPushdown) {
+          builder.addAll(rulesProvider.filterRules());
+        }
+        if (plannableStoragePluginConfigs.supportsSortPushdown) {
+          builder.addAll(rulesProvider.sortRules());
+        }
+        if (plannableStoragePluginConfigs.supportsUnionPushdown) {
+          builder.addAll(rulesProvider.unionRules());
+        }
+        if (plannableStoragePluginConfigs.supportsAggregatePushdown) {
+          builder.addAll(rulesProvider.aggregateRules());
+        }
+        if (plannableStoragePluginConfigs.supportsLimitPushdown) {
+          builder.addAll(rulesProvider.limitRules());
+        }
+        builder.add(rulesProvider.vertexRule());
+        builder.add(rulesProvider.prelConverterRule());
+        return builder.build();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  public Convention convention() {
+    return plannableStoragePluginConfigs.convention();
+  }
+
+  public static abstract class PlannableStoragePluginConfigs<T extends PlannableStoragePluginConfigs<?>> {
+    private DrillbitContext context;
+    private String name;
+    private boolean supportsProjectPushdown;
+    private boolean supportsFilterPushdown;
+    private boolean supportsAggregatePushdown;
+    private boolean supportsSortPushdown;
+    private boolean supportsUnionPushdown;
+    private boolean supportsLimitPushdown;
+    private PluginRulesProvider rulesProvider;
+    private Convention convention;
+
+    public abstract T self();
+
+    public DrillbitContext context() {
+      return context;
+    }
+
+    public T context(DrillbitContext inContext) {
+      this.context = inContext;
+      return self();
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public T name(String inName) {
+      this.name = inName;
+      return self();
+    }
+
+    public boolean supportsProjectPushdown() {
+      return supportsProjectPushdown;
+    }
+
+    public T supportsProjectPushdown(boolean supportsProjectPushdown) {
+      this.supportsProjectPushdown = supportsProjectPushdown;
+      return self();
+    }
+
+    public boolean supportsFilterPushdown() {
+      return supportsFilterPushdown;
+    }
+
+    public T supportsFilterPushdown(boolean supportsFilterPushdown) {
+      this.supportsFilterPushdown = supportsFilterPushdown;
+      return self();
+    }
+
+    public boolean supportsAggregatePushdown() {
+      return supportsAggregatePushdown;
+    }
+
+    public T supportsAggregatePushdown(boolean supportsAggregatePushdown) {
+      this.supportsAggregatePushdown = supportsAggregatePushdown;
+      return self();
+    }
+
+    public boolean supportsSortPushdown() {
+      return supportsSortPushdown;
+    }
+
+    public T supportsSortPushdown(boolean supportsSortPushdown) {
+      this.supportsSortPushdown = supportsSortPushdown;
+      return self();
+    }
+
+    public boolean supportsUnionPushdown() {
+      return supportsUnionPushdown;
+    }
+
+    public T supportsUnionPushdown(boolean supportsUnionPushdown) {
+      this.supportsUnionPushdown = supportsUnionPushdown;
+      return self();
+    }
+
+    public boolean supportsLimitPushdown() {
+      return supportsLimitPushdown;
+    }
+
+    public T supportsLimitPushdown(boolean supportsLimitPushdown) {
+      this.supportsLimitPushdown = supportsLimitPushdown;
+      return self(); // same fluent return type as the other setters
+    }
+
+    public PluginRulesProvider rulesProvider() {
+      return rulesProvider;
+    }
+
+    public T rulesProvider(PluginRulesProvider rulesProvider) {
+      this.rulesProvider = rulesProvider;
+      return self();
+    }
+
+    public Convention convention() {
+      return convention;
+    }
+
+    public T convention(Convention convention) {
+      this.convention = convention;
+      return self(); // same fluent return type as the other setters
+    }
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
new file mode 100644
index 0000000..d35bf83
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
@@ -0,0 +1,16 @@
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.RelOptRule;
+
+import java.util.List;
+
+public interface PluginRulesProvider {
+  List<RelOptRule> sortRules();
+  List<RelOptRule> limitRules();
+  List<RelOptRule> filterRules();
+  List<RelOptRule> projectRules();
+  List<RelOptRule> aggregateRules();
+  List<RelOptRule> unionRules();
+  RelOptRule vertexRule();
+  RelOptRule prelConverterRule();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
new file mode 100644
index 0000000..31f388d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -0,0 +1,39 @@
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+
+public interface PluginImplementor {
+
+  void implement(PluginAggregateRel aggregate) throws IOException;
+
+  void implement(PluginFilterRel filter) throws IOException;
+
+  void implement(PluginLimitRel limit) throws IOException;
+
+  void implement(PluginProjectRel project) throws IOException;
+
+  void implement(PluginSortRel sort) throws IOException;
+
+  void implement(PluginUnionRel union) throws IOException;
+
+  void implement(StoragePluginTableScan scan) throws IOException;
+
+  PluginImplementor copy();
+
+  GroupScan getPhysicalOperator() throws IOException;
+
+  default void visitChild(RelNode input) throws IOException {
+    ((PluginRel) input).implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
new file mode 100644
index 0000000..8505ac7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginAggregateRel extends DrillAggregateRelBase implements PluginRel {
+
+  public PluginAggregateRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      ImmutableBitSet groupSet,
+      List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls)
+      throws InvalidRelException {
+    super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+    assert getConvention() == input.getConvention();
+
+    for (AggregateCall aggCall : aggCalls) {
+      if (aggCall.isDistinct()) {
+        throw new InvalidRelException(
+            "distinct aggregation not supported");
+      }
+    }
+    switch (getGroupType()) {
+    case SIMPLE:
+      break;
+    default:
+      throw new InvalidRelException("unsupported group type: "
+          + getGroupType());
+    }
+  }
+
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input,
+      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls) {
+    try {
+      return new PluginAggregateRel(getCluster(), traitSet, input,
+          groupSet, groupSets, aggCalls);
+    } catch (InvalidRelException e) {
+      // Semantic error not possible. Must be a bug. Convert to
+      // internal error.
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
new file mode 100644
index 0000000..d0a0854
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.Utilities;
+
+import java.io.IOException;
+
+public class PluginDrillTable extends DynamicDrillTable implements TranslatableTable {
+  private final Convention convention;
+
+  public PluginDrillTable(StoragePlugin plugin, String storageEngineName,
+      String userName, Object selection, Convention convention) {
+    super(plugin, storageEngineName, userName, selection);
+    this.convention = convention;
+  }
+
+  @Override
+  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
+    DrillTable drillTable = Utilities.getDrillTable(table);
+    try {
+      return new StoragePluginTableScan(context.getCluster(),
+          context.getCluster().traitSetOf(convention),
+          drillTable.getGroupScan(),
+          table,
+          table.getRowType());
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
new file mode 100644
index 0000000..661ce21
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+public class PluginFilterRel extends DrillFilterRelBase implements PluginRel {
+
+  public PluginFilterRel(
+      Convention convention,
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode child,
+      RexNode condition) {
+    super(convention, cluster, traitSet, child, condition);
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override public PluginFilterRel copy(RelTraitSet traitSet, RelNode input,
+      RexNode condition) {
+    return new PluginFilterRel(getConvention(), getCluster(), traitSet, input, condition);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrelRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrelRel.java
new file mode 100644
index 0000000..88e8c83
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrelRel.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.SinglePrel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+public class PluginIntermediatePrelRel extends SinglePrel implements PrelFinalizable {
+
+  private final Supplier<PluginImplementor> implementorFactory;
+
+  public PluginIntermediatePrelRel(RelOptCluster cluster, RelTraitSet traits,
+      RelNode child, Supplier<PluginImplementor> implementorFactory) {
+    super(cluster, traits, child);
+    this.implementorFactory = implementorFactory;
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new PluginIntermediatePrelRel(getCluster(), traitSet, inputs.get(0), implementorFactory); // honor the supplied input
+  }
+
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return copy(getTraitSet(), getInputs());
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public Prel finalizeRel() {
+    return new PluginPrel(getCluster(), this);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) {
+    throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  public PluginImplementor getPluginImplementor() {
+    return implementorFactory.get();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
new file mode 100644
index 0000000..b74aac4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginLimitRel extends DrillLimitRelBase implements PluginRel {
+  public PluginLimitRel(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, offset, fetch);
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  @Override
+  public PluginLimitRel copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new PluginLimitRel(getCluster(), traitSet, inputs.get(0), offset, fetch);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
new file mode 100644
index 0000000..743679d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
@@ -0,0 +1,83 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+public class PluginPrel extends AbstractRelNode implements Prel {
+  private final GroupScan groupScan;
+  private final RelDataType rowType;
+
+  public PluginPrel(RelOptCluster cluster, PluginIntermediatePrelRel intermediatePrel) {
+    super(cluster, intermediatePrel.getTraitSet());
+    this.rowType = intermediatePrel.getRowType();
+    PluginRel input = (PluginRel) intermediatePrel.getInput().accept(SubsetRemover.INSTANCE);
+    try {
+      PluginImplementor implementor = intermediatePrel.getPluginImplementor();
+      input.implement(implementor);
+      this.groupScan = implementor.getPhysicalOperator();
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+    return groupScan;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("groupScan", groupScan);
+  }
+
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return groupScan.getScanStats(mq).getRecordCount();
+  }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    return rowType;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
new file mode 100644
index 0000000..49a2cac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginProjectRel extends DrillProjectRelBase implements PluginRel {
+
+  public PluginProjectRel(Convention convention, RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(convention, cluster, traitSet, input, projects, rowType);
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    return new PluginProjectRel(getConvention(), getCluster(), traitSet, input, projects,
+        rowType);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
new file mode 100644
index 0000000..7364e70
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
@@ -0,0 +1,11 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+public interface PluginRel extends RelNode {
+  void implement(PluginImplementor implementor) throws IOException;
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
new file mode 100644
index 0000000..255bb2d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link DrillSortRelBase} implementation that is also a {@link PluginRel},
+ * so it can be handed to a {@link PluginImplementor}.
+ */
+public class PluginSortRel extends DrillSortRelBase implements PluginRel {
+
+  /**
+   * Creates the sort node. With assertions enabled, verifies that the node
+   * and its {@code child} share the same convention.
+   */
+  public PluginSortRel(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, collation, offset, fetch);
+    assert getConvention() == child.getConvention();
+  }
+
+  /** Returns the cost computed by the superclass multiplied by 0.1. */
+  @Override
+  public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  /**
+   * Creates a new {@link PluginSortRel} in this node's cluster using the
+   * supplied traits, input, collation, offset and fetch.
+   *
+   * @param newCollation collation the copy must sort by
+   */
+  @Override
+  public PluginSortRel copy(RelTraitSet traitSet, RelNode input,
+      RelCollation newCollation, RexNode offset, RexNode fetch) {
+    // Honour the collation requested by the caller instead of this node's own
+    // one; otherwise a copy made to change the sort order keeps the old order.
+    return new PluginSortRel(getCluster(), traitSet, input, newCollation, offset, fetch);
+  }
+
+  /** Dispatches back to the implementor, passing this node. */
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  /**
+   * Always returns {@code false}.
+   * NOTE(review): presumably tells the planner that this sort must be kept —
+   * confirm against the contract declared in {@link DrillSortRelBase}.
+   */
+  @Override
+  public boolean canBeDropped() {
+    return false;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
new file mode 100644
index 0000000..73227bd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginUnionRel extends DrillUnionRelBase implements PluginRel {
+
+  public PluginUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
+    super(cluster, traits, inputs, all, checkCompatibility);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    try {
+      return new PluginUnionRel(getCluster(), traitSet, inputs, all, true);
+    } catch (InvalidRelException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(.1);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
new file mode 100644
index 0000000..a062b2e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+public class StoragePluginTableScan extends DrillScanRelBase implements PluginRel {
+
+  private final RelDataType rowType;
+
+  public StoragePluginTableScan(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      GroupScan grpScan,
+      RelOptTable table,
+      RelDataType rowType) {
+    super(cluster, traits, grpScan, table);
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  @Override
+  public DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+    return new StoragePluginTableScan(getCluster(), traitSet, scan, getTable(), rowType);
+  }
+
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return getGroupScan().getScanStats(mq).getRecordCount();
+  }
+
+  @Override
+  public RelDataType deriveRowType() {
+    return this.rowType;
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("groupScan", getGroupScan().getDigest());
+  }
+
+  @Override
+  protected String computeDigest() {
+    return super.computeDigest();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
new file mode 100644
index 0000000..616493d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
@@ -0,0 +1,38 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rule to convert an {@link Aggregate} to a {@link PluginAggregateRel}.
+ */
+public class PluginAggregateRule extends PluginConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(PluginAggregateRule.class);
+
+  public PluginAggregateRule(RelTrait in, Convention out) {
+    super(Aggregate.class, in, out, "PluginAggregateRule");
+  }
+
+  /**
+   * Builds a {@link PluginAggregateRel} in the output convention on top of the
+   * converted input.
+   *
+   * @return the converted node, or {@code null} when the node cannot be created
+   *         because of an {@link InvalidRelException} (logged as a warning)
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Aggregate aggregate = (Aggregate) rel;
+    RelTraitSet outTraits = aggregate.getTraitSet().replace(getOutConvention());
+    RelNode convertedInput = convert(aggregate.getInput(), outTraits.simplify());
+    try {
+      return new PluginAggregateRel(
+          rel.getCluster(),
+          outTraits,
+          convertedInput,
+          aggregate.getGroupSet(),
+          aggregate.getGroupSets(),
+          aggregate.getAggCallList());
+    } catch (InvalidRelException e) {
+      logger.warn(e.toString());
+      return null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
new file mode 100644
index 0000000..5801d83
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
@@ -0,0 +1,16 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+
+import java.util.function.Predicate;
+
+public abstract class PluginConverterRule extends ConverterRule {
+
+  protected PluginConverterRule(Class<? extends RelNode> clazz, RelTrait in, Convention out, String description) {
+    super(clazz, (Predicate<RelNode>) input -> true, in, out, DrillRelFactories.LOGICAL_BUILDER, description);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
new file mode 100644
index 0000000..8d350f4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -0,0 +1,30 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+
+/**
+ * Converter rule that turns a {@link Filter} into a {@link PluginFilterRel}
+ * in the rule's output convention.
+ */
+public class PluginFilterRule extends PluginConverterRule {
+
+  public PluginFilterRule(RelTrait in, Convention out) {
+    super(Filter.class, in, out, "PluginFilterRule");
+  }
+
+  /**
+   * Creates a {@link PluginFilterRel} with the original condition on top of the
+   * input converted to the output convention.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Filter filter = (Filter) rel;
+    Convention out = getOutConvention();
+    RelTraitSet outTraits = filter.getTraitSet().replace(out);
+    RelNode convertedInput = convert(filter.getInput(), out);
+    return new PluginFilterRel(
+        out, rel.getCluster(), outTraits, convertedInput, filter.getCondition());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
new file mode 100644
index 0000000..5b08891
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginIntermediatePrelRel;
+
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public class PluginIntermediatePrelConverterRule extends ConverterRule {
+
+  private final Supplier<PluginImplementor> implementorFactory;
+
+  public PluginIntermediatePrelConverterRule(Supplier<PluginImplementor> implementorFactory) {
+    super(VertexDrel.class, (Predicate<RelNode>) input -> true, DrillRel.DRILL_LOGICAL,
+        Prel.DRILL_PHYSICAL, DrillRelFactories.LOGICAL_BUILDER, "Plugin_prel_Converter");
+    this.implementorFactory = implementorFactory;
+  }
+
+  @Override
+  public RelNode convert(RelNode in) {
+    return new PluginIntermediatePrelRel(
+        in.getCluster(),
+        in.getTraitSet().replace(getOutTrait()),
+        in.getInput(0),
+        implementorFactory);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
new file mode 100644
index 0000000..fea2276
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
@@ -0,0 +1,25 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+
+/**
+ * Rule to convert a {@link DrillLimitRelBase} to a {@link PluginLimitRel}.
+ */
+public class PluginLimitRule extends PluginConverterRule {
+
+  public PluginLimitRule(RelTrait in, Convention out) {
+    super(DrillLimitRelBase.class, in, out, "PluginLimitRule");
+  }
+
+  /**
+   * Creates a {@link PluginLimitRel} with the original offset and fetch on top
+   * of the input converted to the output convention.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    DrillLimitRelBase limit = (DrillLimitRelBase) rel;
+    Convention out = getOutConvention();
+
+    // Traits requested from the input: its own traits moved to the output convention, simplified.
+    RelTraitSet inputTraits = limit.getInput().getTraitSet().replace(out).simplify();
+    RelNode convertedInput = convert(limit.getInput(), inputTraits);
+
+    RelTraitSet outTraits = limit.getTraitSet().replace(out);
+    return new PluginLimitRel(
+        rel.getCluster(), outTraits, convertedInput, limit.getOffset(), limit.getFetch());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
new file mode 100644
index 0000000..d02bd1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
@@ -0,0 +1,27 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+
+/**
+ * Converter rule that turns a {@link Project} into a {@link PluginProjectRel}
+ * in the rule's output convention.
+ */
+public class PluginProjectRule extends PluginConverterRule {
+
+  public PluginProjectRule(RelTrait in, Convention out) {
+    super(Project.class, in, out, "PluginProjectRule");
+  }
+
+  /**
+   * Creates a {@link PluginProjectRel} with the original projections and row
+   * type on top of the input converted to the output convention.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Project project = (Project) rel;
+    Convention out = getOutConvention();
+    RelTraitSet outTraits = project.getTraitSet().replace(out);
+    RelNode convertedInput = convert(project.getInput(), out);
+    return new PluginProjectRel(
+        out,
+        project.getCluster(),
+        outTraits,
+        convertedInput,
+        project.getProjects(),
+        project.getRowType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
new file mode 100644
index 0000000..0378b0e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
@@ -0,0 +1,28 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+
+/**
+ * Converter rule that turns a {@link Sort} into a {@link PluginSortRel}
+ * in the rule's output convention.
+ */
+public class PluginSortRule extends PluginConverterRule {
+
+  public PluginSortRule(RelTrait in, Convention out) {
+    super(Sort.class, in, out, "PluginSortRule");
+  }
+
+  /**
+   * Creates a {@link PluginSortRel} with the original collation, offset and
+   * fetch on top of the input converted to the output convention.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Sort sort = (Sort) rel;
+    Convention out = getOutConvention();
+
+    // Traits requested from the input: its own traits moved to the output convention, simplified.
+    RelTraitSet inputTraits = sort.getInput().getTraitSet().replace(out).simplify();
+    RelNode convertedInput = convert(sort.getInput(), inputTraits);
+
+    // The resulting node carries both the output convention and the sort's collation.
+    RelTraitSet outTraits = sort.getTraitSet()
+        .replace(out)
+        .replace(sort.getCollation());
+    return new PluginSortRel(
+        rel.getCluster(), outTraits, convertedInput, sort.getCollation(), sort.offset, sort.fetch);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
new file mode 100644
index 0000000..c85f65e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+
+/**
+ * Rule to convert a {@link Union} with the {@code all} flag set to a
+ * {@link PluginUnionRel}.
+ */
+public class PluginUnionRule extends PluginConverterRule {
+
+  public PluginUnionRule(RelTrait in, Convention out) {
+    super(Union.class, in, out, "PluginUnionRule");
+  }
+
+  /** Matches only unions whose {@code all} flag is set. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    // Only UNION ALL is converted here: Drill adds an extra aggregation for
+    // UNION DISTINCT, so the union all and that aggregation are converted later.
+    Union union = call.rel(0);
+    return union.all;
+  }
+
+  /**
+   * Creates a {@link PluginUnionRel} over the inputs converted to the output
+   * convention, with the compatibility check enabled. A checked
+   * {@link InvalidRelException} is rethrown wrapped in a {@link DrillRuntimeException}.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Union union = (Union) rel;
+    Convention out = getOutConvention();
+    RelTraitSet outTraits = union.getTraitSet().replace(out);
+    try {
+      return new PluginUnionRel(
+          rel.getCluster(), outTraits, convertList(union.getInputs(), out), union.all, true);
+    } catch (InvalidRelException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+}