You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2021/08/05 17:23:20 UTC

[drill] 07/13: Incomplete changes for project

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch mongo
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a8bc68b61d212cb39476c0927fb90f641cb24a4c
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Mon Jul 19 19:11:59 2021 +0300

    Incomplete changes for project
---
 .../exec/store/mongo/MongoAggregateUtils.java      |   8 +-
 .../drill/exec/store/mongo/MongoFilterBuilder.java |  56 +++---
 .../drill/exec/store/mongo/MongoGroupScan.java     |   1 +
 .../drill/exec/store/mongo/MongoRecordReader.java  |   8 +-
 .../drill/exec/store/mongo/MongoScanSpec.java      |  21 ++-
 .../drill/exec/store/mongo/MongoSubScan.java       |  14 +-
 .../common/{MongoCompareOp.java => MongoOp.java}   |  30 +++-
 .../store/mongo/plan/MongoPluginImplementor.java   |  55 +++---
 .../drill/exec/store/mongo/plan/MongoRules.java    | 194 ++++-----------------
 .../drill/exec/store/mongo/TestMongoQueries.java   |  22 +++
 10 files changed, 184 insertions(+), 225 deletions(-)

diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
index e196258..817644d 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -8,7 +8,7 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
 import org.bson.BsonArray;
 import org.bson.BsonDocument;
 import org.bson.BsonElement;
