Posted to commits@drill.apache.org by vo...@apache.org on 2021/08/05 17:23:15 UTC
[drill] 02/13: DRILL-7971: Intermediate commit
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch mongo
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 32d04143847783ce2061be2368b625a170c41543
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Jul 13 20:34:03 2021 +0300
DRILL-7971: Intermediate commit
---
.../exec/store/mongo/BaseMongoSubScanSpec.java | 64 ++
.../exec/store/mongo/MongoAggregateUtils.java | 235 ++++++++
.../drill/exec/store/mongo/MongoFilterBuilder.java | 54 +-
.../drill/exec/store/mongo/MongoGroupScan.java | 120 ++--
.../store/mongo/MongoPushDownAggregateForScan.java | 301 ----------
.../store/mongo/MongoPushDownFilterForScan.java | 9 +-
.../drill/exec/store/mongo/MongoRecordReader.java | 44 +-
.../exec/store/mongo/MongoScanBatchCreator.java | 12 +-
.../drill/exec/store/mongo/MongoScanSpec.java | 30 +-
.../drill/exec/store/mongo/MongoStoragePlugin.java | 56 +-
.../drill/exec/store/mongo/MongoSubScan.java | 121 ++--
.../store/mongo/plan/MongoPluginImplementor.java | 206 +++++++
.../store/mongo/plan/MongoPluginRulesProvider.java | 76 +++
.../drill/exec/store/mongo/plan/MongoRules.java | 649 +++++++++++++++++++++
.../drill/exec/store/mongo/plan/MongoTable.java | 197 +++++++
.../store/mongo/schema/MongoDatabaseSchema.java | 3 +-
.../store/mongo/schema/MongoSchemaFactory.java | 4 +-
.../exec/store/mongo/TestMongoChunkAssignment.java | 2 +-
.../exec/store/mongo/TestMongoLimitPushDown.java | 20 +-
.../drill/exec/store/mongo/TestMongoQueries.java | 53 +-
.../exec/physical/base/AbstractGroupScan.java | 6 +
.../apache/drill/exec/physical/base/GroupScan.java | 5 +
.../exec/planner/common/DrillSortRelBase.java | 29 +
.../drill/exec/planner/logical/DrillSortRel.java | 15 +-
.../drill/exec/planner/physical/SortPrel.java | 14 +-
.../drill/exec/store/PlannableStoragePlugin.java | 189 ++++++
.../drill/exec/store/PluginRulesProvider.java | 16 +
.../drill/exec/store/plan/PluginImplementor.java | 39 ++
.../exec/store/plan/rel/PluginAggregateRel.java | 87 +++
.../exec/store/plan/rel/PluginDrillTable.java | 37 ++
.../drill/exec/store/plan/rel/PluginFilterRel.java | 59 ++
.../store/plan/rel/PluginIntermediatePrelRel.java | 83 +++
.../drill/exec/store/plan/rel/PluginLimitRel.java | 54 ++
.../drill/exec/store/plan/rel/PluginPrel.java | 83 +++
.../exec/store/plan/rel/PluginProjectRel.java | 59 ++
.../drill/exec/store/plan/rel/PluginRel.java | 11 +
.../drill/exec/store/plan/rel/PluginSortRel.java | 61 ++
.../drill/exec/store/plan/rel/PluginUnionRel.java | 42 ++
.../store/plan/rel/StoragePluginTableScan.java | 74 +++
.../exec/store/plan/rule/PluginAggregateRule.java | 38 ++
.../exec/store/plan/rule/PluginConverterRule.java | 16 +
.../exec/store/plan/rule/PluginFilterRule.java | 30 +
.../rule/PluginIntermediatePrelConverterRule.java | 50 ++
.../exec/store/plan/rule/PluginLimitRule.java | 25 +
.../exec/store/plan/rule/PluginProjectRule.java | 27 +
.../drill/exec/store/plan/rule/PluginSortRule.java | 28 +
.../exec/store/plan/rule/PluginUnionRule.java | 42 ++
47 files changed, 2886 insertions(+), 589 deletions(-)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
new file mode 100644
index 0000000..fd05500
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
@@ -0,0 +1,64 @@
+package org.apache.drill.exec.store.mongo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+
+import java.util.List;
+
+public class BaseMongoSubScanSpec {
+
+ protected String dbName;
+ protected String collectionName;
+ protected List<String> hosts;
+
+ @JsonCreator
+ public BaseMongoSubScanSpec(@JsonProperty("dbName") String dbName,
+ @JsonProperty("collectionName") String collectionName,
+ @JsonProperty("hosts") List<String> hosts) {
+ this.dbName = dbName;
+ this.collectionName = collectionName;
+ this.hosts = hosts;
+ }
+
+ BaseMongoSubScanSpec() {
+
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public BaseMongoSubScanSpec setDbName(String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ public BaseMongoSubScanSpec setCollectionName(String collectionName) {
+ this.collectionName = collectionName;
+ return this;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public BaseMongoSubScanSpec setHosts(List<String> hosts) {
+ this.hosts = hosts;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("dbName", dbName)
+ .field("collectionName", collectionName)
+ .field("hosts", hosts)
+ .toString();
+
+ }
+}
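
The setters above return this, so a sub-scan spec can be built fluently, which is how MongoGroupScan.buildSubScanSpecAndGet uses the subclasses later in this diff. A minimal same-package sketch (the no-arg constructor is package-private; the values are illustrative, not from the commit):

    BaseMongoSubScanSpec spec = new BaseMongoSubScanSpec()
        .setDbName("employee_db")
        .setCollectionName("employee")
        .setHosts(java.util.Collections.singletonList("localhost:27017"));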
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
new file mode 100644
index 0000000..a38c494
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -0,0 +1,235 @@
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.client.model.Accumulators;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Util;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+
+public class MongoAggregateUtils {
+
+ public static List<String> mongoFieldNames(RelDataType rowType) {
+ return SqlValidatorUtil.uniquify(
+ new AbstractList<String>() {
+ @Override public String get(int index) {
+ final String name = rowType.getFieldList().get(index).getName();
+ return name.startsWith("$") ? "_" + name.substring(2) : name;
+ }
+
+ @Override public int size() {
+ return rowType.getFieldCount();
+ }
+ },
+ SqlValidatorUtil.EXPR_SUGGESTER, true);
+ }
+
+ static String maybeQuote(String s) {
+ if (!needsQuote(s)) {
+ return s;
+ }
+ return quote(s);
+ }
+
+ static String quote(String s) {
+ return "'" + s + "'"; // TODO: handle embedded quotes
+ }
+
+ private static boolean needsQuote(String s) {
+ for (int i = 0, n = s.length(); i < n; i++) {
+ char c = s.charAt(i);
+ if (!Character.isJavaIdentifierPart(c)
+ || c == '$') {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<Bson> getAggregateOperations(Aggregate aggregate, RelDataType rowType, MongoGroupScan groupScan) {
+// List<String> list = new ArrayList<>();
+ List<BsonField> docList = new ArrayList<>();
+ List<String> inNames = mongoFieldNames(rowType);
+ List<String> outNames = mongoFieldNames(aggregate.getRowType());
+ int i = 0;
+ Object id;
+ if (aggregate.getGroupSet().cardinality() == 1) {
+ String inName = inNames.get(aggregate.getGroupSet().nth(0));
+// list.add("_id: " + maybeQuote("$" + inName));
+// docList.add(new BsonField("_id",
+// new BsonDocument(maybeQuote("$" + inName), new BsonString(maybeQuote("$" + inName)))));
+ id = "$" + inName;
+ ++i;
+ } else {
+ List<String> keys = new ArrayList<>();
+ for (int group : aggregate.getGroupSet()) {
+ String inName = inNames.get(group);
+ keys.add(inName + ": " + quote("$" + inName));
+ ++i;
+ }
+// list.add("_id: " + Util.toString(keys, "{", ", ", "}"));
+// docList.add(new BsonField("_id", BsonDocument.parse(Util.toString(keys, "{", ", ", "}"))));
+ id = BsonDocument.parse(Util.toString(keys, "{", ", ", "}"));
+ }
+ for (AggregateCall aggCall : aggregate.getAggCallList()) {
+// list.add(
+// maybeQuote(outNames.get(i)) + ": "
+// + toMongo(aggCall.getAggregation(), inNames, aggCall.getArgList()));
+ docList.add(bsonAggregate(inNames, outNames.get(i), aggCall));
+ i++;
+ }
+// List<Pair<String, String>> aggsList = new ArrayList<>();
+ List<Bson> docAggList = new ArrayList<>();
+// aggsList.add(Pair.of(null, "{$group: " + Util.toString(list, "{", ", ", "}") + "}"));
+ docAggList.add(Aggregates.group(id, docList).toBsonDocument());
+ List<String> fixups;
+ if (aggregate.getGroupSet().cardinality() == 1) {
+ fixups = new AbstractList<String>() {
+ @Override public String get(int index) {
+ String outName = outNames.get(index);
+ return maybeQuote(outName) + ": "
+ + maybeQuote("$" + (index == 0 ? "_id" : outName));
+ }
+
+ @Override public int size() {
+ return outNames.size();
+ }
+ };
+ } else {
+ fixups = new ArrayList<>();
+ fixups.add("_id: 0");
+ i = 0;
+ for (int group : aggregate.getGroupSet()) {
+ fixups.add(
+ maybeQuote(outNames.get(group))
+ + ": "
+ + maybeQuote("$_id." + outNames.get(group)));
+ ++i;
+ }
+ for (AggregateCall ignored : aggregate.getAggCallList()) {
+ String outName = outNames.get(i++);
+ fixups.add(
+ maybeQuote(outName) + ": " + maybeQuote(
+ "$" + outName));
+ }
+ }
+ if (!aggregate.getGroupSet().isEmpty()) {
+// aggsList.add(Pair.of(null, "{$project: " + Util.toString(fixups, "{", ", ", "}") + "}"));
+      docAggList.add(new Document("$project", BsonDocument.parse(Util.toString(fixups, "{", ", ", "}"))));
+ }
+
+ List<Bson> allOperations = new ArrayList<>(groupScan.getScanSpec().getOperations());
+// Pair.right(aggsList).stream()
+// .map(BsonDocument::parse)
+// .forEach(allOperations::add);
+ allOperations.addAll(docAggList);
+ return allOperations;
+ }
+
+ private static BsonField bsonAggregate(List<String> inNames, String outName, AggregateCall aggCall) {
+ String aggregationName = aggCall.getAggregation().getName();
+ List<Integer> args = aggCall.getArgList();
+ if (aggregationName.equals(SqlStdOperatorTable.COUNT.getName())) {
+ if (args.size() == 0) {
+ return Accumulators.sum(maybeQuote(outName), 1);
+ } else {
+ assert args.size() == 1;
+ String inName = inNames.get(args.get(0));
+ return Accumulators.sum(maybeQuote(outName),
+ new BsonDocument("$cond",
+ new BsonArray(Arrays.asList(
+ new Document("$eq",
+ new BsonArray(Arrays.asList(
+ new BsonString(quote(inName)), BsonNull.VALUE))).toBsonDocument(),
+ new BsonInt32(0),
+ new BsonInt32(1)
+ ))
+ )
+ );
+ }
+ } else {
+ BiFunction<String, Object, BsonField> mongoAccumulator = mongoAccumulator(aggregationName);
+ if (mongoAccumulator != null) {
+        return mongoAccumulator.apply(maybeQuote(outName), maybeQuote("$" + inNames.get(args.get(0))));
+ }
+ }
+ return null;
+ }
+
+ private static <T> BiFunction<String, T, BsonField> mongoAccumulator(String aggregationName) {
+ if (aggregationName.equals(SqlStdOperatorTable.SUM.getName())
+ || aggregationName.equals(SqlStdOperatorTable.SUM0.getName())) {
+ return Accumulators::sum;
+ } else if (aggregationName.equals(SqlStdOperatorTable.MIN.getName())) {
+ return Accumulators::min;
+ } else if (aggregationName.equals(SqlStdOperatorTable.MAX.getName())) {
+ return Accumulators::max;
+ } else if (aggregationName.equals(SqlStdOperatorTable.AVG.getName())) {
+ return Accumulators::avg;
+ } else if (aggregationName.equals(SqlStdOperatorTable.FIRST.getName())) {
+ return Accumulators::first;
+ } else if (aggregationName.equals(SqlStdOperatorTable.LAST.getName())) {
+ return Accumulators::last;
+ } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV.getName())
+ || aggregationName.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())) {
+ return Accumulators::stdDevSamp;
+ } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV_POP.getName())) {
+ return Accumulators::stdDevPop;
+ }
+ return null;
+ }
+
+// private static String mongoAggName(String aggregationName) {
+// if (aggregationName.equals(SqlStdOperatorTable.SUM.getName())
+// || aggregationName.equals(SqlStdOperatorTable.SUM0.getName())) {
+// return "$sum";
+// } else if (aggregationName.equals(SqlStdOperatorTable.MIN.getName())) {
+// return "$min";
+// } else if (aggregationName.equals(SqlStdOperatorTable.MAX.getName())) {
+// return "$max";
+// } else if (aggregationName.equals(SqlStdOperatorTable.AVG.getName())) {
+// return "$avg";
+// }
+// return null;
+// }
+
+// private static String toMongo(SqlAggFunction aggregation, List<String> inNames,
+// List<Integer> args) {
+// if (aggregation.getName().equals(SqlStdOperatorTable.COUNT.getName())) {
+// if (args.size() == 0) {
+// return "{$sum: 1}";
+// } else {
+// assert args.size() == 1;
+// final String inName = inNames.get(args.get(0));
+// return "{$sum: {$cond: [ {$eq: ["
+// + quote(inName)
+// + ", null]}, 0, 1]}}";
+// }
+// } else {
+// String mongoAggName = mongoAggName(aggregation.getName());
+// assert args.size() == 1;
+// if (mongoAggName != null) {
+// String inName = inNames.get(args.get(0));
+// return "{" + mongoAggName + ": " + maybeQuote("$" + inName) + "}";
+// }
+// throw new AssertionError("unknown aggregate " + aggregation);
+// }
+// }
+
+}
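
To make the translation concrete: for a query shaped like SELECT dept_id, COUNT(*) FROM employee GROUP BY dept_id, getAggregateOperations() above emits a $group stage keyed on the single grouping column, followed by a $project fixup stage. A self-contained sketch using the same driver helpers (the EXPR$1 output name is just the Calcite-style suggestion mongoFieldNames would produce; it and the column names are illustrative):

    import com.mongodb.client.model.Accumulators;
    import com.mongodb.client.model.Aggregates;
    import org.bson.BsonDocument;
    import org.bson.conversions.Bson;
    import java.util.Arrays;
    import java.util.List;

    public class AggregatePipelineSketch {
      public static void main(String[] args) {
        // $group: one bucket per dept_id; COUNT(*) becomes {$sum: 1}, as in bsonAggregate()
        Bson group = Aggregates.group("$dept_id", Accumulators.sum("EXPR$1", 1));
        // $project: rename _id back to the output column, mirroring the fixups list
        Bson project = BsonDocument.parse("{$project: {dept_id: '$_id', 'EXPR$1': '$EXPR$1'}}");
        List<Bson> pipeline = Arrays.asList(group, project);
        pipeline.forEach(stage -> System.out.println(stage.toBsonDocument().toJson()));
      }
    }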
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 10720e4..8eecf00 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MongoFilterBuilder extends
- AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
+ AbstractExprVisitor<Document, Void, RuntimeException> implements
DrillMongoConstants {
private static final Logger logger = LoggerFactory
.getLogger(MongoFilterBuilder.class);
@@ -46,37 +46,36 @@ public class MongoFilterBuilder extends
this.le = conditionExp;
}
- public MongoScanSpec parseTree() {
- MongoScanSpec parsedSpec = le.accept(this, null);
+ public Document parseTree() {
+ Document parsedSpec = le.accept(this, null);
if (parsedSpec != null) {
- parsedSpec = mergeScanSpecs(FunctionNames.AND, this.groupScan.getScanSpec(),
+ parsedSpec = mergeScanSpecs(FunctionNames.AND, null,
parsedSpec);
}
return parsedSpec;
}
- private MongoScanSpec mergeScanSpecs(String functionName,
- MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
+ private Document mergeScanSpecs(String functionName,
+ Document leftScanSpec, Document rightScanSpec) {
Document newFilter = new Document();
switch (functionName) {
case FunctionNames.AND:
- if (leftScanSpec.getFilters() != null
- && rightScanSpec.getFilters() != null) {
- newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
- rightScanSpec.getFilters());
- } else if (leftScanSpec.getFilters() != null) {
- newFilter = leftScanSpec.getFilters();
+ if (leftScanSpec != null
+ && rightScanSpec != null) {
+ newFilter = MongoUtils.andFilterAtIndex(leftScanSpec,
+ rightScanSpec);
+ } else if (leftScanSpec != null) {
+ newFilter = leftScanSpec;
} else {
- newFilter = rightScanSpec.getFilters();
+ newFilter = rightScanSpec;
}
break;
case FunctionNames.OR:
- newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(),
- rightScanSpec.getFilters());
+ newFilter = MongoUtils.orFilterAtIndex(leftScanSpec,
+ rightScanSpec);
}
- return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
- .getScanSpec().getCollectionName(), newFilter);
+ return newFilter;
}
public boolean isAllExpressionsConverted() {
@@ -84,16 +83,16 @@ public class MongoFilterBuilder extends
}
@Override
- public MongoScanSpec visitUnknown(LogicalExpression e, Void value)
+ public Document visitUnknown(LogicalExpression e, Void value)
throws RuntimeException {
allExpressionsConverted = false;
return null;
}
@Override
- public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+ public Document visitBooleanOperator(BooleanOperator op, Void value) {
List<LogicalExpression> args = op.args();
- MongoScanSpec nodeScanSpec = null;
+ Document nodeScanSpec = null;
String functionName = op.getName();
for (int i = 0; i < args.size(); ++i) {
switch (functionName) {
@@ -102,7 +101,7 @@ public class MongoFilterBuilder extends
if (nodeScanSpec == null) {
nodeScanSpec = args.get(i).accept(this, null);
} else {
- MongoScanSpec scanSpec = args.get(i).accept(this, null);
+ Document scanSpec = args.get(i).accept(this, null);
if (scanSpec != null) {
nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
} else {
@@ -116,9 +115,9 @@ public class MongoFilterBuilder extends
}
@Override
- public MongoScanSpec visitFunctionCall(FunctionCall call, Void value)
+ public Document visitFunctionCall(FunctionCall call, Void value)
throws RuntimeException {
- MongoScanSpec nodeScanSpec = null;
+ Document nodeScanSpec = null;
String functionName = call.getName();
List<LogicalExpression> args = call.args();
@@ -138,8 +137,8 @@ public class MongoFilterBuilder extends
switch (functionName) {
case FunctionNames.AND:
case FunctionNames.OR:
- MongoScanSpec leftScanSpec = args.get(0).accept(this, null);
- MongoScanSpec rightScanSpec = args.get(1).accept(this, null);
+ Document leftScanSpec = args.get(0).accept(this, null);
+ Document rightScanSpec = args.get(1).accept(this, null);
if (leftScanSpec != null && rightScanSpec != null) {
nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
rightScanSpec);
@@ -160,7 +159,7 @@ public class MongoFilterBuilder extends
return nodeScanSpec;
}
- private MongoScanSpec createMongoScanSpec(String functionName,
+ private Document createMongoScanSpec(String functionName,
SchemaPath field, Object fieldValue) throws ClassNotFoundException,
IOException {
// extract the field name
@@ -209,8 +208,7 @@ public class MongoFilterBuilder extends
queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
fieldValue));
}
- return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
- .getScanSpec().getCollectionName(), queryFilter);
+ return queryFilter;
}
return null;
}
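
With the builder now returning plain filter Documents instead of whole MongoScanSpecs, the AND/OR merge reduces to ordinary Mongo query composition. A minimal sketch of the shape the merged filter takes, assuming MongoUtils.andFilterAtIndex composes a $and document (the conventional translation; field names are illustrative):

    import org.bson.Document;
    import java.util.Arrays;

    Document left = new Document("salary", new Document("$gt", 1000));
    Document right = new Document("dept_id", 5);
    // => {"$and": [{"salary": {"$gt": 1000}}, {"dept_id": 5}]}
    Document merged = new Document("$and", Arrays.asList(left, right));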
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 6662e8c..bc3817e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -50,7 +50,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
+import org.apache.drill.exec.store.mongo.MongoSubScan.ShardedMongoSubScanSpec;
import org.apache.drill.exec.store.mongo.common.ChunkInfo;
import org.bson.Document;
import org.bson.codecs.BsonTypeClassMap;
@@ -79,13 +79,13 @@ import com.mongodb.client.MongoDatabase;
public class MongoGroupScan extends AbstractGroupScan implements
DrillMongoConstants {
- private static final Integer select = 1;
+ private static final int SELECT = 1;
private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
- private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
+ private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
- private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+ private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
private MongoStoragePlugin storagePlugin;
@@ -95,9 +95,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
private List<SchemaPath> columns;
- private int maxRecords;
-
- private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
+ private Map<Integer, List<BaseMongoSubScanSpec>> endpointFragmentMapping;
// Sharding with replica sets contains all the replica server addresses for
// each chunk.
@@ -107,7 +105,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
private final Stopwatch watch = Stopwatch.createUnstarted();
- private boolean filterPushedDown = false;
+ private boolean useAggregate;
@JsonCreator
public MongoGroupScan(
@@ -115,21 +113,21 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
@JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry,
- @JsonProperty("maxRecords") int maxRecords) throws IOException {
+ @JsonProperty("useAggregate") boolean useAggregate,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
this(userName,
pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
- scanSpec, columns, maxRecords);
+ scanSpec, columns, useAggregate);
}
public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
- MongoScanSpec scanSpec, List<SchemaPath> columns, int maxRecords) throws IOException {
+ MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) throws IOException {
super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.scanSpec = scanSpec;
this.columns = columns;
- this.maxRecords = maxRecords;
+ this.useAggregate = useAggregate;
init();
}
@@ -148,18 +146,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
this.chunksMapping = that.chunksMapping;
this.chunksInverseMapping = that.chunksInverseMapping;
this.endpointFragmentMapping = that.endpointFragmentMapping;
- this.filterPushedDown = that.filterPushedDown;
- this.maxRecords = that.maxRecords;
- }
-
- @JsonIgnore
- public boolean isFilterPushedDown() {
- return filterPushedDown;
- }
-
- @JsonIgnore
- public void setFilterPushedDown(boolean filterPushedDown) {
- this.filterPushedDown = filterPushedDown;
+ this.useAggregate = that.useAggregate;
}
private boolean isShardedCluster(MongoClient client) {
@@ -179,7 +166,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
MongoClient client = storagePlugin.getClient();
chunksMapping = Maps.newHashMap();
chunksInverseMapping = Maps.newLinkedHashMap();
- if (isShardedCluster(client)) {
+ if (useAggregate && isShardedCluster(client)) {
+ handleUnshardedCollection(getPrimaryShardInfo());
+ } else if (isShardedCluster(client)) {
MongoDatabase db = client.getDatabase(CONFIG);
MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
Document filter = new Document();
@@ -190,9 +179,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
+ this.scanSpec.getCollectionName());
Document projection = new Document();
- projection.put(SHARD, select);
- projection.put(MIN, select);
- projection.put(MAX, select);
+ projection.put(SHARD, SELECT);
+ projection.put(MIN, SELECT);
+ projection.put(MAX, SELECT);
FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
MongoCursor<Document> iterator = chunkCursor.iterator();
@@ -200,7 +189,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
projection = new Document();
- projection.put(HOST, select);
+ projection.put(HOST, SELECT);
boolean hasChunks = false;
while (iterator.hasNext()) {
@@ -292,7 +281,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
//Identify the primary shard of the queried database.
MongoCollection<Document> collection = database.getCollection(DATABASES);
Bson filter = new Document(ID, this.scanSpec.getDbName());
- Bson projection = new Document(PRIMARY, select);
+ Bson projection = new Document(PRIMARY, SELECT);
Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());
String shardName = document.getString(PRIMARY);
Preconditions.checkNotNull(shardName);
@@ -300,7 +289,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
//Identify the host(s) on which this shard resides.
MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
filter = new Document(ID, shardName);
- projection = new Document(HOST, select);
+ projection = new Document(HOST, SELECT);
Document hostInfo = Objects.requireNonNull(shardsCol.find(filter).projection(projection).first());
String hostEntry = hostInfo.getString(HOST);
Preconditions.checkNotNull(hostEntry);
@@ -361,7 +350,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
public GroupScan clone(int maxRecords) {
MongoGroupScan clone = new MongoGroupScan(this);
- clone.maxRecords = maxRecords;
+ clone.useAggregate = true;
+ clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords));
return clone;
}
@@ -409,7 +399,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
if (slots != null) {
for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
Integer slotIndex = slots.poll();
- List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping
+ List<BaseMongoSubScanSpec> subScanSpecList = endpointFragmentMapping
.get(slotIndex);
subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo));
slots.offer(slotIndex);
@@ -418,11 +408,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
}
- PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<>(
+ PriorityQueue<List<BaseMongoSubScanSpec>> minHeap = new PriorityQueue<>(
numSlots, LIST_SIZE_COMPARATOR);
- PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<>(
+ PriorityQueue<List<BaseMongoSubScanSpec>> maxHeap = new PriorityQueue<>(
numSlots, LIST_SIZE_COMPARATOR_REV);
- for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ for (List<BaseMongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
if (listOfScan.size() < minPerEndpointSlot) {
minHeap.offer(listOfScan);
} else if (listOfScan.size() > minPerEndpointSlot) {
@@ -433,7 +423,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
if (chunksToAssignSet.size() > 0) {
for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) {
for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
- List<MongoSubScanSpec> smallestList = minHeap.poll();
+ List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
smallestList.add(buildSubScanSpecAndGet(chunkInfo));
minHeap.offer(smallestList);
}
@@ -441,8 +431,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
- List<MongoSubScanSpec> smallestList = minHeap.poll();
- List<MongoSubScanSpec> largestList = maxHeap.poll();
+ List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
+ List<BaseMongoSubScanSpec> largestList = maxHeap.poll();
smallestList.add(largestList.remove(largestList.size() - 1));
if (largestList.size() > minPerEndpointSlot) {
maxHeap.offer(largestList);
@@ -458,16 +448,21 @@ public class MongoGroupScan extends AbstractGroupScan implements
endpointFragmentMapping.toString());
}
- private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
- return new MongoSubScanSpec()
- .setDbName(scanSpec.getDbName())
- .setCollectionName(scanSpec.getCollectionName())
- .setHosts(chunkInfo.getChunkLocList())
+ private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+ if (useAggregate) {
+ return new MongoSubScan.MongoSubScanSpec()
+ .setOperations(scanSpec.getOperations())
+ .setDbName(scanSpec.getDbName())
+ .setCollectionName(scanSpec.getCollectionName())
+ .setHosts(chunkInfo.getChunkLocList());
+ }
+ return new ShardedMongoSubScanSpec()
.setMinFilters(chunkInfo.getMinFilters())
.setMaxFilters(chunkInfo.getMaxFilters())
- .setMaxRecords(maxRecords)
.setFilter(scanSpec.getFilters())
- .setAggregates(scanSpec.getAggregates());
+ .setDbName(scanSpec.getDbName())
+ .setCollectionName(scanSpec.getCollectionName())
+ .setHosts(chunkInfo.getChunkLocList());
}
@Override
@@ -488,18 +483,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
@Override
public ScanStats getScanStats() {
- long recordCount;
try{
MongoClient client = storagePlugin.getClient();
MongoDatabase db = client.getDatabase(scanSpec.getDbName());
MongoCollection<Document> collection = db.getCollection(scanSpec.getCollectionName());
- long numDocs = collection.estimatedDocumentCount();
-
- if (maxRecords > 0 && numDocs > 0) {
- recordCount = Math.min(maxRecords, numDocs);
- } else {
- recordCount = numDocs;
- }
+ long recordCount = collection.estimatedDocumentCount();
float approxDiskCost = 0;
if (recordCount != 0) {
@@ -564,9 +552,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
@Override
public GroupScan applyLimit(int maxRecords) {
- if (maxRecords == this.maxRecords) {
- return null;
- }
return clone(maxRecords);
}
@@ -586,26 +571,33 @@ public class MongoGroupScan extends AbstractGroupScan implements
return storagePluginConfig;
}
- @JsonProperty("maxRecords")
- public int getMaxRecords() { return maxRecords; }
-
@JsonIgnore
public MongoStoragePlugin getStoragePlugin() {
return storagePlugin;
}
+ @JsonProperty("useAggregate")
+ public void setUseAggregate(boolean useAggregate) {
+ this.useAggregate = useAggregate;
+ }
+
+ @JsonProperty("useAggregate")
+ public boolean isUseAggregate() {
+ return useAggregate;
+ }
+
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("MongoScanSpec", scanSpec)
.field("columns", columns)
- .field("maxRecords", maxRecords)
+ .field("useAggregate", useAggregate)
.toString();
}
@VisibleForTesting
MongoGroupScan() {
- super((String)null);
+ super((String) null);
}
@JsonIgnore
@@ -622,7 +614,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonIgnore
@VisibleForTesting
- void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
+ void setInverseChunksMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
this.chunksInverseMapping = chunksInverseMapping;
}
}
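
Note how limit pushdown changes above: applyLimit(maxRecords) no longer short-circuits on a stored count; clone(maxRecords) instead appends a $limit stage to the spec's pipeline and flips the scan into aggregate mode. A sketch of the resulting stage, which MongoRecordReader later runs through collection.aggregate(...) rather than find() (collection handle elided):

    import org.bson.Document;
    import org.bson.conversions.Bson;
    import java.util.ArrayList;
    import java.util.List;

    List<Bson> operations = new ArrayList<>();
    operations.add(new Document("$limit", 10));   // what clone(10) adds to the scan spec
    // cursor = collection.aggregate(operations).batchSize(100).iterator();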
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownAggregateForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownAggregateForScan.java
deleted file mode 100644
index f7e1a00..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownAggregateForScan.java
+++ /dev/null
@@ -1,301 +0,0 @@
-package org.apache.drill.exec.store.mongo;
-
-import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.Primitive;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.bson.BsonDocument;
-import org.bson.BsonString;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-import java.io.IOException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class MongoPushDownAggregateForScan extends StoragePluginOptimizerRule {
- public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownAggregateForScan(RelOptHelper.some(Aggregate.class, RelOptHelper.any(DrillScanRelBase.class)), "MongoPushDownAggregateForScan");
- public static final StoragePluginOptimizerRule PROJ_INSTANCE = new MongoPushDownAggregateForScan(RelOptHelper.some(Aggregate.class, RelOptHelper.some(Project.class, RelOptHelper.any(DrillScanRelBase.class))), "MongoPushDownAggregateForScan_project");
-
- public MongoPushDownAggregateForScan(RelOptRuleOperand operand, String desc) {
- super(operand, desc);
- }
-
- static List<String> mongoFieldNames(final RelDataType rowType) {
- return SqlValidatorUtil.uniquify(
- new AbstractList<String>() {
- @Override public String get(int index) {
- final String name = rowType.getFieldList().get(index).getName();
- return name.startsWith("$") ? "_" + name.substring(2) : name;
- }
-
- @Override public int size() {
- return rowType.getFieldCount();
- }
- },
- SqlValidatorUtil.EXPR_SUGGESTER, true);
- }
-
- static String maybeQuote(String s) {
- if (!needsQuote(s)) {
- return s;
- }
- return quote(s);
- }
-
- static String quote(String s) {
- return "'" + s + "'"; // TODO: handle embedded quotes
- }
-
- private static boolean needsQuote(String s) {
- for (int i = 0, n = s.length(); i < n; i++) {
- char c = s.charAt(i);
- if (!Character.isJavaIdentifierPart(c)
- || c == '$') {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- Aggregate aggregate = call.rel(0);
- DrillScanRelBase scan = call.rel(1);
-
- MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
-
-
-// implementor.visitChild(0, getInput());
- List<String> list = new ArrayList<>();
-// List<BsonDocument> docList = new ArrayList<>();
- final List<String> inNames =
- mongoFieldNames(scan.getRowType());
- final List<String> outNames = mongoFieldNames(aggregate.getRowType());
- int i = 0;
- if (aggregate.getGroupSet().cardinality() == 1) {
- final String inName = inNames.get(aggregate.getGroupSet().nth(0));
- list.add("_id: " + maybeQuote("$" + inName));
-// docList.add(new BsonDocument("_id", new BsonString(maybeQuote("$" + inName))));
- ++i;
- } else {
- List<String> keys = new ArrayList<>();
- for (int group : aggregate.getGroupSet()) {
- final String inName = inNames.get(group);
- keys.add(inName + ": " + quote("$" + inName));
- ++i;
- }
- list.add("_id: " + Util.toString(keys, "{", ", ", "}"));
-// docList.add(new BsonDocument("_id", new BsonString(Util.toString(keys, "{", ", ", "}"))));
- }
- for (AggregateCall aggCall : aggregate.getAggCallList()) {
- list.add(
- maybeQuote(outNames.get(i++)) + ": "
- + toMongo(aggCall.getAggregation(), inNames, aggCall.getArgList()));
- }
- List<Pair<String, String>> aggsList = new ArrayList<>();
- aggsList.add(Pair.of(null, "{$group: " + Util.toString(list, "{", ", ", "}") + "}"));
- final List<String> fixups;
- if (aggregate.getGroupSet().cardinality() == 1) {
- fixups = new AbstractList<String>() {
- @Override public String get(int index) {
- final String outName = outNames.get(index);
- return maybeQuote(outName) + ": "
- + maybeQuote("$" + (index == 0 ? "_id" : outName));
- }
-
- @Override public int size() {
- return outNames.size();
- }
- };
- } else {
- fixups = new ArrayList<>();
- fixups.add("_id: 0");
- i = 0;
- for (int group : aggregate.getGroupSet()) {
- fixups.add(
- maybeQuote(outNames.get(group))
- + ": "
- + maybeQuote("$_id." + outNames.get(group)));
- ++i;
- }
- for (AggregateCall ignored : aggregate.getAggCallList()) {
- final String outName = outNames.get(i++);
- fixups.add(
- maybeQuote(outName) + ": " + maybeQuote(
- "$" + outName));
- }
- }
- if (!aggregate.getGroupSet().isEmpty()) {
- aggsList.add(Pair.of(null, "{$project: " + Util.toString(fixups, "{", ", ", "}") + "}"));
- }
-
- MongoScanSpec mongoScanSpec = aggregate(groupScan.getScanSpec(), Pair.right(aggsList));
- try {
- List<SchemaPath> columns = outNames.stream()
- .map(SchemaPath::getSimplePath)
- .collect(Collectors.toList());
- MongoGroupScan mongoScanSpec123 = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- mongoScanSpec, columns, groupScan.getMaxRecords());
- call.transformTo(scan.copy(aggregate.getTraitSet(), mongoScanSpec123, aggregate.getRowType()));
- } catch (IOException e) {
- throw new DrillRuntimeException(e.getMessage(), e);
- }
- }
-
- private static String toMongo(SqlAggFunction aggregation, List<String> inNames,
- List<Integer> args) {
- if (aggregation.getName().equals(SqlStdOperatorTable.COUNT.getName())) {
- if (args.size() == 0) {
-// Aggregates.count()
- return "{$sum: 1}";
- } else {
- assert args.size() == 1;
-// Arrays.asList(
-// Aggregates.match(Filters.eq("languages.name", "English")),
-// Aggregates.count())
- final String inName = inNames.get(args.get(0));
- return "{$sum: {$cond: [ {$eq: ["
- + quote(inName)
- + ", null]}, 0, 1]}}";
- }
- } else if (aggregation instanceof SqlSumAggFunction
- || aggregation instanceof SqlSumEmptyIsZeroAggFunction) {
- assert args.size() == 1;
- final String inName = inNames.get(args.get(0));
- return "{$sum: " + maybeQuote("$" + inName) + "}";
- } else if (aggregation.getName().equals(SqlStdOperatorTable.MIN.getName())) {
- assert args.size() == 1;
- final String inName = inNames.get(args.get(0));
- return "{$min: " + maybeQuote("$" + inName) + "}";
- } else if (aggregation.getName().equals(SqlStdOperatorTable.MAX.getName())) {
- assert args.size() == 1;
- final String inName = inNames.get(args.get(0));
- return "{$max: " + maybeQuote("$" + inName) + "}";
- } else if (aggregation.getName().equals(SqlStdOperatorTable.AVG.getName())) {
- assert args.size() == 1;
- final String inName = inNames.get(args.get(0));
- return "{$avg: " + maybeQuote("$" + inName) + "}";
- } else {
- throw new AssertionError("unknown aggregate " + aggregation);
- }
- }
-
- private MongoScanSpec aggregate(MongoScanSpec scanSpec,
- final List<String> operations) {
- final List<Bson> list = new ArrayList<>();
- for (String operation : operations) {
- list.add(BsonDocument.parse(operation));
- }
- return new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(),
- scanSpec.getFilters(), list);
-// final Function1<Document, Object> getter =
-// getter(fields);
-// return new AbstractEnumerable<Object>() {
-// @Override public Enumerator<Object> enumerator() {
-// final Iterator<Document> resultIterator;
-// try {
-// resultIterator = mongoDb.getCollection(scanSpec.getCollectionName())
-// .aggregate(list).iterator();
-// } catch (Exception e) {
-// throw new RuntimeException("While running MongoDB query "
-// + Util.toString(operations, "[", ",\n", "]"), e);
-// }
-// return new MongoEnumerator(resultIterator, getter);
-// }
-// };
- }
-
- static Function1<Document, Map> mapGetter() {
- return a0 -> (Map) a0;
- }
-
- /** Returns a function that projects a single field. */
- static Function1<Document, Object> singletonGetter(final String fieldName,
- final Class fieldClass) {
- return a0 -> convert(a0.get(fieldName), fieldClass);
- }
-
- /** Returns a function that projects fields.
- *
- * @param fields List of fields to project; or null to return map
- */
- static Function1<Document, Object[]> listGetter(
- final List<Map.Entry<String, Class>> fields) {
- return a0 -> {
- Object[] objects = new Object[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- final Map.Entry<String, Class> field = fields.get(i);
- final String name = field.getKey();
- objects[i] = convert(a0.get(name), field.getValue());
- }
- return objects;
- };
- }
-
- static Function1<Document, Object> getter(
- List<Map.Entry<String, Class>> fields) {
- //noinspection unchecked
- return fields == null
- ? (Function1) mapGetter()
- : fields.size() == 1
- ? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
- : (Function1) listGetter(fields);
- }
-
- @SuppressWarnings("JavaUtilDate")
- private static Object convert(Object o, Class clazz) {
- if (o == null) {
- return null;
- }
- Primitive primitive = Primitive.of(clazz);
- if (primitive != null) {
- clazz = primitive.boxClass;
- } else {
- primitive = Primitive.ofBox(clazz);
- }
- if (clazz.isInstance(o)) {
- return o;
- }
- if (o instanceof Date && primitive != null) {
- o = ((Date) o).getTime() / DateTimeUtils.MILLIS_PER_DAY;
- }
- if (o instanceof Number && primitive != null) {
- return primitive.number((Number) o);
- }
- return o;
- }
-
- //$addToSet
- //$avg
- //$first
- //$last
- //$max
- //$min
- //$mergeObjects
- //$push
- //$stdDevPop
- //$stdDevSamp
- //$sum
-}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index b1c06e7..8dcd9d0 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rex.RexNode;
+import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,15 +54,12 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
final RexNode condition = filter.getCondition();
MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
- if (groupScan.isFilterPushedDown()) {
- return;
- }
LogicalExpression conditionExp = DrillOptiq.toDrill(
new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan,
conditionExp);
- MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree();
+ Document newScanSpec = mongoFilterBuilder.parseTree();
if (newScanSpec == null) {
return; // no filter pushdown so nothing to apply.
}
@@ -69,12 +67,11 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
MongoGroupScan newGroupsScan;
try {
newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
+ null, groupScan.getColumns(), groupScan.isUseAggregate());
} catch (IOException e) {
logger.error(e.getMessage(), e);
throw new DrillRuntimeException(e.getMessage(), e);
}
- newGroupsScan.setFilterPushedDown(true);
RelNode newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 7c4f3f2..5cb007f 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.Aggregates;
import org.apache.commons.collections.CollectionUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -69,7 +69,7 @@ public class MongoRecordReader extends AbstractRecordReader {
private VectorContainerWriter writer;
private Document filters;
- private List<Bson> aggregates;
+ private List<Bson> operations;
private final Document fields;
private final FragmentContext fragmentContext;
@@ -81,9 +81,8 @@ public class MongoRecordReader extends AbstractRecordReader {
private final boolean readNumbersAsDouble;
private boolean unionEnabled;
private final boolean isBsonRecordReader;
- private final int maxRecords;
- public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+ public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, MongoStoragePlugin plugin) {
fields = new Document();
@@ -93,16 +92,19 @@ public class MongoRecordReader extends AbstractRecordReader {
fragmentContext = context;
this.plugin = plugin;
filters = new Document();
- aggregates = subScanSpec.aggregates;
- Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
- subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+ if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
+ operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+ } else {
+ MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
+ Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+ shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
- buildFilters(subScanSpec.getFilter(), mergedFilters);
+ buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+ }
enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
- maxRecords = subScanSpec.getMaxRecords();
logger.debug("BsonRecordReader is enabled? " + isBsonRecordReader);
init(subScanSpec);
}
@@ -145,7 +147,7 @@ public class MongoRecordReader extends AbstractRecordReader {
}
}
- private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
+ private void init(BaseMongoSubScanSpec subScanSpec) {
List<String> hosts = subScanSpec.getHosts();
List<ServerAddress> addresses = Lists.newArrayList();
for (String host : hosts) {
@@ -183,25 +185,15 @@ public class MongoRecordReader extends AbstractRecordReader {
logger.debug("Filters Applied : " + filters);
logger.debug("Fields Selected :" + fields);
- if (CollectionUtils.isNotEmpty(aggregates)) {
- List<Bson> operations = new ArrayList<>();
- operations.add(Aggregates.match(filters));
- operations.addAll(aggregates);
+ MongoIterable<BsonDocument> projection;
+ if (CollectionUtils.isNotEmpty(operations)) {
+ List<Bson> operations = new ArrayList<>(this.operations);
operations.add(Aggregates.project(fields));
- if (maxRecords > 0) {
- operations.add(Aggregates.limit(maxRecords));
- }
- cursor = collection.aggregate(operations).batchSize(100).iterator();
+ projection = collection.aggregate(operations);
} else {
- // Add limit to Mongo query
- FindIterable<BsonDocument> projection = collection.find(filters).projection(fields);
- if (maxRecords > 0) {
- logger.debug("Limit applied: {}", maxRecords);
- projection = projection.limit(maxRecords);
- }
-
- cursor = projection.batchSize(100).iterator();
+ projection = collection.find(filters).projection(fields);
}
+ cursor = projection.batchSize(100).iterator();
}
writer.allocate();
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index f97e771..350cb28 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -42,22 +42,20 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = new LinkedList<>();
- List<SchemaPath> columns = null;
- for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
- .getChunkScanSpecList()) {
+ for (BaseMongoSubScanSpec scanSpec : subScan.getChunkScanSpecList()) {
try {
- if ((columns = subScan.getColumns()) == null) {
+ List<SchemaPath> columns = subScan.getColumns();
+ if (columns == null) {
columns = GroupScan.ALL_COLUMNS;
}
readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin()));
} catch (Exception e) {
- logger.error("MongoRecordReader creation failed for subScan: "
- + subScan + ".");
+ logger.error("MongoRecordReader creation failed for subScan: {}.", subScan);
logger.error(e.getMessage(), e);
throw new ExecutionSetupException(e);
}
}
- logger.info("Number of record readers initialized : " + readers.size());
+ logger.info("Number of record readers initialized : {}", readers.size());
return new ScanBatch(subScan, context, readers);
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 7ec1210..17b548b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -17,14 +17,15 @@
*/
package org.apache.drill.exec.store.mongo;
+import org.apache.drill.common.PlanStringBuilder;
import org.bson.Document;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.bson.conversions.Bson;
+import java.util.ArrayList;
import java.util.List;
-import java.util.StringJoiner;
public class MongoScanSpec {
private final String dbName;
@@ -32,7 +33,7 @@ public class MongoScanSpec {
private Document filters;
- private List<Bson> aggregates;
+ private List<Bson> operations = new ArrayList<>();
@JsonCreator
public MongoScanSpec(@JsonProperty("dbName") String dbName,
@@ -42,18 +43,11 @@ public class MongoScanSpec {
}
public MongoScanSpec(String dbName, String collectionName,
- Document filters) {
+ Document filters, List<Bson> operations) {
this.dbName = dbName;
this.collectionName = collectionName;
this.filters = filters;
- }
-
- public MongoScanSpec(String dbName, String collectionName,
- Document filters, List<Bson> aggregates) {
- this.dbName = dbName;
- this.collectionName = collectionName;
- this.filters = filters;
- this.aggregates = aggregates;
+ this.operations = operations;
}
public String getDbName() {
@@ -68,17 +62,17 @@ public class MongoScanSpec {
return filters;
}
- public List<Bson> getAggregates() {
- return aggregates;
+ public List<Bson> getOperations() {
+ return operations;
}
@Override
public String toString() {
- return new StringJoiner(", ", MongoScanSpec.class.getSimpleName() + "[", "]")
- .add("dbName='" + dbName + "'")
- .add("collectionName='" + collectionName + "'")
- .add("filters=" + filters)
- .add("aggregates=" + aggregates)
+ return new PlanStringBuilder(this)
+ .field("dbName", dbName)
+ .field("collectionName", collectionName)
+ .field("filters", filters)
+ .field("operations", operations)
.toString();
}
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index f6c3ac2..6262711 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -21,44 +21,42 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
-import com.mongodb.client.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
-import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.Convention;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.PlannableStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.mongo.plan.MongoPluginRulesProvider;
import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
-import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalNotification;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URLEncoder;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-public class MongoStoragePlugin extends AbstractStoragePlugin {
+public class MongoStoragePlugin extends PlannableStoragePlugin implements StoragePlugin {
private static final Logger logger = LoggerFactory.getLogger(MongoStoragePlugin.class);
private final MongoStoragePluginConfig mongoConfig;
@@ -70,7 +68,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
MongoStoragePluginConfig mongoConfig,
DrillbitContext context,
String name) {
- super(context, name);
+ super(mongoStoragePluginBuilder(name).context(context));
this.mongoConfig = mongoConfig;
String connection = addCredentialsFromCredentialsProvider(this.mongoConfig.getConnection(), name);
this.clientURI = new ConnectionString(connection);
@@ -81,6 +79,20 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
this.schemaFactory = new MongoSchemaFactory(this, name);
}
+ private static MongoStoragePluginConfigs mongoStoragePluginBuilder(String name) {
+ Convention convention = new Convention.Impl("MONGO." + name, PluginRel.class);
+ return new MongoStoragePluginConfigs()
+ .rulesProvider(new MongoPluginRulesProvider(convention))
+ .supportsProjectPushdown(true)
+ .supportsSortPushdown(true)
+ .supportsAggregatePushdown(true)
+ .supportsFilterPushdown(true)
+ .supportsLimitPushdown(true)
+ .supportsUnionPushdown(true)
+ .convention(convention)
+ .name(name);
+ }
+
private String addCredentialsFromCredentialsProvider(String connection, String name) {
ConnectionString parsed = new ConnectionString(connection);
if (parsed.getCredential() == null) {
@@ -134,26 +146,16 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {
});
- return new MongoGroupScan(userName, this, mongoScanSpec, null, -1);
+ return new MongoGroupScan(userName, this, mongoScanSpec, null, false);
}
- @Override
- public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
- switch (phase) {
- case PHYSICAL:
- case LOGICAL:
- return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE,
- MongoPushDownAggregateForScan.INSTANCE);
- case LOGICAL_PRUNE_AND_JOIN:
- case LOGICAL_PRUNE:
- case PARTITION_PRUNING:
- case JOIN_PLANNING:
- default:
- return Collections.emptySet();
+ private static class MongoStoragePluginConfigs extends PlannableStoragePluginConfigs<MongoStoragePluginConfigs> {
+ @Override
+ public MongoStoragePluginConfigs self() {
+ return this;
}
}
-
private static class AddressCloser implements
RemovalListener<MongoCnxnKey, MongoClient> {
@Override
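For reference, a minimal sketch of what the per-instance convention configured in mongoStoragePluginBuilder above implies. The class names come from this diff; the isolation claim in the comment is an assumption rather than something the commit states.

    import org.apache.calcite.plan.Convention;
    import org.apache.drill.exec.store.plan.rel.PluginRel;

    public class ConventionSketch {
      public static void main(String[] args) {
        // Each configured plugin instance gets its own convention name, presumably
        // so that pushdown rules registered for one instance never match rels
        // produced for another.
        Convention a = new Convention.Impl("MONGO.mongo1", PluginRel.class);
        Convention b = new Convention.Impl("MONGO.mongo2", PluginRel.class);
        System.out.println(a.getName() + " vs " + b.getName());
      }
    }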
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index a32336d..692a939 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.bson.Document;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -40,6 +39,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.bson.Document;
import org.bson.conversions.Bson;
@JsonTypeName("mongo-shard-read")
@@ -53,14 +53,14 @@ public class MongoSubScan extends AbstractBase implements SubScan {
private final MongoStoragePlugin mongoStoragePlugin;
private final List<SchemaPath> columns;
- private final List<MongoSubScanSpec> chunkScanSpecList;
+ private final List<BaseMongoSubScanSpec> chunkScanSpecList;
@JsonCreator
public MongoSubScan(
@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
- @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
+ @JsonProperty("chunkScanSpecList") LinkedList<BaseMongoSubScanSpec> chunkScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns)
throws ExecutionSetupException {
super(userName);
@@ -73,7 +73,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
MongoStoragePluginConfig storagePluginConfig,
- List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+ List<BaseMongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
super(userName);
this.mongoStoragePlugin = storagePlugin;
this.mongoPluginConfig = storagePluginConfig;
@@ -101,13 +101,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return columns;
}
- public List<MongoSubScanSpec> getChunkScanSpecList() {
+ public List<BaseMongoSubScanSpec> getChunkScanSpecList() {
return chunkScanSpecList;
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
- throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
chunkScanSpecList, columns);
@@ -123,80 +122,34 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return Collections.emptyIterator();
}
- public static class MongoSubScanSpec {
+ @JsonTypeName("ShardedMongoSubScanSpec")
+ public static class ShardedMongoSubScanSpec extends BaseMongoSubScanSpec {
- protected String dbName;
- protected String collectionName;
- protected List<String> hosts;
protected Map<String, Object> minFilters;
protected Map<String, Object> maxFilters;
- protected int maxRecords;
-
protected Document filter;
- protected List<Bson> aggregates;
@JsonCreator
- public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+ public ShardedMongoSubScanSpec(@JsonProperty("dbName") String dbName,
@JsonProperty("collectionName") String collectionName,
@JsonProperty("hosts") List<String> hosts,
@JsonProperty("minFilters") Map<String, Object> minFilters,
@JsonProperty("maxFilters") Map<String, Object> maxFilters,
- @JsonProperty("filters") Document filters,
- @JsonProperty("aggregates") List<Bson> aggregates,
- @JsonProperty("maxRecords") int maxRecords) {
- this.dbName = dbName;
- this.collectionName = collectionName;
- this.hosts = hosts;
+ @JsonProperty("filters") Document filters) {
+ super(dbName, collectionName, hosts);
this.minFilters = minFilters;
this.maxFilters = maxFilters;
this.filter = filters;
- this.aggregates = aggregates;
- this.maxRecords = maxRecords;
- }
-
- MongoSubScanSpec() {
-
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public MongoSubScanSpec setDbName(String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public String getCollectionName() {
- return collectionName;
}
- public MongoSubScanSpec setCollectionName(String collectionName) {
- this.collectionName = collectionName;
- return this;
- }
-
- public List<String> getHosts() {
- return hosts;
- }
-
- public MongoSubScanSpec setHosts(List<String> hosts) {
- this.hosts = hosts;
- return this;
- }
-
- public int getMaxRecords() { return maxRecords; }
-
- public MongoSubScanSpec setMaxRecords (int maxRecords) {
- this.maxRecords = maxRecords;
- return this;
+ ShardedMongoSubScanSpec() {
}
public Map<String, Object> getMinFilters() {
return minFilters;
}
- public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
+ public ShardedMongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
this.minFilters = minFilters;
return this;
}
@@ -205,7 +158,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return maxFilters;
}
- public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
+ public ShardedMongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
this.maxFilters = maxFilters;
return this;
}
@@ -214,16 +167,11 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return filter;
}
- public MongoSubScanSpec setFilter(Document filter) {
+ public ShardedMongoSubScanSpec setFilter(Document filter) {
this.filter = filter;
return this;
}
- public MongoSubScanSpec setAggregates(List<Bson> aggregates) {
- this.aggregates = aggregates;
- return this;
- }
-
@Override
public String toString() {
return new PlanStringBuilder(this)
@@ -233,10 +181,45 @@ public class MongoSubScan extends AbstractBase implements SubScan {
.field("minFilters", minFilters)
.field("maxFilters", maxFilters)
.field("filter", filter)
- .field("aggregates", aggregates)
- .field("maxRecords", maxRecords)
.toString();
+ }
+
+ }
+
+ @JsonTypeName("MongoSubScanSpec")
+ public static class MongoSubScanSpec extends BaseMongoSubScanSpec {
+ protected List<Bson> operations;
+
+ @JsonCreator
+ public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+ @JsonProperty("collectionName") String collectionName,
+ @JsonProperty("hosts") List<String> hosts,
+ @JsonProperty("operations") List<Bson> operations) {
+ super(dbName, collectionName, hosts);
+ this.operations = operations;
+ }
+
+ MongoSubScanSpec() {
+ }
+
+ public List<Bson> getOperations() {
+ return operations;
+ }
+
+ public MongoSubScanSpec setOperations(List<Bson> operations) {
+ this.operations = operations;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("dbName", dbName)
+ .field("collectionName", collectionName)
+ .field("hosts", hosts)
+ .field("operations", operations)
+ .toString();
}
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
new file mode 100644
index 0000000..2247683
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
+import org.apache.drill.exec.store.mongo.MongoGroupScan;
+import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
+import org.apache.drill.exec.store.mongo.MongoScanSpec;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.exec.util.Utilities;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MongoPluginImplementor implements PluginImplementor {
+ private MongoGroupScan groupScan;
+ private List<Bson> operations;
+ private Document filters;
+ private List<SchemaPath> columns;
+
+ private boolean runAggregate;
+
+ @Override
+ public void implement(PluginAggregateRel aggregate) throws IOException {
+ runAggregate = true;
+ visitChild(aggregate.getInput());
+
+ operations.addAll(
+ MongoAggregateUtils.getAggregateOperations(aggregate, aggregate.getRowType(), groupScan));
+ List<String> outNames = MongoAggregateUtils.mongoFieldNames(aggregate.getRowType());
+ columns = outNames.stream()
+ .map(SchemaPath::getSimplePath)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void implement(PluginFilterRel filter) throws IOException {
+ visitChild(filter.getInput());
+
+ LogicalExpression conditionExp = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())), filter.getInput(), filter.getCondition());
+ MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan, conditionExp);
+ if (runAggregate) {
+ Bson convertedFilterExpression = Aggregates.match(mongoFilterBuilder.parseTree()).toBsonDocument();
+ operations.add(convertedFilterExpression);
+ } else {
+ filters = mongoFilterBuilder.parseTree();
+ }
+ }
+
+ @Override
+ public void implement(PluginLimitRel limit) throws IOException {
+ runAggregate = true;
+ visitChild(limit.getInput());
+
+ if (limit.getOffset() != null) {
+ operations.add(
+ Aggregates.skip(((BigDecimal) ((RexLiteral) limit.getOffset()).getValue()).intValue()).toBsonDocument());
+ }
+ if (limit.getFetch() != null) {
+ operations.add(
+ Aggregates.limit(((BigDecimal) ((RexLiteral) limit.getFetch()).getValue()).intValue()).toBsonDocument());
+ }
+ }
+
+ @Override
+ public void implement(PluginProjectRel project) throws IOException {
+ visitChild(project.getInput());
+
+// final MongoRules.RexToMongoTranslator translator =
+// new MongoRules.RexToMongoTranslator(
+// (JavaTypeFactory) project.getCluster().getTypeFactory(),
+// MongoRules.mongoFieldNames(project.getInput().getRowType()));
+// final List<String> items = new ArrayList<>();
+// for (Pair<RexNode, String> pair : project.getNamedProjects()) {
+// final String name = pair.right;
+// final String expr = pair.left.accept(translator);
+// items.add(expr.equals("'$" + name + "'")
+// ? MongoRules.maybeQuote(name) + ": 1"
+// : MongoRules.maybeQuote(name) + ": " + expr);
+// }
+// final String findString = Util.toString(items, "{", ", ", "}");
+// final String aggregateString = "{$project: " + findString + "}";
+// final Pair<String, String> op = Pair.of(findString, aggregateString);
+// implementor.add(op.left, op.right);
+ final List<String> outNames = MongoAggregateUtils.mongoFieldNames(project.getRowType());
+ this.columns = outNames.stream()
+ .map(SchemaPath::getSimplePath)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void implement(PluginSortRel sort) throws IOException {
+ runAggregate = true;
+ visitChild(sort.getInput());
+
+ if (!sort.collation.getFieldCollations().isEmpty()) {
+ final List<String> keys = new ArrayList<>();
+ final List<RelDataTypeField> fields = sort.getRowType().getFieldList();
+ for (RelFieldCollation fieldCollation : sort.collation.getFieldCollations()) {
+ final String name =
+ fields.get(fieldCollation.getFieldIndex()).getName();
+ keys.add(name + ": " + direction(fieldCollation));
+ // TODO: handle fieldCollation.nullDirection (NULLS FIRST and NULLS LAST)
+ }
+
+ operations.add(
+ Aggregates.sort(BsonDocument.parse(Util.toString(keys, "{", ", ", "}"))).toBsonDocument());
+ }
+ if (sort.offset != null) {
+ operations.add(
+ Aggregates.skip(((BigDecimal) ((RexLiteral) sort.offset).getValue()).intValue()).toBsonDocument());
+ }
+ if (sort.fetch != null) {
+ operations.add(
+ Aggregates.limit(((BigDecimal) ((RexLiteral) sort.fetch).getValue()).intValue()).toBsonDocument());
+ }
+ }
+
+ @Override
+ public void implement(PluginUnionRel union) throws IOException {
+ runAggregate = true;
+
+ MongoPluginImplementor childImplementor = copy();
+ childImplementor.runAggregate = true;
+
+ boolean firstProcessed = false;
+ for (RelNode input : union.getInputs()) {
+ if (!firstProcessed) {
+ this.visitChild(input);
+ firstProcessed = true;
+ } else {
+ childImplementor.visitChild(input);
+ operations.add(
+ Aggregates.unionWith(childImplementor.groupScan.getScanSpec().getCollectionName(), childImplementor.operations).toBsonDocument()
+ );
+ }
+ }
+ }
+
+ @Override
+ public void implement(StoragePluginTableScan scan) throws IOException {
+ groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+ operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations());
+ filters = groupScan.getScanSpec().getFilters();
+ columns = groupScan.getColumns();
+ }
+
+ @Override
+ public MongoPluginImplementor copy() {
+ return new MongoPluginImplementor();
+ }
+
+ @Override
+ public GroupScan getPhysicalOperator() throws IOException {
+ MongoScanSpec scanSpec = groupScan.getScanSpec();
+ MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+ return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+ newSpec, columns, runAggregate);
+ }
+
+ public static int direction(RelFieldCollation fieldCollation) {
+ switch (fieldCollation.getDirection()) {
+ case DESCENDING:
+ case STRICTLY_DESCENDING:
+ return -1;
+ case ASCENDING:
+ case STRICTLY_ASCENDING:
+ default:
+ return 1;
+ }
+ }
+}
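A quick sketch of the pipeline the implementor above emits for a LIMIT ... OFFSET ... query, using only the driver helpers this file already imports; the SQL mapping mirrors the updated TestMongoLimitPushDown expectations further down in this commit.

    import com.mongodb.client.model.Aggregates;
    import org.bson.conversions.Bson;

    import java.util.Arrays;
    import java.util.List;

    public class LimitOffsetPipelineSketch {
      public static void main(String[] args) {
        // "SELECT employee_id FROM ... LIMIT 4 OFFSET 5" becomes two stages:
        List<Bson> operations = Arrays.asList(
            Aggregates.skip(5).toBsonDocument(),   // {"$skip": 5}
            Aggregates.limit(4).toBsonDocument()); // {"$limit": 4}
        operations.forEach(System.out::println);
      }
    }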
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
new file mode 100644
index 0000000..06319b6
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginRulesProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.PluginRulesProvider;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class MongoPluginRulesProvider implements PluginRulesProvider {
+ private final Convention convention;
+
+ public MongoPluginRulesProvider(Convention convention) {
+ this.convention = convention;
+ }
+
+ public List<RelOptRule> sortRules() {
+ return Arrays.asList(
+ new PluginSortRule(Convention.NONE, convention),
+ new PluginSortRule(DrillRel.DRILL_LOGICAL, convention)
+ );
+ }
+
+ public List<RelOptRule> limitRules() {
+ return Arrays.asList(
+ new PluginLimitRule(Convention.NONE, convention),
+ new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention)
+ );
+ }
+
+ public List<RelOptRule> filterRules() {
+ return Arrays.asList(
+ new PluginFilterRule(Convention.NONE, convention),
+ new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention)
+ );
+ }
+
+ public List<RelOptRule> projectRules() {
+ return Arrays.asList(
+ new PluginProjectRule(Convention.NONE, convention),
+ new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention)
+ );
+ }
+
+ public List<RelOptRule> aggregateRules() {
+ return Arrays.asList(
+ new PluginAggregateRule(Convention.NONE, convention),
+ new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention)
+ );
+ }
+
+ public List<RelOptRule> unionRules() {
+ return Arrays.asList(
+ new PluginUnionRule(Convention.NONE, convention),
+ new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention)
+ );
+ }
+
+ public RelOptRule vertexRule() {
+ return new VertexDrelConverterRule(convention);
+ }
+
+ @Override
+ public RelOptRule prelConverterRule() {
+ return new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
+ }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
new file mode 100644
index 0000000..fddfa7c
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoRules.java
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.Util;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MongoRules {
+
+ public List<RelOptRule> sortRules(Convention out) {
+ return Arrays.asList(
+ new PluginSortRule(Convention.NONE, out),
+ new PluginSortRule(DrillRel.DRILL_LOGICAL, out)
+ );
+ }
+
+ public List<RelOptRule> limitRules(Convention out) {
+ return Arrays.asList(
+ new PluginLimitRule(Convention.NONE, out),
+ new PluginLimitRule(DrillRel.DRILL_LOGICAL, out)
+ );
+ }
+
+ public List<RelOptRule> filterRules(Convention out) {
+ return Arrays.asList(
+ new PluginFilterRule(Convention.NONE, out),
+ new PluginFilterRule(DrillRel.DRILL_LOGICAL, out)
+ );
+ }
+
+ public List<RelOptRule> projectRules(Convention out) {
+ return Arrays.asList(
+ new PluginProjectRule(Convention.NONE, out),
+ new PluginProjectRule(DrillRel.DRILL_LOGICAL, out)
+ );
+ }
+
+ public List<RelOptRule> aggregateRules(Convention out) {
+ return Arrays.asList(
+ new PluginAggregateRule(Convention.NONE, out),
+ new PluginAggregateRule(DrillRel.DRILL_LOGICAL, out)
+ );
+ }
+
+ public List<RelOptRule> unionRules(Convention out) {
+ return Arrays.asList(
+ new PluginUnionRule(Convention.NONE, out),
+ new PluginUnionRule(DrillRel.DRILL_LOGICAL, out)
+ );
+ }
+
+ public RelOptRule vertexRule(Convention out) {
+ return new VertexDrelConverterRule(out);
+ }
+
+ public static final RelOptRule PREL_CONVERTER_INSTANCE = new PluginIntermediatePrelConverterRule(MongoPluginImplementor::new);
+
+ /** Returns 'string' if it is a call to item['string'], null otherwise. */
+ public static String isItem(RexCall call) {
+ if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+ return null;
+ }
+ final RexNode op0 = call.operands.get(0);
+ final RexNode op1 = call.operands.get(1);
+ if (op0 instanceof RexInputRef
+ && ((RexInputRef) op0).getIndex() == 0
+ && op1 instanceof RexLiteral
+ && ((RexLiteral) op1).getValue2() instanceof String) {
+ return (String) ((RexLiteral) op1).getValue2();
+ }
+ return null;
+ }
+
+ static List<String> mongoFieldNames(final RelDataType rowType) {
+ return SqlValidatorUtil.uniquify(
+ new AbstractList<String>() {
+ @Override public String get(int index) {
+ final String name = rowType.getFieldList().get(index).getName();
+ return name.startsWith("$") ? "_" + name.substring(2) : name;
+ }
+
+ @Override public int size() {
+ return rowType.getFieldCount();
+ }
+ },
+ SqlValidatorUtil.EXPR_SUGGESTER, true);
+ }
+
+ static String maybeQuote(String s) {
+ if (!needsQuote(s)) {
+ return s;
+ }
+ return quote(s);
+ }
+
+ static String quote(String s) {
+ return "'" + s + "'"; // TODO: handle embedded quotes
+ }
+
+ private static boolean needsQuote(String s) {
+ for (int i = 0, n = s.length(); i < n; i++) {
+ char c = s.charAt(i);
+ if (!Character.isJavaIdentifierPart(c)
+ || c == '$') {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Translator from {@link RexNode} to strings in MongoDB's expression
+ * language. */
+ static class RexToMongoTranslator extends RexVisitorImpl<String> {
+ private final JavaTypeFactory typeFactory;
+ private final List<String> inFields;
+
+ private static final Map<SqlOperator, String> MONGO_OPERATORS =
+ new HashMap<>();
+
+ static {
+ // Arithmetic
+ MONGO_OPERATORS.put(SqlStdOperatorTable.DIVIDE, "$divide");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.MULTIPLY, "$multiply");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.MOD, "$mod");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.PLUS, "$add");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.MINUS, "$subtract");
+ // Boolean
+ MONGO_OPERATORS.put(SqlStdOperatorTable.AND, "$and");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.OR, "$or");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.NOT, "$not");
+ // Comparison
+ MONGO_OPERATORS.put(SqlStdOperatorTable.EQUALS, "$eq");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.NOT_EQUALS, "$ne");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN, "$gt");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, "$gte");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN, "$lt");
+ MONGO_OPERATORS.put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, "$lte");
+ }
+
+ protected RexToMongoTranslator(JavaTypeFactory typeFactory,
+ List<String> inFields) {
+ super(true);
+ this.typeFactory = typeFactory;
+ this.inFields = inFields;
+ }
+
+ @Override public String visitLiteral(RexLiteral literal) {
+ if (literal.getValue() == null) {
+ return "null";
+ }
+ return "{$literal: "
+ + RexToLixTranslator.translateLiteral(literal, literal.getType(),
+ typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
+ + "}";
+ }
+
+ @Override public String visitInputRef(RexInputRef inputRef) {
+ return maybeQuote(
+ "$" + inFields.get(inputRef.getIndex()));
+ }
+
+ @Override public String visitCall(RexCall call) {
+ String name = isItem(call);
+ if (name != null) {
+ return "'$" + name + "'";
+ }
+ // translate operands first so the CAST and operator branches below can use them
+ final List<String> strings = new ArrayList<>();
+ for (RexNode operand : call.operands) {
+ strings.add(operand.accept(this));
+ }
+ if (call.getKind() == SqlKind.CAST) {
+ return strings.get(0);
+ }
+ String stdOperator = MONGO_OPERATORS.get(call.getOperator());
+ if (stdOperator != null) {
+ return "{" + stdOperator + ": [" + Util.commaList(strings) + "]}";
+ }
+ if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+ final RexNode op1 = call.operands.get(1);
+ if (op1 instanceof RexLiteral
+ && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+ if (!Bug.CALCITE_194_FIXED) {
+ return "'" + stripQuotes(strings.get(0)) + "["
+ + ((RexLiteral) op1).getValue2() + "]'";
+ }
+ return strings.get(0) + "[" + strings.get(1) + "]";
+ }
+ }
+ if (call.getOperator() == SqlStdOperatorTable.CASE) {
+ StringBuilder sb = new StringBuilder();
+ StringBuilder finish = new StringBuilder();
+ // case(a, b, c) -> $cond:[a, b, c]
+ // case(a, b, c, d) -> $cond:[a, b, $cond:[c, d, null]]
+ // case(a, b, c, d, e) -> $cond:[a, b, $cond:[c, d, e]]
+ for (int i = 0; i < strings.size(); i += 2) {
+ sb.append("{$cond:[");
+ finish.append("]}");
+
+ sb.append(strings.get(i));
+ sb.append(',');
+ sb.append(strings.get(i + 1));
+ sb.append(',');
+ if (i == strings.size() - 3) {
+ sb.append(strings.get(i + 2));
+ break;
+ }
+ if (i == strings.size() - 2) {
+ sb.append("null");
+ break;
+ }
+ }
+ sb.append(finish);
+ return sb.toString();
+ }
+ throw new IllegalArgumentException("Translation of " + call
+ + " is not supported by MongoProject");
+ }
+
+ private static String stripQuotes(String s) {
+ return s.startsWith("'") && s.endsWith("'")
+ ? s.substring(1, s.length() - 1)
+ : s;
+ }
+ }
+
+ /*
+
+ /**
+ * Rule to convert a {@link LogicalCalc} to an
+ * {@link MongoCalcRel}.
+ o/
+ public static class MongoCalcRule
+ extends MongoConverterRule {
+ private MongoCalcRule(MongoConvention out) {
+ super(
+ LogicalCalc.class,
+ Convention.NONE,
+ out,
+ "MongoCalcRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalCalc calc = (LogicalCalc) rel;
+
+ // If there's a multiset, let FarragoMultisetSplitter work on it
+ // first.
+ if (RexMultisetUtil.containsMultiset(calc.getProgram())) {
+ return null;
+ }
+
+ return new MongoCalcRel(
+ rel.getCluster(),
+ rel.getTraitSet().replace(out),
+ convert(
+ calc.getChild(),
+ calc.getTraitSet().replace(out)),
+ calc.getProgram(),
+ Project.Flags.Boxed);
+ }
+ }
+
+ public static class MongoCalcRel extends SingleRel implements MongoRel {
+ private final RexProgram program;
+
+ /**
+ * Values defined in {@link org.apache.calcite.rel.core.Project.Flags}.
+ o/
+ protected int flags;
+
+ public MongoCalcRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode child,
+ RexProgram program,
+ int flags) {
+ super(cluster, traitSet, child);
+ assert getConvention() instanceof MongoConvention;
+ this.flags = flags;
+ this.program = program;
+ this.rowType = program.getOutputRowType();
+ }
+
+ public RelOptPlanWriter explainTerms(RelOptPlanWriter pw) {
+ return program.explainCalc(super.explainTerms(pw));
+ }
+
+ public double getRows() {
+ return LogicalFilter.estimateFilteredRows(
+ getChild(), program);
+ }
+
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ double dRows = RelMetadataQuery.getRowCount(this);
+ double dCpu =
+ RelMetadataQuery.getRowCount(getChild())
+ * program.getExprCount();
+ double dIo = 0;
+ return planner.makeCost(dRows, dCpu, dIo);
+ }
+
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new MongoCalcRel(
+ getCluster(),
+ traitSet,
+ sole(inputs),
+ program.copy(),
+ getFlags());
+ }
+
+ public int getFlags() {
+ return flags;
+ }
+
+ public RexProgram getProgram() {
+ return program;
+ }
+
+ public SqlString implement(MongoImplementor implementor) {
+ final SqlBuilder buf = new SqlBuilder(implementor.dialect);
+ buf.append("SELECT ");
+ if (isStar(program)) {
+ buf.append("*");
+ } else {
+ for (Ord<RexLocalRef> ref : Ord.zip(program.getProjectList())) {
+ buf.append(ref.i == 0 ? "" : ", ");
+ expr(buf, program, ref.e);
+ alias(buf, null, getRowType().getFieldNames().get(ref.i));
+ }
+ }
+ implementor.newline(buf)
+ .append("FROM ");
+ implementor.subQuery(buf, 0, getChild(), "t");
+ if (program.getCondition() != null) {
+ implementor.newline(buf);
+ buf.append("WHERE ");
+ expr(buf, program, program.getCondition());
+ }
+ return buf.toSqlString();
+ }
+
+ private static boolean isStar(RexProgram program) {
+ int i = 0;
+ for (RexLocalRef ref : program.getProjectList()) {
+ if (ref.getIndex() != i++) {
+ return false;
+ }
+ }
+ return i == program.getInputRowType().getFieldCount();
+ }
+
+ private static void expr(
+ SqlBuilder buf, RexProgram program, RexNode rex) {
+ if (rex instanceof RexLocalRef) {
+ final int index = ((RexLocalRef) rex).getIndex();
+ expr(buf, program, program.getExprList().get(index));
+ } else if (rex instanceof RexInputRef) {
+ buf.identifier(
+ program.getInputRowType().getFieldNames().get(
+ ((RexInputRef) rex).getIndex()));
+ } else if (rex instanceof RexLiteral) {
+ toSql(buf, (RexLiteral) rex);
+ } else if (rex instanceof RexCall) {
+ final RexCall call = (RexCall) rex;
+ switch (call.getOperator().getSyntax()) {
+ case Binary:
+ expr(buf, program, call.getOperands().get(0));
+ buf.append(' ')
+ .append(call.getOperator().toString())
+ .append(' ');
+ expr(buf, program, call.getOperands().get(1));
+ break;
+ default:
+ throw new AssertionError(call.getOperator());
+ }
+ } else {
+ throw new AssertionError(rex);
+ }
+ }
+ }
+
+ private static SqlBuilder toSql(SqlBuilder buf, RexLiteral rex) {
+ switch (rex.getTypeName()) {
+ case CHAR:
+ case VARCHAR:
+ return buf.append(
+ new NlsString(rex.getValue2().toString(), null, null)
+ .asSql(false, false));
+ default:
+ return buf.append(rex.getValue2().toString());
+ }
+ }
+
+*/
+
+
+// /**
+// * Rule to convert an {@link org.apache.calcite.rel.logical.Union} to a
+// * {@link MongoUnionRel}.
+// */
+// public static class MongoUnionRule
+// extends MongoConverterRule {
+// private MongoUnionRule(MongoConvention out) {
+// super(
+// Union.class,
+// Convention.NONE,
+// out,
+// "MongoUnionRule");
+// }
+//
+// public RelNode convert(RelNode rel) {
+// final Union union = (Union) rel;
+// final RelTraitSet traitSet =
+// union.getTraitSet().replace(out);
+// return new MongoUnionRel(
+// rel.getCluster(),
+// traitSet,
+// convertList(union.getInputs(), traitSet),
+// union.all);
+// }
+// }
+//
+// public static class MongoUnionRel
+// extends Union
+// implements MongoRel {
+// public MongoUnionRel(
+// RelOptCluster cluster,
+// RelTraitSet traitSet,
+// List<RelNode> inputs,
+// boolean all) {
+// super(cluster, traitSet, inputs, all);
+// }
+//
+// public MongoUnionRel copy(
+// RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+// return new MongoUnionRel(getCluster(), traitSet, inputs, all);
+// }
+//
+// @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
+// return super.computeSelfCost(planner).multiplyBy(.1);
+// }
+//
+// public SqlString implement(MongoImplementor implementor) {
+// return setOpSql(this, implementor, "UNION");
+// }
+// }
+//
+// private static SqlString setOpSql(
+// SetOp setOpRel, MongoImplementor implementor, String op) {
+// final SqlBuilder buf = new SqlBuilder(implementor.dialect);
+// for (Ord<RelNode> input : Ord.zip(setOpRel.getInputs())) {
+// if (input.i > 0) {
+// implementor.newline(buf)
+// .append(op + (setOpRel.all ? " ALL " : ""));
+// implementor.newline(buf);
+// }
+// buf.append(implementor.visitChild(input.i, input.e));
+// }
+// return buf.toSqlString();
+// }
+
+ /*
+ /**
+ * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect}
+ * to an {@link MongoIntersectRel}.
+ o/
+ public static class MongoIntersectRule
+ extends MongoConverterRule {
+ private MongoIntersectRule(MongoConvention out) {
+ super(
+ LogicalIntersect.class,
+ Convention.NONE,
+ out,
+ "MongoIntersectRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalIntersect intersect = (LogicalIntersect) rel;
+ if (intersect.all) {
+ return null; // INTERSECT ALL not implemented
+ }
+ final RelTraitSet traitSet =
+ intersect.getTraitSet().replace(out);
+ return new MongoIntersectRel(
+ rel.getCluster(),
+ traitSet,
+ convertList(intersect.getInputs(), traitSet),
+ intersect.all);
+ }
+ }
+
+ public static class MongoIntersectRel
+ extends Intersect
+ implements MongoRel {
+ public MongoIntersectRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traitSet, inputs, all);
+ assert !all;
+ }
+
+ public MongoIntersectRel copy(
+ RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new MongoIntersectRel(getCluster(), traitSet, inputs, all);
+ }
+
+ public SqlString implement(MongoImplementor implementor) {
+ return setOpSql(this, implementor, " intersect ");
+ }
+ }
+
+ /**
+ * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalMinus}
+ * to an {@link MongoMinusRel}.
+ o/
+ public static class MongoMinusRule
+ extends MongoConverterRule {
+ private MongoMinusRule(MongoConvention out) {
+ super(
+ LogicalMinus.class,
+ Convention.NONE,
+ out,
+ "MongoMinusRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalMinus minus = (LogicalMinus) rel;
+ if (minus.all) {
+ return null; // EXCEPT ALL not implemented
+ }
+ final RelTraitSet traitSet =
+ rel.getTraitSet().replace(out);
+ return new MongoMinusRel(
+ rel.getCluster(),
+ traitSet,
+ convertList(minus.getInputs(), traitSet),
+ minus.all);
+ }
+ }
+
+ public static class MongoMinusRel
+ extends Minus
+ implements MongoRel {
+ public MongoMinusRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traitSet, inputs, all);
+ assert !all;
+ }
+
+ public MongoMinusRel copy(
+ RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new MongoMinusRel(getCluster(), traitSet, inputs, all);
+ }
+
+ public SqlString implement(MongoImplementor implementor) {
+ return setOpSql(this, implementor, " minus ");
+ }
+ }
+
+ public static class MongoValuesRule extends MongoConverterRule {
+ private MongoValuesRule(MongoConvention out) {
+ super(
+ LogicalValues.class,
+ Convention.NONE,
+ out,
+ "MongoValuesRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ LogicalValues valuesRel = (LogicalValues) rel;
+ return new MongoValuesRel(
+ valuesRel.getCluster(),
+ valuesRel.getRowType(),
+ valuesRel.getTuples(),
+ valuesRel.getTraitSet().plus(out));
+ }
+ }
+
+ public static class MongoValuesRel
+ extends Values
+ implements MongoRel {
+ MongoValuesRel(
+ RelOptCluster cluster,
+ RelDataType rowType,
+ List<List<RexLiteral>> tuples,
+ RelTraitSet traitSet) {
+ super(cluster, rowType, tuples, traitSet);
+ }
+
+ @Override public RelNode copy(
+ RelTraitSet traitSet, List<RelNode> inputs) {
+ assert inputs.isEmpty();
+ return new MongoValuesRel(
+ getCluster(), rowType, tuples, traitSet);
+ }
+
+ public SqlString implement(MongoImplementor implementor) {
+ throw new AssertionError(); // TODO:
+ }
+ }
+*/
+}
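The CASE branch of RexToMongoTranslator above is the subtlest part of the file; the following self-contained sketch applies the same rewriting to already-translated operand strings (the class and helper names are hypothetical, not part of the commit).

    import java.util.Arrays;
    import java.util.List;

    public class CondRewriteSketch {
      // case(a, b, c)    -> {$cond:[a,b,c]}
      // case(a, b, c, d) -> {$cond:[a,b,{$cond:[c,d,null]}]}
      static String toCond(List<String> strings) {
        StringBuilder sb = new StringBuilder();
        StringBuilder finish = new StringBuilder();
        for (int i = 0; i < strings.size(); i += 2) {
          sb.append("{$cond:[");
          finish.append("]}");
          sb.append(strings.get(i)).append(',').append(strings.get(i + 1)).append(',');
          if (i == strings.size() - 3) {
            sb.append(strings.get(i + 2));
            break;
          }
          if (i == strings.size() - 2) {
            sb.append("null");
            break;
          }
        }
        return sb.append(finish).toString();
      }

      public static void main(String[] args) {
        System.out.println(toCond(Arrays.asList("a", "b", "c")));
        System.out.println(toCond(Arrays.asList("a", "b", "c", "d")));
      }
    }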
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
new file mode 100644
index 0000000..68e01e9
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoTable.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table based on a MongoDB collection.
+ */
+public class MongoTable extends AbstractQueryableTable
+ implements TranslatableTable {
+ private final String collectionName;
+
+ /** Creates a MongoTable. */
+ MongoTable(String collectionName) {
+ super(Object[].class);
+ this.collectionName = collectionName;
+ }
+
+ @Override public String toString() {
+ return "MongoTable {" + collectionName + "}";
+ }
+
+ @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ final RelDataType mapType =
+ typeFactory.createMapType(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.ANY), true));
+ return typeFactory.builder().add("_MAP", mapType).build();
+ }
+
+ @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+ SchemaPlus schema, String tableName) {
+ return new MongoQueryable<>(queryProvider, schema, this, tableName);
+ }
+
+ @Override public RelNode toRel(
+ RelOptTable.ToRelContext context,
+ RelOptTable relOptTable) {
+ final RelOptCluster cluster = context.getCluster();
+ // cluster, traits, grpScan, table
+// return new MongoTableScan(cluster, cluster.traitSetOf(MongoRel.CONVENTION),
+// relOptTable, relOptTable.getRowType(), null);
+ return null;
+ }
+
+ /** Executes a "find" operation on the underlying collection.
+ *
+ * <p>For example,
+ * <code>zipsTable.find("{state: 'OR'}", "{city: 1, zipcode: 1}")</code></p>
+ *
+ * @param mongoDb MongoDB connection
+ * @param filterJson Filter JSON string, or null
+ * @param projectJson Project JSON string, or null
+ * @param fields List of fields to project; or null to return map
+ * @return Enumerator of results
+ */
+ private Enumerable<Object> find(MongoDatabase mongoDb, String filterJson,
+ String projectJson, List<Map.Entry<String, Class>> fields) {
+ final MongoCollection collection =
+ mongoDb.getCollection(collectionName);
+ final Bson filter =
+ filterJson == null ? null : BsonDocument.parse(filterJson);
+ final Bson project =
+ projectJson == null ? null : BsonDocument.parse(projectJson);
+// final Function1<Document, Object> getter = MongoEnumerator.getter(fields);
+// return new AbstractEnumerable<Object>() {
+// @Override public Enumerator<Object> enumerator() {
+// @SuppressWarnings("unchecked") final FindIterable<Document> cursor =
+// collection.find(filter).projection(project);
+// return new MongoEnumerator(cursor.iterator(), getter);
+// }
+// };
+ return null;
+ }
+
+ /** Executes an "aggregate" operation on the underlying collection.
+ *
+ * <p>For example:
+ * <code>zipsTable.aggregate(
+ * "{$filter: {state: 'OR'}",
+ * "{$group: {_id: '$city', c: {$sum: 1}, p: {$sum: '$pop'}}}")
+ * </code></p>
+ *
+ * @param mongoDb MongoDB connection
+ * @param fields List of fields to project; or null to return map
+ * @param operations One or more JSON strings
+ * @return Enumerator of results
+ */
+ private Enumerable<Object> aggregate(final MongoDatabase mongoDb,
+ List<Map.Entry<String, Class>> fields,
+ List<String> operations) {
+ List<Bson> list = new ArrayList<>();
+ for (String operation : operations) {
+ list.add(BsonDocument.parse(operation));
+ }
+// Function1<Document, Object> getter =
+// MongoEnumerator.getter(fields);
+// return new AbstractEnumerable<Object>() {
+// @Override public Enumerator<Object> enumerator() {
+// final Iterator<Document> resultIterator;
+// try {
+// resultIterator = mongoDb.getCollection(collectionName)
+// .aggregate(list).iterator();
+// } catch (Exception e) {
+// throw new RuntimeException("While running MongoDB query "
+// + Util.toString(operations, "[", ",\n", "]"), e);
+// }
+// return new MongoEnumerator(resultIterator, getter);
+// }
+// };
+ return null;
+ }
+
+ /** Implementation of {@link Queryable} based on
+ * a {@link MongoTable}.
+ *
+ * @param <T> element type */
+ public static class MongoQueryable<T> extends AbstractTableQueryable<T> {
+ MongoQueryable(QueryProvider queryProvider, SchemaPlus schema,
+ MongoTable table, String tableName) {
+ super(queryProvider, schema, table, tableName);
+ }
+
+ @Override public Enumerator<T> enumerator() {
+ //noinspection unchecked
+ final Enumerable<T> enumerable =
+ (Enumerable<T>) getTable().find(getMongoDb(), null, null, null);
+ return enumerable.enumerator();
+ }
+
+ private MongoDatabase getMongoDb() {
+// return schema.unwrap(MongoSchemaFactory.MongoSchema.class).mongoDb;
+ return null;
+ }
+
+ private MongoTable getTable() {
+ return (MongoTable) table;
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public Enumerable<Object> aggregate(List<Map.Entry<String, Class>> fields,
+ List<String> operations) {
+ return getTable().aggregate(getMongoDb(), fields, operations);
+ }
+
+ /** Called via code-generation.
+ *
+ * @param filterJson Filter document
+ * @param projectJson Projection document
+ * @param fields List of expected fields (and their types)
+ * @return result of mongo query
+ *
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public Enumerable<Object> find(String filterJson,
+ String projectJson, List<Map.Entry<String, Class>> fields) {
+ return getTable().find(getMongoDb(), filterJson, projectJson, fields);
+ }
+ }
+}
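For context, the commented-out aggregate() body above parses each JSON stage string and hands the pipeline to the driver; here is a minimal standalone version of that parsing step (the collection and stages are illustrative).

    import org.bson.BsonDocument;
    import org.bson.conversions.Bson;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PipelineParseSketch {
      public static void main(String[] args) {
        List<String> operations = Arrays.asList(
            "{$match: {state: 'OR'}}",
            "{$group: {_id: '$city', c: {$sum: 1}}}");
        List<Bson> pipeline = new ArrayList<>();
        for (String operation : operations) {
          pipeline.add(BsonDocument.parse(operation)); // each stage is one JSON document
        }
        // With a live connection: mongoDb.getCollection(collectionName).aggregate(pipeline)
        pipeline.forEach(System.out::println);
      }
    }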
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
index da52984..4249725 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
@@ -32,8 +32,7 @@ import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory.MongoSchema;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public class MongoDatabaseSchema extends AbstractSchema {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
- .getLogger(MongoDatabaseSchema.class);
+
private final MongoSchema mongoSchema;
private final Set<String> tableNames;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index cc071a4..29f14ce 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -29,13 +29,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractSchemaFactory;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,7 +195,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
DrillTable getDrillTable(String dbName, String collectionName) {
MongoScanSpec mongoScanSpec = new MongoScanSpec(schemaNameMap.get(dbName), collectionName);
- return new DynamicDrillTable(plugin, getName(), null, mongoScanSpec);
+ return new PluginDrillTable(plugin, getName(), null, mongoScanSpec, plugin.convention());
}
@Override
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
index e4c71b1..eb706d6 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
@@ -155,7 +155,7 @@ public class TestMongoChunkAssignment extends BaseTest {
mongoGroupScan = new MongoGroupScan();
mongoGroupScan.setChunksMapping(chunksMapping);
- mongoGroupScan.setInverseChunsMapping(chunksInverseMapping);
+ mongoGroupScan.setInverseChunksMapping(chunksInverseMapping);
MongoScanSpec scanSpec = new MongoScanSpec(dbName, collectionName);
mongoGroupScan.setScanSpec(scanSpec);
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
index 569cf5c..bb660a0 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
@@ -30,31 +30,32 @@ public class TestMongoLimitPushDown extends MongoTestBase {
public void testLimit() throws Exception {
String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4";
queryBuilder()
- .sql(sql)
- .planMatcher()
- .include("Limit", "maxRecords=4")
- .match();
+ .sql(sql)
+ .planMatcher()
+ .exclude("Limit\\(")
+ .include("MongoGroupScan.*limit=4")
+ .match();
}
@Test
public void testLimitWithOrderBy() throws Exception {
- // Limit should not be pushed down for this example due to the sort
String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` ORDER BY employee_id LIMIT 4";
queryBuilder()
.sql(sql)
.planMatcher()
- .include("Limit", "maxRecords=-1")
+ .exclude("Limit")
+ .include("sort=\\{employee_id", "limit=4")
.match();
}
@Test
public void testLimitWithOffset() throws Exception {
- // Limit should be pushed down and include the offset
String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4 OFFSET 5";
queryBuilder()
.sql(sql)
.planMatcher()
- .include("Limit", "maxRecords=9")
+ .exclude("Limit")
+ .include("skip=5", "limit=4")
.match();
}
@@ -64,7 +65,8 @@ public class TestMongoLimitPushDown extends MongoTestBase {
queryBuilder()
.sql(sql)
.planMatcher()
- .include("Limit", "maxRecords=4")
+ .exclude("Limit")
+ .include("limit=4", "eq=52.17")
.match();
}
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index d5316f9..18f1ad5 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -113,7 +113,7 @@ public class TestMongoQueries extends MongoTestBase {
queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
.planMatcher()
.exclude("Agg\\(")
- .include("Scan\\(.*aggregates")
+ .include("MongoGroupScan.*group")
.match();
testBuilder()
@@ -131,7 +131,7 @@ public class TestMongoQueries extends MongoTestBase {
queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
.planMatcher()
.exclude("Agg\\(")
- .include("Scan\\(.*aggregates")
+ .include("MongoGroupScan.*group")
.match();
testBuilder()
@@ -149,7 +149,7 @@ public class TestMongoQueries extends MongoTestBase {
queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
.planMatcher()
.exclude("Agg\\(", "Filter")
- .include("Scan\\(.*aggregates")
+ .include("MongoGroupScan.*group")
.match();
testBuilder()
@@ -158,7 +158,52 @@ public class TestMongoQueries extends MongoTestBase {
.baselineColumns("c")
.baselineValues(1)
.go();
+ }
+
+ @Test
+ public void testUnionAll() throws Exception {
+ String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union all " +
+ "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2";
+
+ queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("UnionAll\\(")
+ .include("MongoGroupScan.*\\$unionWith")
+ .match();
-// queryBuilder().sql("select * from mongo.%s.`%s` t", DONUTS_DB, DONUTS_COLLECTION).printCsv();
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("0001", "Cake")
+ .baselineValues("0001", "Cake")
+ .baselineValues("0002", "Raised")
+ .baselineValues("0003", "Old Fashioned")
+ .baselineValues("0004", "Filled")
+ .baselineValues("0005", "Apple Fritter")
+ .go();
+ }
+
+ @Test
+ public void testUnionDistinct() throws Exception {
+ String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union " +
+ "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2 ";
+
+ queryBuilder().sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("UnionAll\\(", "Agg\\(")
+ .include("MongoGroupScan.*\\$unionWith")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("0001", "Cake")
+ .baselineValues("0002", "Raised")
+ .baselineValues("0003", "Old Fashioned")
+ .baselineValues("0004", "Filled")
+ .baselineValues("0005", "Apple Fritter")
+ .go();
}
}
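The union tests above expect a $unionWith stage inside the scan instead of a Drill UnionAll operator. Roughly, the implementor produces a pipeline like the one below; the collection name and filter mirror the test data, and the exact stage shape is an assumption.

    import com.mongodb.client.model.Aggregates;
    import com.mongodb.client.model.Filters;
    import org.bson.conversions.Bson;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class UnionWithSketch {
      public static void main(String[] args) {
        // The first input scans "donuts" with the name = 'Cake' filter; the second
        // input becomes a $unionWith stage over the same collection.
        List<Bson> pipeline = Arrays.asList(
            Aggregates.match(Filters.eq("name", "Cake")).toBsonDocument(),
            Aggregates.unionWith("donuts", Collections.emptyList()).toBsonDocument());
        pipeline.forEach(System.out::println);
      }
    }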
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index cc549b1..6779d0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.exec.metastore.analyze.AnalyzeInfoProvider;
import org.apache.drill.metastore.metadata.TableMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
@@ -92,6 +93,11 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
return getScanStats();
}
+ @Override
+ public ScanStats getScanStats(RelMetadataQuery mq) {
+ return getScanStats();
+ }
+
@JsonIgnore
public ScanStats getScanStats() {
throw new UnsupportedOperationException("This should be implemented.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 9eaa043..2c6ff4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.base;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Collection;
import java.util.List;
+
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -88,6 +90,9 @@ public interface GroupScan extends Scan, HasAffinity {
@JsonIgnore
ScanStats getScanStats(PlannerSettings settings);
+ @JsonIgnore
+ ScanStats getScanStats(RelMetadataQuery mq);
+
/**
* Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
new file mode 100644
index 0000000..c13effc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+
+public abstract class DrillSortRelBase extends Sort implements DrillRelNode, OrderedRel {
+
+ protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
+ super(cluster, traits, input, collation);
+ }
+
+ protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, input, collation, offset, fetch);
+ }
+
+ @Override
+ public RexNode getOffset() {
+ return offset;
+ }
+
+ @Override
+ public RexNode getFetch() {
+ return fetch;
+ }
+}
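DrillSortRelBase now gives the logical and physical sort rels a shared place to expose offset/fetch; a consumer such as MongoPluginImplementor unwraps them as RexLiterals holding BigDecimal. A tiny helper sketch (the class and method names are hypothetical):

    import java.math.BigDecimal;

    import org.apache.calcite.rex.RexLiteral;
    import org.apache.calcite.rex.RexNode;

    public class SortLimits {
      // Mirrors how MongoPluginImplementor reads getOffset()/getFetch().
      public static Integer asInt(RexNode node) {
        return node == null ? null
            : ((BigDecimal) ((RexLiteral) node).getValue()).intValue();
      }
    }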
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
index 1e380cf..0fb7b8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -25,13 +25,12 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
import org.apache.drill.exec.planner.torel.ConversionContext;
-import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
@@ -42,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
/**
* Sort implemented in Drill.
*/
-public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
+public class DrillSortRel extends DrillSortRelBase implements DrillRel {
/** Creates a DrillSortRel. */
public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
@@ -100,16 +99,6 @@ public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
}
@Override
- public RexNode getOffset() {
- return offset;
- }
-
- @Override
- public RexNode getFetch() {
- return fetch;
- }
-
- @Override
public boolean canBeDropped() {
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 7e415b2..3e68d7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -21,13 +21,13 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -41,7 +41,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
-public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel,Prel {
+public class SortPrel extends DrillSortRelBase implements Prel {
private final boolean isRemovable;
/** Creates a DrillSortRel. */
@@ -154,16 +154,6 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Ordere
}
@Override
- public RexNode getOffset() {
- return offset;
- }
-
- @Override
- public RexNode getFetch() {
- return fetch;
- }
-
- @Override
public boolean canBeDropped() {
return isRemovable;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
new file mode 100644
index 0000000..c9d4b8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PlannableStoragePlugin.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Abstract class for {@link StoragePlugin} implementations that support pushing operators
+ * down to the plugin. See {@link StoragePlugin} for a description of the interface intent and its methods.
+ */
+public abstract class PlannableStoragePlugin extends AbstractStoragePlugin {
+
+ private final PlannableStoragePluginConfigs<?> plannableStoragePluginConfigs;
+
+ protected PlannableStoragePlugin(PlannableStoragePluginConfigs<? extends PlannableStoragePluginConfigs<?>> plannableStoragePluginConfigs) {
+ super(plannableStoragePluginConfigs.context(), plannableStoragePluginConfigs.name());
+ this.plannableStoragePluginConfigs = plannableStoragePluginConfigs;
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
+ PluginRulesProvider rulesProvider = plannableStoragePluginConfigs.rulesProvider();
+ if (plannableStoragePluginConfigs.supportsProjectPushdown()) {
+ builder.addAll(rulesProvider.projectRules());
+ }
+ if (plannableStoragePluginConfigs.supportsFilterPushdown()) {
+ builder.addAll(rulesProvider.filterRules());
+ }
+ if (plannableStoragePluginConfigs.supportsSortPushdown()) {
+ builder.addAll(rulesProvider.sortRules());
+ }
+ if (plannableStoragePluginConfigs.supportsUnionPushdown()) {
+ builder.addAll(rulesProvider.unionRules());
+ }
+ if (plannableStoragePluginConfigs.supportsAggregatePushdown()) {
+ builder.addAll(rulesProvider.aggregateRules());
+ }
+ if (plannableStoragePluginConfigs.supportsLimitPushdown()) {
+ builder.addAll(rulesProvider.limitRules());
+ }
+ builder.add(rulesProvider.vertexRule());
+ builder.add(rulesProvider.prelConverterRule());
+ return builder.build();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ public Convention convention() {
+ return plannableStoragePluginConfigs.convention();
+ }
+
+ public static abstract class PlannableStoragePluginConfigs<T extends PlannableStoragePluginConfigs<?>> {
+ private DrillbitContext context;
+ private String name;
+ private boolean supportsProjectPushdown;
+ private boolean supportsFilterPushdown;
+ private boolean supportsAggregatePushdown;
+ private boolean supportsSortPushdown;
+ private boolean supportsUnionPushdown;
+ private boolean supportsLimitPushdown;
+ private PluginRulesProvider rulesProvider;
+ private Convention convention;
+
+ public abstract T self();
+
+ public DrillbitContext context() {
+ return context;
+ }
+
+ public T context(DrillbitContext inContext) {
+ this.context = inContext;
+ return self();
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public T name(String inName) {
+ this.name = inName;
+ return self();
+ }
+
+ public boolean supportsProjectPushdown() {
+ return supportsProjectPushdown;
+ }
+
+ public T supportsProjectPushdown(boolean supportsProjectPushdown) {
+ this.supportsProjectPushdown = supportsProjectPushdown;
+ return self();
+ }
+
+ public boolean supportsFilterPushdown() {
+ return supportsFilterPushdown;
+ }
+
+ public T supportsFilterPushdown(boolean supportsFilterPushdown) {
+ this.supportsFilterPushdown = supportsFilterPushdown;
+ return self();
+ }
+
+ public boolean supportsAggregatePushdown() {
+ return supportsAggregatePushdown;
+ }
+
+ public T supportsAggregatePushdown(boolean supportsAggregatePushdown) {
+ this.supportsAggregatePushdown = supportsAggregatePushdown;
+ return self();
+ }
+
+ public boolean supportsSortPushdown() {
+ return supportsSortPushdown;
+ }
+
+ public T supportsSortPushdown(boolean supportsSortPushdown) {
+ this.supportsSortPushdown = supportsSortPushdown;
+ return self();
+ }
+
+ public boolean supportsUnionPushdown() {
+ return supportsUnionPushdown;
+ }
+
+ public T supportsUnionPushdown(boolean supportsUnionPushdown) {
+ this.supportsUnionPushdown = supportsUnionPushdown;
+ return self();
+ }
+
+ public boolean supportsLimitPushdown() {
+ return supportsLimitPushdown;
+ }
+
+ public T supportsLimitPushdown(boolean supportsLimitPushdown) {
+ this.supportsLimitPushdown = supportsLimitPushdown;
+ return self();
+ }
+
+ public PluginRulesProvider rulesProvider() {
+ return rulesProvider;
+ }
+
+ public T rulesProvider(PluginRulesProvider rulesProvider) {
+ this.rulesProvider = rulesProvider;
+ return self();
+ }
+
+ public Convention convention() {
+ return convention;
+ }
+
+ public T convention(Convention convention) {
+ this.convention = convention;
+ return self();
+ }
+ }
+
+}
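
For orientation, here is a minimal sketch of how a concrete plugin might wire itself up through this builder. All names below (MyStoragePlugin, MyConfigs, the "MY_PLUGIN" convention) are illustrative and not part of this commit; a real plugin also implements the remaining StoragePlugin methods such as getConfig() and registerSchemas(), which is why the sketch stays abstract:

import org.apache.calcite.plan.Convention;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.PlannableStoragePlugin;
import org.apache.drill.exec.store.PluginRulesProvider;
import org.apache.drill.exec.store.plan.rel.PluginRel;

// Hypothetical plugin; left abstract so only the builder wiring is shown.
public abstract class MyStoragePlugin extends PlannableStoragePlugin {

  // Calcite convention under which pushed-down operators are planned.
  private static final Convention CONVENTION =
      new Convention.Impl("MY_PLUGIN", PluginRel.class);

  // Concrete configs subclass; self() makes the fluent setters return this type.
  public static class MyConfigs extends PlannableStoragePluginConfigs<MyConfigs> {
    @Override
    public MyConfigs self() {
      return this;
    }
  }

  protected MyStoragePlugin(DrillbitContext context, String name,
      PluginRulesProvider rulesProvider) {
    super(new MyConfigs()
        .context(context)
        .name(name)
        .supportsProjectPushdown(true)
        .supportsFilterPushdown(true)
        .supportsLimitPushdown(true)
        .rulesProvider(rulesProvider)
        .convention(CONVENTION));
  }
}
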
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
new file mode 100644
index 0000000..d35bf83
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
@@ -0,0 +1,16 @@
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.RelOptRule;
+
+import java.util.List;
+
+/**
+ * Provides the sets of rules a storage plugin registers to convert relational
+ * expressions into its own convention.
+ */
+public interface PluginRulesProvider {
+ List<RelOptRule> sortRules();
+ List<RelOptRule> limitRules();
+ List<RelOptRule> filterRules();
+ List<RelOptRule> projectRules();
+ List<RelOptRule> aggregateRules();
+ List<RelOptRule> unionRules();
+ RelOptRule vertexRule();
+ RelOptRule prelConverterRule();
+}
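
A natural way to satisfy this interface is to reuse the generic conversion rules added later in this commit (PluginProjectRule and friends), parameterized by the plugin's convention. The helper below is a hypothetical sketch, placed in the same package as those rules and assuming conversion from Drill's logical convention:

package org.apache.drill.exec.store.plan.rule; // hypothetical location, same package as the rules

import java.util.Arrays;
import java.util.List;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule;
import org.apache.drill.exec.planner.logical.DrillRel;

// Hypothetical helper: one generic rule instance per pushdown kind, each
// converting from DRILL_LOGICAL into the plugin's own convention.
public class PluginRuleSets {
  public static List<RelOptRule> pushdownRules(Convention out) {
    return Arrays.asList(
        new PluginProjectRule(DrillRel.DRILL_LOGICAL, out),
        new PluginFilterRule(DrillRel.DRILL_LOGICAL, out),
        new PluginSortRule(DrillRel.DRILL_LOGICAL, out),
        new PluginLimitRule(DrillRel.DRILL_LOGICAL, out),
        new PluginAggregateRule(DrillRel.DRILL_LOGICAL, out),
        new PluginUnionRule(DrillRel.DRILL_LOGICAL, out));
  }
}
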
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
new file mode 100644
index 0000000..31f388d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -0,0 +1,39 @@
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+
+/**
+ * Callback for the implementation process that converts a tree of {@link PluginRel}
+ * nodes into a plugin-specific {@link GroupScan}.
+ */
+public interface PluginImplementor {
+
+ void implement(PluginAggregateRel aggregate) throws IOException;
+
+ void implement(PluginFilterRel filter) throws IOException;
+
+ void implement(PluginLimitRel limit) throws IOException;
+
+ void implement(PluginProjectRel project) throws IOException;
+
+ void implement(PluginSortRel sort) throws IOException;
+
+ void implement(PluginUnionRel union) throws IOException;
+
+ void implement(StoragePluginTableScan scan) throws IOException;
+
+ PluginImplementor copy();
+
+ GroupScan getPhysicalOperator() throws IOException;
+
+ default void visitChild(RelNode input) throws IOException {
+ ((PluginRel) input).implement(this);
+ }
+}
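
PluginPrel (added below) hands an implementor to the root PluginRel, and each node implements its children first via visitChild(), so state accumulates bottom-up. A minimal sketch supporting only filter pushdown follows; FilterOnlyImplementor is hypothetical, and a real implementor would fold the collected condition into the plugin's native scan:

package org.apache.drill.exec.store.plan; // hypothetical example location

import java.io.IOException;

import org.apache.calcite.rex.RexNode;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
import org.apache.drill.exec.store.plan.rel.PluginSortRel;
import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;

// Hypothetical implementor that records the scan and one filter condition.
public class FilterOnlyImplementor implements PluginImplementor {
  private GroupScan groupScan;
  private RexNode condition;

  @Override
  public void implement(StoragePluginTableScan scan) {
    groupScan = scan.getGroupScan();
  }

  @Override
  public void implement(PluginFilterRel filter) throws IOException {
    visitChild(filter.getInput()); // reach the scan first
    condition = filter.getCondition();
  }

  @Override
  public void implement(PluginAggregateRel aggregate) throws IOException {
    throw new IOException("Aggregate pushdown is not supported");
  }

  @Override
  public void implement(PluginLimitRel limit) throws IOException {
    throw new IOException("Limit pushdown is not supported");
  }

  @Override
  public void implement(PluginProjectRel project) throws IOException {
    throw new IOException("Project pushdown is not supported");
  }

  @Override
  public void implement(PluginSortRel sort) throws IOException {
    throw new IOException("Sort pushdown is not supported");
  }

  @Override
  public void implement(PluginUnionRel union) throws IOException {
    throw new IOException("Union pushdown is not supported");
  }

  @Override
  public PluginImplementor copy() {
    return new FilterOnlyImplementor();
  }

  @Override
  public GroupScan getPhysicalOperator() {
    // A real implementor would return a new scan with `condition` translated
    // into the plugin's native filter; returning the raw scan keeps the sketch short.
    return groupScan;
  }
}
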
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
new file mode 100644
index 0000000..8505ac7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginAggregateRel extends DrillAggregateRelBase implements PluginRel {
+
+ public PluginAggregateRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls)
+ throws InvalidRelException {
+ super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+ assert getConvention() == input.getConvention();
+
+ for (AggregateCall aggCall : aggCalls) {
+ if (aggCall.isDistinct()) {
+ throw new InvalidRelException(
+ "distinct aggregation not supported");
+ }
+ }
+ switch (getGroupType()) {
+ case SIMPLE:
+ break;
+ default:
+ throw new InvalidRelException("unsupported group type: "
+ + getGroupType());
+ }
+ }
+
+ @Override
+ public Aggregate copy(RelTraitSet traitSet, RelNode input,
+ ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls) {
+ try {
+ return new PluginAggregateRel(getCluster(), traitSet, input,
+ groupSet, groupSets, aggCalls);
+ } catch (InvalidRelException e) {
+ // Semantic error not possible. Must be a bug. Convert to
+ // internal error.
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
new file mode 100644
index 0000000..d0a0854
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.Utilities;
+
+import java.io.IOException;
+
+/**
+ * Table implementation based on {@link DynamicDrillTable} whose scan is created
+ * in the plugin's convention, so that the plugin conversion rules can fire.
+ */
+public class PluginDrillTable extends DynamicDrillTable implements TranslatableTable {
+ private final Convention convention;
+
+ public PluginDrillTable(StoragePlugin plugin, String storageEngineName,
+ String userName, Object selection, Convention convention) {
+ super(plugin, storageEngineName, userName, selection);
+ this.convention = convention;
+ }
+
+ @Override
+ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
+ DrillTable drillTable = Utilities.getDrillTable(table);
+ try {
+ return new StoragePluginTableScan(context.getCluster(),
+ context.getCluster().traitSetOf(convention),
+ drillTable.getGroupScan(),
+ table,
+ table.getRowType());
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
new file mode 100644
index 0000000..661ce21
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+public class PluginFilterRel extends DrillFilterRelBase implements PluginRel {
+
+ public PluginFilterRel(
+ Convention convention,
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode child,
+ RexNode condition) {
+ super(convention, cluster, traitSet, child, condition);
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override public PluginFilterRel copy(RelTraitSet traitSet, RelNode input,
+ RexNode condition) {
+ return new PluginFilterRel(getConvention(), getCluster(), traitSet, input, condition);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrelRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrelRel.java
new file mode 100644
index 0000000..88e8c83
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrelRel.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.SinglePrel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+public class PluginIntermediatePrelRel extends SinglePrel implements PrelFinalizable {
+
+ private final Supplier<PluginImplementor> implementorFactory;
+
+ public PluginIntermediatePrelRel(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, Supplier<PluginImplementor> implementorFactory) {
+ super(cluster, traits, child);
+ this.implementorFactory = implementorFactory;
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PluginIntermediatePrelRel(getCluster(), traitSet, getInput(), implementorFactory);
+ }
+
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return copy(getTraitSet(), getInputs());
+ }
+
+ @Override
+ public SelectionVectorMode getEncoding() {
+ return SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public Prel finalizeRel() {
+ return new PluginPrel(getCluster(), this);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) {
+ throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+
+ public PluginImplementor getPluginImplementor() {
+ return implementorFactory.get();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
new file mode 100644
index 0000000..b74aac4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginLimitRel extends DrillLimitRelBase implements PluginRel {
+ public PluginLimitRel(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode child, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, child, offset, fetch);
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+ }
+
+ @Override
+ public PluginLimitRel copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PluginLimitRel(getCluster(), traitSet, inputs.get(0), offset, fetch);
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
new file mode 100644
index 0000000..743679d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
@@ -0,0 +1,83 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Physical operator that wraps the {@link GroupScan} obtained by running a
+ * {@link PluginImplementor} over the pushed-down plugin sub-tree.
+ */
+public class PluginPrel extends AbstractRelNode implements Prel {
+ private final GroupScan groupScan;
+ private final RelDataType rowType;
+
+ public PluginPrel(RelOptCluster cluster, PluginIntermediatePrelRel intermediatePrel) {
+ super(cluster, intermediatePrel.getTraitSet());
+ this.rowType = intermediatePrel.getRowType();
+ PluginRel input = (PluginRel) intermediatePrel.getInput().accept(SubsetRemover.INSTANCE);
+ try {
+ PluginImplementor implementor = intermediatePrel.getPluginImplementor();
+ input.implement(implementor);
+ this.groupScan = implementor.getPhysicalOperator();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+ return groupScan;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+ return BatchSchema.SelectionVectorMode.DEFAULT;
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode getEncoding() {
+ return BatchSchema.SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw).item("groupScan", groupScan);
+ }
+
+ @Override
+ public double estimateRowCount(RelMetadataQuery mq) {
+ return groupScan.getScanStats(mq).getRecordCount();
+ }
+
+ @Override
+ protected RelDataType deriveRowType() {
+ return rowType;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
new file mode 100644
index 0000000..49a2cac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginProjectRel extends DrillProjectRelBase implements PluginRel {
+
+ public PluginProjectRel(Convention convention, RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(convention, cluster, traitSet, input, projects, rowType);
+ assert getConvention() == input.getConvention();
+ }
+
+ @Override public Project copy(RelTraitSet traitSet, RelNode input,
+ List<RexNode> projects, RelDataType rowType) {
+ return new PluginProjectRel(getConvention(), getCluster(), traitSet, input, projects,
+ rowType);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
new file mode 100644
index 0000000..7364e70
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
@@ -0,0 +1,11 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Relational expression that can be implemented by a storage plugin via a {@link PluginImplementor}.
+ */
+public interface PluginRel extends RelNode {
+ void implement(PluginImplementor implementor) throws IOException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
new file mode 100644
index 0000000..255bb2d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+
+public class PluginSortRel extends DrillSortRelBase implements PluginRel {
+ public PluginSortRel(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, child, collation, offset, fetch);
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override
+ public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ @Override
+ public PluginSortRel copy(RelTraitSet traitSet, RelNode input,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new PluginSortRel(getCluster(), traitSet, input, newCollation, offset, fetch);
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+
+ @Override
+ public boolean canBeDropped() {
+ return false;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
new file mode 100644
index 0000000..73227bd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PluginUnionRel extends DrillUnionRelBase implements PluginRel {
+
+ public PluginUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all, boolean checkCompatibility) throws InvalidRelException {
+ super(cluster, traits, inputs, all, checkCompatibility);
+ }
+
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ try {
+ return new PluginUnionRel(getCluster(), traitSet, inputs, all, true);
+ } catch (InvalidRelException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
new file mode 100644
index 0000000..a062b2e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+public class StoragePluginTableScan extends DrillScanRelBase implements PluginRel {
+
+ private final RelDataType rowType;
+
+ public StoragePluginTableScan(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ GroupScan grpScan,
+ RelOptTable table,
+ RelDataType rowType) {
+ super(cluster, traits, grpScan, table);
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+
+ @Override
+ public DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+ return new StoragePluginTableScan(getCluster(), traitSet, scan, getTable(), rowType);
+ }
+
+ @Override
+ public double estimateRowCount(RelMetadataQuery mq) {
+ return getGroupScan().getScanStats(mq).getRecordCount();
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ return this.rowType;
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw).item("groupScan", getGroupScan().getDigest());
+ }
+
+ @Override
+ protected String computeDigest() {
+ return super.computeDigest();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
new file mode 100644
index 0000000..616493d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
@@ -0,0 +1,38 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rule to convert an {@link Aggregate} to a {@link PluginAggregateRel}.
+ */
+public class PluginAggregateRule extends PluginConverterRule {
+ private static final Logger logger = LoggerFactory.getLogger(PluginAggregateRule.class);
+
+ public PluginAggregateRule(RelTrait in, Convention out) {
+ super(Aggregate.class, in, out, "PluginAggregateRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Aggregate agg = (Aggregate) rel;
+ RelTraitSet traitSet =
+ agg.getTraitSet().replace(getOutConvention());
+ try {
+ return new PluginAggregateRel(
+ rel.getCluster(),
+ traitSet,
+ convert(agg.getInput(), traitSet.simplify()),
+ agg.getGroupSet(),
+ agg.getGroupSets(),
+ agg.getAggCallList());
+ } catch (InvalidRelException e) {
+ logger.warn(e.toString());
+ return null;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
new file mode 100644
index 0000000..5801d83
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
@@ -0,0 +1,16 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+
+import java.util.function.Predicate;
+
+/**
+ * Base class for the rules that convert relational expressions into the plugin's convention.
+ */
+public abstract class PluginConverterRule extends ConverterRule {
+
+ protected PluginConverterRule(Class<? extends RelNode> clazz, RelTrait in, Convention out, String description) {
+ super(clazz, (Predicate<RelNode>) input -> true, in, out, DrillRelFactories.LOGICAL_BUILDER, description);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
new file mode 100644
index 0000000..8d350f4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -0,0 +1,30 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+
+/**
+ * Rule to convert a {@link Filter} to a {@link PluginFilterRel}.
+ */
+public class PluginFilterRule extends PluginConverterRule {
+
+ public PluginFilterRule(RelTrait in, Convention out) {
+ super(Filter.class, in, out, "PluginFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Filter filter = (Filter) rel;
+ RelTraitSet traitSet = filter.getTraitSet().replace(getOutConvention());
+ return new PluginFilterRel(
+ getOutConvention(),
+ rel.getCluster(),
+ traitSet,
+ convert(filter.getInput(), getOutConvention()),
+ filter.getCondition());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
new file mode 100644
index 0000000..5b08891
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginIntermediatePrelRel;
+
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/**
+ * Rule that converts a {@link VertexDrel} into a {@link PluginIntermediatePrelRel},
+ * which is finalized into the actual plugin scan during plan finalization.
+ */
+public class PluginIntermediatePrelConverterRule extends ConverterRule {
+
+ private final Supplier<PluginImplementor> implementorFactory;
+
+ public PluginIntermediatePrelConverterRule(Supplier<PluginImplementor> implementorFactory) {
+ super(VertexDrel.class, (Predicate<RelNode>) input -> true, DrillRel.DRILL_LOGICAL,
+ Prel.DRILL_PHYSICAL, DrillRelFactories.LOGICAL_BUILDER, "Plugin_prel_Converter");
+ this.implementorFactory = implementorFactory;
+ }
+
+ @Override
+ public RelNode convert(RelNode in) {
+ return new PluginIntermediatePrelRel(
+ in.getCluster(),
+ in.getTraitSet().replace(getOutTrait()),
+ in.getInput(0),
+ implementorFactory);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
new file mode 100644
index 0000000..fea2276
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
@@ -0,0 +1,25 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+
+/**
+ * Rule to convert a {@link DrillLimitRelBase} to a {@link PluginLimitRel}.
+ */
+public class PluginLimitRule extends PluginConverterRule {
+
+ public PluginLimitRule(RelTrait in, Convention out) {
+ super(DrillLimitRelBase.class, in, out, "PluginLimitRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ DrillLimitRelBase limit = (DrillLimitRelBase) rel;
+ RelTraitSet traitSet =
+ limit.getTraitSet().replace(getOutConvention());
+ RelNode input = convert(limit.getInput(), limit.getInput().getTraitSet().replace(getOutConvention()).simplify());
+ return new PluginLimitRel(rel.getCluster(), traitSet, input,
+ limit.getOffset(), limit.getFetch());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
new file mode 100644
index 0000000..d02bd1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
@@ -0,0 +1,27 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+
+/**
+ * Rule to convert a {@link Project} to a {@link PluginProjectRel}.
+ */
+public class PluginProjectRule extends PluginConverterRule {
+
+ public PluginProjectRule(RelTrait in, Convention out) {
+ super(Project.class, in, out, "PluginProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Project project = (Project) rel;
+ RelTraitSet traitSet = project.getTraitSet().replace(getOutConvention());
+ return new PluginProjectRel(getOutConvention(), project.getCluster(), traitSet,
+ convert(project.getInput(), getOutConvention()), project.getProjects(),
+ project.getRowType());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
new file mode 100644
index 0000000..0378b0e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
@@ -0,0 +1,28 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+
+/**
+ * Rule to convert a {@link Sort} to a {@link PluginSortRel}.
+ */
+public class PluginSortRule extends PluginConverterRule {
+
+ public PluginSortRule(RelTrait in, Convention out) {
+ super(Sort.class, in, out, "PluginSortRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Sort sort = (Sort) rel;
+ RelTraitSet traitSet = sort.getTraitSet().replace(getOutConvention())
+ .replace(sort.getCollation());
+ RelNode input = convert(sort.getInput(), sort.getInput().getTraitSet().replace(getOutConvention()).simplify());
+ return new PluginSortRel(rel.getCluster(), traitSet, input,
+ sort.getCollation(), sort.offset, sort.fetch);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
new file mode 100644
index 0000000..c85f65e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+
+/**
+ * Rule to convert a {@link Union} to a {@link PluginUnionRel}.
+ */
+public class PluginUnionRule extends PluginConverterRule {
+
+ public PluginUnionRule(RelTrait in, Convention out) {
+ super(Union.class, in, out, "PluginUnionRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Union union = (Union) rel;
+ RelTraitSet traitSet =
+ union.getTraitSet().replace(getOutConvention());
+ try {
+ return new PluginUnionRel(
+ rel.getCluster(),
+ traitSet,
+ convertList(union.getInputs(), getOutConvention()),
+ union.all,
+ true);
+ } catch (InvalidRelException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ // Allow converting UNION ALL only: Drill plans UNION DISTINCT as an extra
+ // aggregation on top of a UNION ALL, so the union-all and the aggregation
+ // are converted by their own rules.
+ return call.<Union>rel(0).all;
+ }
+}