You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2021/08/05 17:23:20 UTC
[drill] 07/13: Incomplete changes for project
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch mongo
in repository https://gitbox.apache.org/repos/asf/drill.git
commit a8bc68b61d212cb39476c0927fb90f641cb24a4c
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Mon Jul 19 19:11:59 2021 +0300
Incomplete changes for project
---
.../exec/store/mongo/MongoAggregateUtils.java | 8 +-
.../drill/exec/store/mongo/MongoFilterBuilder.java | 56 +++---
.../drill/exec/store/mongo/MongoGroupScan.java | 1 +
.../drill/exec/store/mongo/MongoRecordReader.java | 8 +-
.../drill/exec/store/mongo/MongoScanSpec.java | 21 ++-
.../drill/exec/store/mongo/MongoSubScan.java | 14 +-
.../common/{MongoCompareOp.java => MongoOp.java} | 30 +++-
.../store/mongo/plan/MongoPluginImplementor.java | 55 +++---
.../drill/exec/store/mongo/plan/MongoRules.java | 194 ++++-----------------
.../drill/exec/store/mongo/TestMongoQueries.java | 22 +++
10 files changed, 184 insertions(+), 225 deletions(-)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
index e196258..817644d 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -8,7 +8,7 @@ import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonElement;
@@ -42,7 +42,7 @@ public class MongoAggregateUtils {
}
static String quote(String s) {
- return "'" + s + "'"; // TODO: handle embedded quotes
+ return "'" + s + "'";
}
private static boolean needsQuote(String s) {
@@ -116,9 +116,9 @@ public class MongoAggregateUtils {
} else {
assert args.size() == 1;
String inName = inNames.get(args.get(0));
- expr = new BsonDocument(MongoCompareOp.COND.getCompareOp(),
+ expr = new BsonDocument(MongoOp.COND.getCompareOp(),
new BsonArray(Arrays.asList(
- new Document(MongoCompareOp.EQUAL.getCompareOp(),
+ new Document(MongoOp.EQUAL.getCompareOp(),
new BsonArray(Arrays.asList(
new BsonString(quote(inName)),
BsonNull.VALUE))).toBsonDocument(),
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 8eecf00..5bea34e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.mongo;
-import java.io.IOException;
import java.util.List;
import org.apache.drill.common.FunctionNames;
@@ -26,7 +25,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,21 +93,21 @@ public class MongoFilterBuilder extends
List<LogicalExpression> args = op.args();
Document nodeScanSpec = null;
String functionName = op.getName();
- for (int i = 0; i < args.size(); ++i) {
+ for (LogicalExpression arg : args) {
switch (functionName) {
- case FunctionNames.AND:
- case FunctionNames.OR:
- if (nodeScanSpec == null) {
- nodeScanSpec = args.get(i).accept(this, null);
- } else {
- Document scanSpec = args.get(i).accept(this, null);
- if (scanSpec != null) {
- nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+ case FunctionNames.AND:
+ case FunctionNames.OR:
+ if (nodeScanSpec == null) {
+ nodeScanSpec = arg.accept(this, null);
} else {
- allExpressionsConverted = false;
+ Document scanSpec = arg.accept(this, null);
+ if (scanSpec != null) {
+ nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+ } else {
+ allExpressionsConverted = false;
+ }
}
- }
- break;
+ break;
}
}
return nodeScanSpec;
@@ -160,50 +159,49 @@ public class MongoFilterBuilder extends
}
private Document createMongoScanSpec(String functionName,
- SchemaPath field, Object fieldValue) throws ClassNotFoundException,
- IOException {
+ SchemaPath field, Object fieldValue) {
// extract the field name
String fieldName = field.getRootSegmentPath();
- MongoCompareOp compareOp = null;
+ MongoOp compareOp = null;
switch (functionName) {
case FunctionNames.EQ:
- compareOp = MongoCompareOp.EQUAL;
+ compareOp = MongoOp.EQUAL;
break;
case FunctionNames.NE:
- compareOp = MongoCompareOp.NOT_EQUAL;
+ compareOp = MongoOp.NOT_EQUAL;
break;
case FunctionNames.GE:
- compareOp = MongoCompareOp.GREATER_OR_EQUAL;
+ compareOp = MongoOp.GREATER_OR_EQUAL;
break;
case FunctionNames.GT:
- compareOp = MongoCompareOp.GREATER;
+ compareOp = MongoOp.GREATER;
break;
case FunctionNames.LE:
- compareOp = MongoCompareOp.LESS_OR_EQUAL;
+ compareOp = MongoOp.LESS_OR_EQUAL;
break;
case FunctionNames.LT:
- compareOp = MongoCompareOp.LESS;
+ compareOp = MongoOp.LESS;
break;
case FunctionNames.IS_NULL:
case "isNull":
case "is null":
- compareOp = MongoCompareOp.IFNULL;
+ compareOp = MongoOp.IFNULL;
break;
case FunctionNames.IS_NOT_NULL:
case "isNotNull":
case "is not null":
- compareOp = MongoCompareOp.IFNOTNULL;
+ compareOp = MongoOp.IFNOTNULL;
break;
}
if (compareOp != null) {
Document queryFilter = new Document();
- if (compareOp == MongoCompareOp.IFNULL) {
+ if (compareOp == MongoOp.IFNULL) {
queryFilter.put(fieldName,
- new Document(MongoCompareOp.EQUAL.getCompareOp(), null));
- } else if (compareOp == MongoCompareOp.IFNOTNULL) {
+ new Document(MongoOp.EQUAL.getCompareOp(), null));
+ } else if (compareOp == MongoOp.IFNOTNULL) {
queryFilter.put(fieldName,
- new Document(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+ new Document(MongoOp.NOT_EQUAL.getCompareOp(), null));
} else {
queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
fieldValue));
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index bc3817e..12fd384 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -460,6 +460,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
.setMinFilters(chunkInfo.getMinFilters())
.setMaxFilters(chunkInfo.getMaxFilters())
.setFilter(scanSpec.getFilters())
+ .setFields(scanSpec.getFields())
.setDbName(scanSpec.getDbName())
.setCollectionName(scanSpec.getCollectionName())
.setHosts(chunkInfo.getChunkLocList());
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 5cb007f..820eb8a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -70,7 +70,7 @@ public class MongoRecordReader extends AbstractRecordReader {
private Document filters;
private List<Bson> operations;
- private final Document fields;
+ private Document fields;
private final FragmentContext fragmentContext;
@@ -85,9 +85,9 @@ public class MongoRecordReader extends AbstractRecordReader {
public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, MongoStoragePlugin plugin) {
- fields = new Document();
+// fields = new Document();
// exclude _id field, if not mentioned by user.
- fields.put(DrillMongoConstants.ID, 0);
+// fields.put(DrillMongoConstants.ID, 0);
setColumns(projectedColumns);
fragmentContext = context;
this.plugin = plugin;
@@ -100,6 +100,8 @@ public class MongoRecordReader extends AbstractRecordReader {
shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+
+ fields = shardedMongoSubScanSpec.getFields();
}
enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 17b548b..da10a54 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -33,20 +33,26 @@ public class MongoScanSpec {
private Document filters;
+ private Document fields;
+
private List<Bson> operations = new ArrayList<>();
- @JsonCreator
- public MongoScanSpec(@JsonProperty("dbName") String dbName,
- @JsonProperty("collectionName") String collectionName) {
+ public MongoScanSpec(String dbName,
+ String collectionName) {
this.dbName = dbName;
this.collectionName = collectionName;
}
- public MongoScanSpec(String dbName, String collectionName,
- Document filters, List<Bson> operations) {
+ @JsonCreator
+ public MongoScanSpec(@JsonProperty("dbName") String dbName,
+ @JsonProperty("collectionName") String collectionName,
+ @JsonProperty("filters") Document filters,
+ @JsonProperty("fields") Document fields,
+ @JsonProperty("operations") List<Bson> operations) {
this.dbName = dbName;
this.collectionName = collectionName;
this.filters = filters;
+ this.fields = fields;
this.operations = operations;
}
@@ -62,6 +68,10 @@ public class MongoScanSpec {
return filters;
}
+ public Document getFields() {
+ return fields;
+ }
+
public List<Bson> getOperations() {
return operations;
}
@@ -72,6 +82,7 @@ public class MongoScanSpec {
.field("dbName", dbName)
.field("collectionName", collectionName)
.field("filters", filters)
+ .field("fields", fields)
.field("operations", operations)
.toString();
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 692a939..ef31f25 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -128,6 +128,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
protected Map<String, Object> minFilters;
protected Map<String, Object> maxFilters;
protected Document filter;
+ protected Document fields;
@JsonCreator
public ShardedMongoSubScanSpec(@JsonProperty("dbName") String dbName,
@@ -135,11 +136,13 @@ public class MongoSubScan extends AbstractBase implements SubScan {
@JsonProperty("hosts") List<String> hosts,
@JsonProperty("minFilters") Map<String, Object> minFilters,
@JsonProperty("maxFilters") Map<String, Object> maxFilters,
- @JsonProperty("filters") Document filters) {
+ @JsonProperty("filters") Document filters,
+ @JsonProperty("fields") Document fields) {
super(dbName, collectionName, hosts);
this.minFilters = minFilters;
this.maxFilters = maxFilters;
this.filter = filters;
+ this.fields = fields;
}
ShardedMongoSubScanSpec() {
@@ -172,6 +175,15 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return this;
}
+ public Document getFields() {
+ return fields;
+ }
+
+ public ShardedMongoSubScanSpec setFields(Document fields) {
+ this.fields = fields;
+ return this;
+ }
+
@Override
public String toString() {
return new PlanStringBuilder(this)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
similarity index 67%
rename from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
rename to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
index ef89bfb..55f8cc5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
@@ -17,14 +17,30 @@
*/
package org.apache.drill.exec.store.mongo.common;
-public enum MongoCompareOp {
- EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
- "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
- "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
- "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
- private String compareOp;
+public enum MongoOp {
+ EQUAL("$eq"),
+ NOT_EQUAL("$ne"),
+ GREATER_OR_EQUAL("$gte"),
+ GREATER("$gt"),
+ LESS_OR_EQUAL("$lte"),
+ LESS("$lt"),
+ IN("$in"),
+ AND("$and"),
+ OR("$or"),
+ NOT("$not"),
+ REGEX("$regex"),
+ OPTIONS("$options"),
+ PROJECT("$project"),
+ COND("$cond"),
+ IFNULL("$ifNull"),
+ IFNOTNULL("$ifNotNull"),
+ SUM("$sum"),
+ GROUP_BY("$group"),
+ EXISTS("$exists");
- MongoCompareOp(String compareOp) {
+ private final String compareOp;
+
+ MongoOp(String compareOp) {
this.compareOp = compareOp;
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index fad7c25..c359c77 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -1,10 +1,13 @@
package org.apache.drill.exec.store.mongo.plan;
import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -26,6 +29,10 @@ import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
import org.apache.drill.exec.util.Utilities;
import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
@@ -40,6 +47,7 @@ public class MongoPluginImplementor implements PluginImplementor {
private List<Bson> operations;
private Document filters;
private List<SchemaPath> columns;
+ private Document fields;
private boolean runAggregate;
@@ -90,24 +98,30 @@ public class MongoPluginImplementor implements PluginImplementor {
public void implement(PluginProjectRel project) throws IOException {
visitChild(project.getInput());
-// final MongoRules.RexToMongoTranslator translator =
-// new MongoRules.RexToMongoTranslator(
-// (JavaTypeFactory) project.getCluster().getTypeFactory(),
-// MongoRules.mongoFieldNames(project.getInput().getRowType()));
-// final List<String> items = new ArrayList<>();
-// for (Pair<RexNode, String> pair : project.getNamedProjects()) {
-// final String name = pair.right;
-// final String expr = pair.left.accept(translator);
-// items.add(expr.equals("'$" + name + "'")
-// ? MongoRules.maybeQuote(name) + ": 1"
-// : MongoRules.maybeQuote(name) + ": " + expr);
-// }
-// final String findString = Util.toString(items, "{", ", ", "}");
-// final String aggregateString = "{$project: " + findString + "}";
-// final Pair<String, String> op = Pair.of(findString, aggregateString);
+ MongoRules.RexToMongoTranslator translator =
+ new MongoRules.RexToMongoTranslator(
+ (JavaTypeFactory) project.getCluster().getTypeFactory(),
+ MongoRules.mongoFieldNames(project.getInput().getRowType()));
+ List<BsonElement> items = new ArrayList<>();
+ for (Pair<RexNode, String> pair : project.getNamedProjects()) {
+ String name = pair.right;
+ BsonValue expr = pair.left.accept(translator);
+ items.add(expr.equals(new BsonString("$" + name))
+ ? new BsonElement(MongoRules.maybeQuote(name), new BsonInt32(1))
+ : new BsonElement(MongoRules.maybeQuote(name), expr));
+ }
+ BsonDocument projection = Aggregates.project(new BsonDocument(items)).toBsonDocument();
+ if (runAggregate) {
+ operations.add(projection);
+ } else {
+ List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+ this.columns = outNames.stream()
+ .map(SchemaPath::getSimplePath)
+ .collect(Collectors.toList());
+ }
// implementor.add(op.left, op.right);
- List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+// List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
// Document fields = new Document();
// fields.put(DrillMongoConstants.ID, 0);
// List<String> inNames = MongoAggregateUtils.mongoFieldNames(project.getInput().getRowType());
@@ -118,9 +132,9 @@ public class MongoPluginImplementor implements PluginImplementor {
//
// operations.add(Aggregates.project(fields).toBsonDocument());
- this.columns = outNames.stream()
- .map(SchemaPath::getSimplePath)
- .collect(Collectors.toList());
+// this.columns = outNames.stream()
+// .map(SchemaPath::getSimplePath)
+// .collect(Collectors.toList());
}
@Override
@@ -187,6 +201,7 @@ public class MongoPluginImplementor implements PluginImplementor {
groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations());
filters = groupScan.getScanSpec().getFilters();
+ fields = groupScan.getScanSpec().getFields();
columns = groupScan.getColumns();
}
@@ -198,7 +213,7 @@ public class MongoPluginImplementor implements PluginImplementor {
@Override
public GroupScan getPhysicalOperator() throws IOException {
MongoScanSpec scanSpec = groupScan.getScanSpec();
- MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+ MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, fields, operations);
return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
newSpec, columns, runAggregate);
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
index fddfa7c..7f0daf3 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.mongo.plan;
import org.apache.calcite.adapter.enumerable.RexImpTable;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@@ -32,75 +30,23 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Bug;
-import org.apache.calcite.util.Util;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
-import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
-import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
-import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
-import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
-import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
-import org.apache.drill.exec.store.plan.rule.PluginSortRule;
-import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
import java.util.AbstractList;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class MongoRules {
- public List<RelOptRule> sortRules(Convention out) {
- return Arrays.asList(
- new PluginSortRule(Convention.NONE, out),
- new PluginSortRule(DrillRel.DRILL_LOGICAL, out)
- );
- }
-
- public List<RelOptRule> limitRules(Convention out) {
- return Arrays.asList(
- new PluginLimitRule(Convention.NONE, out),
- new PluginLimitRule(DrillRel.DRILL_LOGICAL, out)
- );
- }
-
- public List<RelOptRule> filterRules(Convention out) {
- return Arrays.asList(
- new PluginFilterRule(Convention.NONE, out),
- new PluginFilterRule(DrillRel.DRILL_LOGICAL, out)
- );
- }
-
- public List<RelOptRule> projectRules(Convention out) {
- return Arrays.asList(
- new PluginProjectRule(Convention.NONE, out),
- new PluginProjectRule(DrillRel.DRILL_LOGICAL, out)
- );
- }
-
- public List<RelOptRule> aggregateRules(Convention out) {
- return Arrays.asList(
- new PluginAggregateRule(Convention.NONE, out),
- new PluginAggregateRule(DrillRel.DRILL_LOGICAL, out)
- );
- }
-
- public List<RelOptRule> unionRules(Convention out) {
- return Arrays.asList(
- new PluginUnionRule(Convention.NONE, out),
- new PluginUnionRule(DrillRel.DRILL_LOGICAL, out)
- );
- }
-
- public RelOptRule vertexRule(Convention out) {
- return new VertexDrelConverterRule(out);
- }
-
- public static final RelOptRule PREL_CONVERTER_INSTANCE = new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
-
/** Returns 'string' if it is a call to item['string'], null otherwise. */
public static String isItem(RexCall call) {
if (call.getOperator() != SqlStdOperatorTable.ITEM) {
@@ -156,7 +102,7 @@ public class MongoRules {
/** Translator from {@link RexNode} to strings in MongoDB's expression
* language. */
- static class RexToMongoTranslator extends RexVisitorImpl<String> {
+ static class RexToMongoTranslator extends RexVisitorImpl<BsonValue> {
private final JavaTypeFactory typeFactory;
private final List<String> inFields;
@@ -171,16 +117,16 @@ public class MongoRules {
MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add");
MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract");
// Boolean
- MONGO_OPERATORS.put(SqlStdOperatorTable.AND, "$and");
- MONGO_OPERATORS.put(SqlStdOperatorTable.OR, "$or");
- MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, "$not");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp());
// Comparison
- MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, "$eq");
- MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, "$ne");
- MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, "$gt");
- MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, "$gte");
- MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, "$lt");
- MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, "$lte");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp());
+ MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp());
}
protected RexToMongoTranslator(JavaTypeFactory typeFactory,
@@ -190,43 +136,45 @@ public class MongoRules {
this.inFields = inFields;
}
- @Override public String visitLiteral(RexLiteral literal) {
+ @Override
+ public BsonValue visitLiteral(RexLiteral literal) {
if (literal.getValue() == null) {
- return "null";
+ return BsonNull.VALUE;
}
- return "{$literal: "
- + RexToLixTranslator.translateLiteral(literal, literal.getType(),
- typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
- + "}";
+ return new BsonDocument("$literal", new BsonString(
+ RexToLixTranslator.translateLiteral(literal, literal.getType(),
+ typeFactory, RexImpTable.NullAs.NOT_POSSIBLE).toString()));
}
- @Override public String visitInputRef(RexInputRef inputRef) {
- return maybeQuote(
+ @Override
+ public BsonValue visitInputRef(RexInputRef inputRef) {
+ return new BsonString(
"$" + inFields.get(inputRef.getIndex()));
}
- @Override public String visitCall(RexCall call) {
+ @Override
+ public BsonValue visitCall(RexCall call) {
String name = isItem(call);
if (name != null) {
- return "'$" + name + "'";
+ return new BsonString("'$" + name + "'");
}
- final List<String> strings = new ArrayList<>();//visitList(call.operands);
+ List<BsonValue> strings = call.operands.stream()
+ .map(operand -> operand.accept(this))
+ .collect(Collectors.toList());
+
if (call.getKind() == SqlKind.CAST) {
return strings.get(0);
}
String stdOperator = MONGO_OPERATORS.get(call.getOperator());
if (stdOperator != null) {
- return "{" + stdOperator + ": [" + Util.commaList(strings) + "]}";
+ return new BsonDocument(stdOperator, new BsonArray(strings));
}
if (call.getOperator() == SqlStdOperatorTable.ITEM) {
final RexNode op1 = call.operands.get(1);
if (op1 instanceof RexLiteral
&& op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
- if (!Bug.CALCITE_194_FIXED) {
- return "'" + stripQuotes(strings.get(0)) + "["
- + ((RexLiteral) op1).getValue2() + "]'";
- }
- return strings.get(0) + "[" + strings.get(1) + "]";
+ return new BsonDocument("$arrayElemAt", new BsonArray(
+ Arrays.asList(strings.get(0), new BsonInt32(((RexLiteral) op1).getValueAs(Integer.class)))));
}
}
if (call.getOperator() == SqlStdOperatorTable.CASE) {
@@ -253,7 +201,7 @@ public class MongoRules {
}
}
sb.append(finish);
- return sb.toString();
+ return BsonDocument.parse(sb.toString());
}
throw new IllegalArgumentException("Translation of " + call
+ " is not supported by MongoProject");
@@ -435,72 +383,6 @@ public class MongoRules {
*/
-
-// /**
-// * Rule to convert an {@link org.apache.calcite.rel.logical.Union} to a
-// * {@link MongoUnionRel}.
-// */
-// public static class MongoUnionRule
-// extends MongoConverterRule {
-// private MongoUnionRule(MongoConvention out) {
-// super(
-// Union.class,
-// Convention.NONE,
-// out,
-// "MongoUnionRule");
-// }
-//
-// public RelNode convert(RelNode rel) {
-// final Union union = (Union) rel;
-// final RelTraitSet traitSet =
-// union.getTraitSet().replace(out);
-// return new MongoUnionRel(
-// rel.getCluster(),
-// traitSet,
-// convertList(union.getInputs(), traitSet),
-// union.all);
-// }
-// }
-//
-// public static class MongoUnionRel
-// extends Union
-// implements MongoRel {
-// public MongoUnionRel(
-// RelOptCluster cluster,
-// RelTraitSet traitSet,
-// List<RelNode> inputs,
-// boolean all) {
-// super(cluster, traitSet, inputs, all);
-// }
-//
-// public MongoUnionRel copy(
-// RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-// return new MongoUnionRel(getCluster(), traitSet, inputs, all);
-// }
-//
-// @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
-// return super.computeSelfCost(planner).multiplyBy(.1);
-// }
-//
-// public SqlString implement(MongoImplementor implementor) {
-// return setOpSql(this, implementor, "UNION");
-// }
-// }
-//
-// private static SqlString setOpSql(
-// SetOp setOpRel, MongoImplementor implementor, String op) {
-// final SqlBuilder buf = new SqlBuilder(implementor.dialect);
-// for (Ord<RelNode> input : Ord.zip(setOpRel.getInputs())) {
-// if (input.i > 0) {
-// implementor.newline(buf)
-// .append(op + (setOpRel.all ? " ALL " : ""));
-// implementor.newline(buf);
-// }
-// buf.append(implementor.visitChild(input.i, input.e));
-// }
-// return buf.toSqlString();
-// }
-
/*
/**
* Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index 090bad0..e8f698b 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -249,4 +249,26 @@ public class TestMongoQueries extends MongoTestBase {
.baselineValues("0005", "Apple Fritter")
.go();
}
+
+ @Test
+ public void testProjectPushDown() throws Exception {
+ String query = "select t.id * t.id as c from mongo.%s.`%s` t";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .include("MongoGroupScan.*project.*multiply")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(1)
+ .baselineValues(4)
+ .baselineValues(9)
+ .baselineValues(16)
+ .baselineValues(25)
+ .go();
+ }
}