@@ -42,7 +42,7 @@ public class MongoAggregateUtils {
   }
 
   static String quote(String s) {
-    return "'" + s + "'"; // TODO: handle embedded quotes
+    return "'" + s + "'";
   }
 
   private static boolean needsQuote(String s) {
@@ -116,9 +116,9 @@ public class MongoAggregateUtils {
       } else {
         assert args.size() == 1;
         String inName = inNames.get(args.get(0));
-        expr = new BsonDocument(MongoCompareOp.COND.getCompareOp(),
+        expr = new BsonDocument(MongoOp.COND.getCompareOp(),
             new BsonArray(Arrays.asList(
-                new Document(MongoCompareOp.EQUAL.getCompareOp(),
+                new Document(MongoOp.EQUAL.getCompareOp(),
                     new BsonArray(Arrays.asList(
                         new BsonString(quote(inName)),
                         BsonNull.VALUE))).toBsonDocument(),
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 8eecf00..5bea34e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.mongo;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.FunctionNames;
@@ -26,7 +25,7 @@ import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,21 +93,21 @@ public class MongoFilterBuilder extends
     List<LogicalExpression> args = op.args();
     Document nodeScanSpec = null;
     String functionName = op.getName();
-    for (int i = 0; i < args.size(); ++i) {
+    for (LogicalExpression arg : args) {
       switch (functionName) {
-      case FunctionNames.AND:
-      case FunctionNames.OR:
-        if (nodeScanSpec == null) {
-          nodeScanSpec = args.get(i).accept(this, null);
-        } else {
-          Document scanSpec = args.get(i).accept(this, null);
-          if (scanSpec != null) {
-            nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+        case FunctionNames.AND:
+        case FunctionNames.OR:
+          if (nodeScanSpec == null) {
+            nodeScanSpec = arg.accept(this, null);
           } else {
-            allExpressionsConverted = false;
+            Document scanSpec = arg.accept(this, null);
+            if (scanSpec != null) {
+              nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+            } else {
+              allExpressionsConverted = false;
+            }
           }
-        }
-        break;
+          break;
       }
     }
     return nodeScanSpec;
@@ -160,50 +159,49 @@ public class MongoFilterBuilder extends
   }
 
   private Document createMongoScanSpec(String functionName,
-      SchemaPath field, Object fieldValue) throws ClassNotFoundException,
-      IOException {
+      SchemaPath field, Object fieldValue) {
     // extract the field name
     String fieldName = field.getRootSegmentPath();
-    MongoCompareOp compareOp = null;
+    MongoOp compareOp = null;
     switch (functionName) {
     case FunctionNames.EQ:
-      compareOp = MongoCompareOp.EQUAL;
+      compareOp = MongoOp.EQUAL;
       break;
     case FunctionNames.NE:
-      compareOp = MongoCompareOp.NOT_EQUAL;
+      compareOp = MongoOp.NOT_EQUAL;
       break;
     case FunctionNames.GE:
-      compareOp = MongoCompareOp.GREATER_OR_EQUAL;
+      compareOp = MongoOp.GREATER_OR_EQUAL;
       break;
     case FunctionNames.GT:
-      compareOp = MongoCompareOp.GREATER;
+      compareOp = MongoOp.GREATER;
       break;
     case FunctionNames.LE:
-      compareOp = MongoCompareOp.LESS_OR_EQUAL;
+      compareOp = MongoOp.LESS_OR_EQUAL;
       break;
     case FunctionNames.LT:
-      compareOp = MongoCompareOp.LESS;
+      compareOp = MongoOp.LESS;
       break;
     case FunctionNames.IS_NULL:
     case "isNull":
     case "is null":
-      compareOp = MongoCompareOp.IFNULL;
+      compareOp = MongoOp.IFNULL;
       break;
     case FunctionNames.IS_NOT_NULL:
     case "isNotNull":
     case "is not null":
-      compareOp = MongoCompareOp.IFNOTNULL;
+      compareOp = MongoOp.IFNOTNULL;
       break;
     }
 
     if (compareOp != null) {
       Document queryFilter = new Document();
-      if (compareOp == MongoCompareOp.IFNULL) {
+      if (compareOp == MongoOp.IFNULL) {
         queryFilter.put(fieldName,
-            new Document(MongoCompareOp.EQUAL.getCompareOp(), null));
-      } else if (compareOp == MongoCompareOp.IFNOTNULL) {
+            new Document(MongoOp.EQUAL.getCompareOp(), null));
+      } else if (compareOp == MongoOp.IFNOTNULL) {
         queryFilter.put(fieldName,
-            new Document(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+            new Document(MongoOp.NOT_EQUAL.getCompareOp(), null));
       } else {
         queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
             fieldValue));
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index bc3817e..12fd384 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -460,6 +460,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
         .setMinFilters(chunkInfo.getMinFilters())
         .setMaxFilters(chunkInfo.getMaxFilters())
         .setFilter(scanSpec.getFilters())
+        .setFields(scanSpec.getFields())
         .setDbName(scanSpec.getDbName())
         .setCollectionName(scanSpec.getCollectionName())
         .setHosts(chunkInfo.getChunkLocList());
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 5cb007f..820eb8a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -70,7 +70,7 @@ public class MongoRecordReader extends AbstractRecordReader {
 
   private Document filters;
   private List<Bson> operations;
-  private final Document fields;
+  private Document fields;
 
   private final FragmentContext fragmentContext;
 
@@ -85,9 +85,9 @@ public class MongoRecordReader extends AbstractRecordReader {
   public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, MongoStoragePlugin plugin) {
 
-    fields = new Document();
+//    fields = new Document();
     // exclude _id field, if not mentioned by user.
-    fields.put(DrillMongoConstants.ID, 0);
+//    fields.put(DrillMongoConstants.ID, 0);
     setColumns(projectedColumns);
     fragmentContext = context;
     this.plugin = plugin;
@@ -100,6 +100,8 @@ public class MongoRecordReader extends AbstractRecordReader {
           shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
 
       buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+
+      fields = shardedMongoSubScanSpec.getFields();
     }
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 17b548b..da10a54 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -33,20 +33,26 @@ public class MongoScanSpec {
 
   private Document filters;
 
+  private Document fields;
+
   private List<Bson> operations = new ArrayList<>();
 
-  @JsonCreator
-  public MongoScanSpec(@JsonProperty("dbName") String dbName,
-      @JsonProperty("collectionName") String collectionName) {
+  public MongoScanSpec(String dbName,
+      String collectionName) {
     this.dbName = dbName;
     this.collectionName = collectionName;
   }
 
-  public MongoScanSpec(String dbName, String collectionName,
-      Document filters, List<Bson> operations) {
+  @JsonCreator
+  public MongoScanSpec(@JsonProperty("dbName") String dbName,
+      @JsonProperty("collectionName") String collectionName,
+      @JsonProperty("filters") Document filters,
+      @JsonProperty("fields") Document fields,
+      @JsonProperty("operations") List<Bson> operations) {
     this.dbName = dbName;
     this.collectionName = collectionName;
     this.filters = filters;
+    this.fields = fields;
     this.operations = operations;
   }
 
@@ -62,6 +68,10 @@ public class MongoScanSpec {
     return filters;
   }
 
+  public Document getFields() {
+    return fields;
+  }
+
   public List<Bson> getOperations() {
     return operations;
   }
@@ -72,6 +82,7 @@ public class MongoScanSpec {
         .field("dbName", dbName)
         .field("collectionName", collectionName)
         .field("filters", filters)
+        .field("fields", fields)
         .field("operations", operations)
         .toString();
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 692a939..ef31f25 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -128,6 +128,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     protected Map<String, Object> minFilters;
     protected Map<String, Object> maxFilters;
     protected Document filter;
+    protected Document fields;
 
     @JsonCreator
     public ShardedMongoSubScanSpec(@JsonProperty("dbName") String dbName,
@@ -135,11 +136,13 @@ public class MongoSubScan extends AbstractBase implements SubScan {
         @JsonProperty("hosts") List<String> hosts,
         @JsonProperty("minFilters") Map<String, Object> minFilters,
         @JsonProperty("maxFilters") Map<String, Object> maxFilters,
-        @JsonProperty("filters") Document filters) {
+        @JsonProperty("filters") Document filters,
+        @JsonProperty("fields") Document fields) {
       super(dbName, collectionName, hosts);
       this.minFilters = minFilters;
       this.maxFilters = maxFilters;
       this.filter = filters;
+      this.fields = fields;
     }
 
     ShardedMongoSubScanSpec() {
@@ -172,6 +175,15 @@ public class MongoSubScan extends AbstractBase implements SubScan {
       return this;
     }
 
+    public Document getFields() {
+      return fields;
+    }
+
+    public ShardedMongoSubScanSpec setFields(Document fields) {
+      this.fields = fields;
+      return this;
+    }
+
     @Override
     public String toString() {
       return new PlanStringBuilder(this)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
similarity index 67%
rename from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
rename to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
index ef89bfb..55f8cc5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
@@ -17,14 +17,30 @@
  */
 package org.apache.drill.exec.store.mongo.common;
 
-public enum MongoCompareOp {
-  EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
-      "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
-      "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
-      "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
-  private String compareOp;
+public enum MongoOp {
+  EQUAL("$eq"),
+  NOT_EQUAL("$ne"),
+  GREATER_OR_EQUAL("$gte"),
+  GREATER("$gt"),
+  LESS_OR_EQUAL("$lte"),
+  LESS("$lt"),
+  IN("$in"),
+  AND("$and"),
+  OR("$or"),
+  NOT("$not"),
+  REGEX("$regex"),
+  OPTIONS("$options"),
+  PROJECT("$project"),
+  COND("$cond"),
+  IFNULL("$ifNull"),
+  IFNOTNULL("$ifNotNull"),
+  SUM("$sum"),
+  GROUP_BY("$group"),
+  EXISTS("$exists");
 
-  MongoCompareOp(String compareOp) {
+  private final String compareOp;
+
+  MongoOp(String compareOp) {
     this.compareOp = compareOp;
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index fad7c25..c359c77 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -1,10 +1,13 @@
 package org.apache.drill.exec.store.mongo.plan;
 
 import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -26,6 +29,10 @@ import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
 import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
 import org.apache.drill.exec.util.Utilities;
 import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.BsonValue;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -40,6 +47,7 @@ public class MongoPluginImplementor implements PluginImplementor {
   private List<Bson> operations;
   private Document filters;
   private List<SchemaPath> columns;
+  private Document fields;
 
   private boolean runAggregate;
 
@@ -90,24 +98,30 @@ public class MongoPluginImplementor implements PluginImplementor {
   public void implement(PluginProjectRel project) throws IOException {
     visitChild(project.getInput());
 
-//    final MongoRules.RexToMongoTranslator translator =
-//        new MongoRules.RexToMongoTranslator(
-//            (JavaTypeFactory) project.getCluster().getTypeFactory(),
-//            MongoRules.mongoFieldNames(project.getInput().getRowType()));
-//    final List<String> items = new ArrayList<>();
-//    for (Pair<RexNode, String> pair : project.getNamedProjects()) {
-//      final String name = pair.right;
-//      final String expr = pair.left.accept(translator);
-//      items.add(expr.equals("'$" + name + "'")
-//          ? MongoRules.maybeQuote(name) + ": 1"
-//          : MongoRules.maybeQuote(name) + ": " + expr);
-//    }
-//    final String findString = Util.toString(items, "{", ", ", "}");
-//    final String aggregateString = "{$project: " + findString + "}";
-//    final Pair<String, String> op = Pair.of(findString, aggregateString);
+    MongoRules.RexToMongoTranslator translator =
+        new MongoRules.RexToMongoTranslator(
+            (JavaTypeFactory) project.getCluster().getTypeFactory(),
+            MongoRules.mongoFieldNames(project.getInput().getRowType()));
+    List<BsonElement> items = new ArrayList<>();
+    for (Pair<RexNode, String> pair : project.getNamedProjects()) {
+      String name = pair.right;
+      BsonValue expr = pair.left.accept(translator);
+      items.add(expr.equals(new BsonString("$" + name))
+          ? new BsonElement(MongoRules.maybeQuote(name), new BsonInt32(1))
+          : new BsonElement(MongoRules.maybeQuote(name), expr));
+    }
+    BsonDocument projection = Aggregates.project(new BsonDocument(items)).toBsonDocument();
+    if (runAggregate) {
+      operations.add(projection);
+    } else {
+      List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+      this.columns = outNames.stream()
+        .map(SchemaPath::getSimplePath)
+        .collect(Collectors.toList());
+    }
 //    implementor.add(op.left, op.right);
 
-    List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+//    List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
 //    Document fields = new Document();
 //    fields.put(DrillMongoConstants.ID, 0);
 //    List<String> inNames = MongoAggregateUtils.mongoFieldNames(project.getInput().getRowType());
@@ -118,9 +132,9 @@ public class MongoPluginImplementor implements PluginImplementor {
 //
 //    operations.add(Aggregates.project(fields).toBsonDocument());
 
-    this.columns = outNames.stream()
-        .map(SchemaPath::getSimplePath)
-        .collect(Collectors.toList());
+//    this.columns = outNames.stream()
+//        .map(SchemaPath::getSimplePath)
+//        .collect(Collectors.toList());
   }
 
   @Override
@@ -187,6 +201,7 @@ public class MongoPluginImplementor implements PluginImplementor {
     groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
     operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations());
     filters = groupScan.getScanSpec().getFilters();
+    fields = groupScan.getScanSpec().getFields();
     columns = groupScan.getColumns();
   }
 
@@ -198,7 +213,7 @@ public class MongoPluginImplementor implements PluginImplementor {
   @Override
   public GroupScan getPhysicalOperator() throws IOException {
     MongoScanSpec scanSpec = groupScan.getScanSpec();
-    MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+    MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, fields, operations);
     return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
         newSpec, columns, runAggregate);
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
index fddfa7c..7f0daf3 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.mongo.plan;
 import org.apache.calcite.adapter.enumerable.RexImpTable;
 import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -32,75 +30,23 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Bug;
-import org.apache.calcite.util.Util;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
-import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
-import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
-import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
-import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
-import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
-import org.apache.drill.exec.store.plan.rule.PluginSortRule;
-import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
 
 import java.util.AbstractList;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class MongoRules {
 
-  public List<RelOptRule> sortRules(Convention out) {
-    return Arrays.asList(
-        new PluginSortRule(Convention.NONE, out),
-        new PluginSortRule(DrillRel.DRILL_LOGICAL, out)
-    );
-  }
-
-  public List<RelOptRule> limitRules(Convention out) {
-    return Arrays.asList(
-        new PluginLimitRule(Convention.NONE, out),
-        new PluginLimitRule(DrillRel.DRILL_LOGICAL, out)
-    );
-  }
-
-  public List<RelOptRule> filterRules(Convention out) {
-    return Arrays.asList(
-        new PluginFilterRule(Convention.NONE, out),
-        new PluginFilterRule(DrillRel.DRILL_LOGICAL, out)
-    );
-  }
-
-  public List<RelOptRule> projectRules(Convention out) {
-    return Arrays.asList(
-        new PluginProjectRule(Convention.NONE, out),
-        new PluginProjectRule(DrillRel.DRILL_LOGICAL, out)
-    );
-  }
-
-  public List<RelOptRule> aggregateRules(Convention out) {
-    return Arrays.asList(
-        new PluginAggregateRule(Convention.NONE, out),
-        new PluginAggregateRule(DrillRel.DRILL_LOGICAL, out)
-    );
-  }
-
-  public List<RelOptRule> unionRules(Convention out) {
-    return Arrays.asList(
-        new PluginUnionRule(Convention.NONE, out),
-        new PluginUnionRule(DrillRel.DRILL_LOGICAL, out)
-    );
-  }
-
-  public RelOptRule vertexRule(Convention out) {
-    return new VertexDrelConverterRule(out);
-  }
-
-  public static final RelOptRule PREL_CONVERTER_INSTANCE = new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
-
   /** Returns 'string' if it is a call to item['string'], null otherwise. */
   public static String isItem(RexCall call) {
     if (call.getOperator() != SqlStdOperatorTable.ITEM) {
@@ -156,7 +102,7 @@ public class MongoRules {
 
   /** Translator from {@link RexNode} to BSON values in MongoDB's expression
    * language. */
-  static class RexToMongoTranslator extends RexVisitorImpl<String> {
+  static class RexToMongoTranslator extends RexVisitorImpl<BsonValue> {
     private final JavaTypeFactory typeFactory;
     private final List<String> inFields;
 
@@ -171,16 +117,16 @@ public class MongoRules {
       MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add");
       MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract");
       // Boolean
-      MONGO_OPERATORS.put(SqlStdOperatorTable.AND, "$and");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.OR, "$or");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, "$not");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp());
       // Comparison
-      MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, "$eq");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, "$ne");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, "$gt");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, "$gte");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, "$lt");
-      MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, "$lte");
+      MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp());
+      MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp());
     }
 
     protected RexToMongoTranslator(JavaTypeFactory typeFactory,
@@ -190,43 +136,45 @@ public class MongoRules {
       this.inFields = inFields;
     }
 
-    @Override public String visitLiteral(RexLiteral literal) {
+    @Override
+    public BsonValue visitLiteral(RexLiteral literal) {
       if (literal.getValue() == null) {
-        return "null";
+        return BsonNull.VALUE;
       }
-      return "{$literal: "
-          + RexToLixTranslator.translateLiteral(literal, literal.getType(),
-              typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
-          + "}";
+      return new BsonDocument("$literal", new BsonString(
+          RexToLixTranslator.translateLiteral(literal, literal.getType(),
+              typeFactory, RexImpTable.NullAs.NOT_POSSIBLE).toString()));
     }
 
-    @Override public String visitInputRef(RexInputRef inputRef) {
-      return maybeQuote(
+    @Override
+    public BsonValue visitInputRef(RexInputRef inputRef) {
+      return new BsonString(
           "$" + inFields.get(inputRef.getIndex()));
     }
 
-    @Override public String visitCall(RexCall call) {
+    @Override
+    public BsonValue visitCall(RexCall call) {
       String name = isItem(call);
       if (name != null) {
-        return "'$" + name + "'";
+        return new BsonString("'$" + name + "'");
       }
-      final List<String> strings = new ArrayList<>();//visitList(call.operands);
+      List<BsonValue> strings = call.operands.stream()
+          .map(operand -> operand.accept(this))
+          .collect(Collectors.toList());
+
       if (call.getKind() == SqlKind.CAST) {
         return strings.get(0);
       }
       String stdOperator = MONGO_OPERATORS.get(call.getOperator());
       if (stdOperator != null) {
-        return "{" + stdOperator + ": [" + Util.commaList(strings) + "]}";
+        return new BsonDocument(stdOperator, new BsonArray(strings));
       }
       if (call.getOperator() == SqlStdOperatorTable.ITEM) {
         final RexNode op1 = call.operands.get(1);
         if (op1 instanceof RexLiteral
             && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
-          if (!Bug.CALCITE_194_FIXED) {
-            return "'" + stripQuotes(strings.get(0)) + "["
-                + ((RexLiteral) op1).getValue2() + "]'";
-          }
-          return strings.get(0) + "[" + strings.get(1) + "]";
+          return new BsonDocument("$arrayElemAt", new BsonArray(
+              Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) op1).getValueAs(Integer.class)))));
         }
       }
       if (call.getOperator() == SqlStdOperatorTable.CASE) {
@@ -253,7 +201,7 @@ public class MongoRules {
           }
         }
         sb.append(finish);
-        return sb.toString();
+        return BsonDocument.parse(sb.toString());
       }
       throw new IllegalArgumentException("Translation of " + call
           + " is not supported by MongoProject");
@@ -435,72 +383,6 @@ public class MongoRules {
 
 */
 
-
-//  /**
-//   * Rule to convert an {@link org.apache.calcite.rel.logical.Union} to a
-//   * {@link MongoUnionRel}.
-//   */
-//  public static class MongoUnionRule
-//      extends MongoConverterRule {
-//    private MongoUnionRule(MongoConvention out) {
-//      super(
-//          Union.class,
-//          Convention.NONE,
-//          out,
-//          "MongoUnionRule");
-//    }
-//
-//    public RelNode convert(RelNode rel) {
-//      final Union union = (Union) rel;
-//      final RelTraitSet traitSet =
-//          union.getTraitSet().replace(out);
-//      return new MongoUnionRel(
-//          rel.getCluster(),
-//          traitSet,
-//          convertList(union.getInputs(), traitSet),
-//          union.all);
-//    }
-//  }
-//
-//  public static class MongoUnionRel
-//      extends Union
-//      implements MongoRel {
-//    public MongoUnionRel(
-//        RelOptCluster cluster,
-//        RelTraitSet traitSet,
-//        List<RelNode> inputs,
-//        boolean all) {
-//      super(cluster, traitSet, inputs, all);
-//    }
-//
-//    public MongoUnionRel copy(
-//        RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-//      return new MongoUnionRel(getCluster(), traitSet, inputs, all);
-//    }
-//
-//    @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
-//      return super.computeSelfCost(planner).multiplyBy(.1);
-//    }
-//
-//    public SqlString implement(MongoImplementor implementor) {
-//      return setOpSql(this, implementor, "UNION");
-//    }
-//  }
-//
-//  private static SqlString setOpSql(
-//      SetOp setOpRel, MongoImplementor implementor, String op) {
-//    final SqlBuilder buf = new SqlBuilder(implementor.dialect);
-//    for (Ord<RelNode> input : Ord.zip(setOpRel.getInputs())) {
-//      if (input.i > 0) {
-//        implementor.newline(buf)
-//            .append(op + (setOpRel.all ? " ALL " : ""));
-//        implementor.newline(buf);
-//      }
-//      buf.append(implementor.visitChild(input.i, input.e));
-//    }
-//    return buf.toSqlString();
-//  }
-
   /*
   /**
    * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index 090bad0..e8f698b 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -249,4 +249,26 @@ public class TestMongoQueries extends MongoTestBase {
         .baselineValues("0005", "Apple Fritter")
         .go();
   }
+
+  @Test
+  public void testProjectPushDown() throws Exception {
+    String query = "select t.id * t.id as c from mongo.%s.`%s` t";
+
+    queryBuilder()
+        .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+        .planMatcher()
+        .include("MongoGroupScan.*project.*multiply")
+        .match();
+
+    testBuilder()
+        .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(1)
+        .baselineValues(4)
+        .baselineValues(9)
+        .baselineValues(16)
+        .baselineValues(25)
+        .go();
+  }
 }