You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2021/09/14 03:51:57 UTC
[drill] branch master updated: DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns (#2289)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b249baf DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns (#2289)
b249baf is described below
commit b249baf0ebd638105908d3697ca8d68f5753fad5
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Sep 14 06:51:51 2021 +0300
DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns (#2289)
* DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns
* Fix ser/de, make batchSize configurable and minor cleanup
* Add docs for Mongo storage plugin credentials
* Update mongo driver
* Fix CASE with multiple branches
* Fix alert
---
.../store/druid/DruidPushDownFilterForScan.java | 2 +-
contrib/storage-mongo/pom.xml | 2 +-
...ngoCompareOp.java => BaseMongoSubScanSpec.java} | 33 ++-
.../exec/store/mongo/MongoAggregateUtils.java | 194 ++++++++++++++
.../drill/exec/store/mongo/MongoFilterBuilder.java | 137 +++++-----
.../drill/exec/store/mongo/MongoGroupScan.java | 126 +++++----
.../store/mongo/MongoPushDownFilterForScan.java | 99 -------
.../drill/exec/store/mongo/MongoRecordReader.java | 35 ++-
.../exec/store/mongo/MongoScanBatchCreator.java | 12 +-
.../drill/exec/store/mongo/MongoScanSpec.java | 46 ++--
.../drill/exec/store/mongo/MongoStoragePlugin.java | 50 +++-
.../exec/store/mongo/MongoStoragePluginConfig.java | 53 ++--
.../drill/exec/store/mongo/MongoSubScan.java | 152 +++--------
.../common/{MongoCompareOp.java => MongoOp.java} | 30 ++-
.../store/mongo/plan/MongoPluginImplementor.java | 294 +++++++++++++++++++++
.../store/mongo/plan/RexToMongoTranslator.java | 244 +++++++++++++++++
.../store/mongo/schema/MongoDatabaseSchema.java | 3 +-
.../store/mongo/schema/MongoSchemaFactory.java | 4 +-
.../drill/exec/store/mongo/MongoTestBase.java | 2 +-
.../exec/store/mongo/TestMongoChunkAssignment.java | 2 +-
.../exec/store/mongo/TestMongoLimitPushDown.java | 20 +-
.../exec/store/mongo/TestMongoProjectPushDown.java | 12 +-
.../drill/exec/store/mongo/TestMongoQueries.java | 199 ++++++++++++++
...TestMongoStoragePluginUsesCredentialsStore.java | 2 +-
docs/dev/MongoStoragePluginCredentials.md | 58 ++++
docs/dev/StoragePluginPushdowns.md | 31 +++
.../exec/physical/base/AbstractGroupScan.java | 6 +
.../apache/drill/exec/physical/base/GroupScan.java | 5 +
.../exec/planner/common/DrillScanRelBase.java | 4 +-
.../exec/planner/common/DrillSortRelBase.java | 49 ++++
.../drill/exec/planner/logical/DrillScanRel.java | 4 +-
.../drill/exec/planner/logical/DrillSortRel.java | 15 +-
.../drill/exec/planner/physical/ScanPrel.java | 4 +-
.../drill/exec/planner/physical/SortPrel.java | 14 +-
.../drill/exec/store/PluginRulesProvider.java | 44 +--
.../drill/exec/store/PluginRulesProviderImpl.java | 116 ++++++++
.../exec/store/StoragePluginRulesSupplier.java | 180 +++++++++++++
.../exec/store/plan/AbstractPluginImplementor.java | 134 ++++++++++
.../drill/exec/store/plan/PluginImplementor.java | 86 ++++++
.../exec/store/plan/rel/PluginAggregateRel.java | 66 +++++
.../exec/store/plan/rel/PluginDrillTable.java | 57 ++++
.../drill/exec/store/plan/rel/PluginFilterRel.java | 62 +++++
.../store/plan/rel/PluginIntermediatePrel.java | 87 ++++++
.../drill/exec/store/plan/rel/PluginJoinRel.java | 56 ++++
.../drill/exec/store/plan/rel/PluginLimitRel.java | 63 +++++
.../drill/exec/store/plan/rel/PluginPrel.java | 103 ++++++++
.../exec/store/plan/rel/PluginProjectRel.java | 66 +++++
.../drill/exec/store/plan/rel/PluginRel.java | 25 +-
.../drill/exec/store/plan/rel/PluginSortRel.java | 69 +++++
.../drill/exec/store/plan/rel/PluginUnionRel.java | 68 +++++
.../store/plan/rel/StoragePluginTableScan.java | 79 ++++++
.../exec/store/plan/rule/PluginAggregateRule.java | 47 ++++
.../exec/store/plan/rule/PluginConverterRule.java | 76 ++++++
.../exec/store/plan/rule/PluginFilterRule.java | 46 ++++
.../rule/PluginIntermediatePrelConverterRule.java | 66 +++++
.../drill/exec/store/plan/rule/PluginJoinRule.java | 49 ++++
.../exec/store/plan/rule/PluginLimitRule.java | 44 +++
.../exec/store/plan/rule/PluginProjectRule.java | 47 ++++
.../drill/exec/store/plan/rule/PluginSortRule.java | 48 ++++
.../exec/store/plan/rule/PluginUnionRule.java | 55 ++++
60 files changed, 3250 insertions(+), 532 deletions(-)
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
index 65d95aa..2c5fcee 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
@@ -73,7 +73,7 @@ public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
groupScan.getMaxRecordsToRead());
newGroupsScan.setFilterPushedDown(true);
- ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan);
+ ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());
if (druidFilterBuilder.isAllExpressionsConverted()) {
/*
* Since we could convert the entire filter condition expression into a
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 66bc0cb..90a9a0e 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
- <version>4.2.3</version>
+ <version>4.3.1</version>
</dependency>
<!-- Test dependency -->
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
similarity index 58%
copy from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
copy to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
index ef89bfb..4f0d1bf 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
@@ -15,20 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.mongo.common;
+package org.apache.drill.exec.store.mongo;
-public enum MongoCompareOp {
- EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
- "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
- "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
- "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
- private String compareOp;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.SuperBuilder;
- MongoCompareOp(String compareOp) {
- this.compareOp = compareOp;
- }
+import java.util.List;
+
+@Getter
+@Setter
+@SuperBuilder(setterPrefix = "set")
+public class BaseMongoSubScanSpec {
+
+ @JsonProperty
+ private final String dbName;
+
+ @JsonProperty
+ private final String collectionName;
+
+ @JsonProperty
+ private final List<String> hosts;
- public String getCompareOp() {
- return compareOp;
- }
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
new file mode 100644
index 0000000..a72b6eb
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.client.model.Accumulators;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class MongoAggregateUtils {
+
+ public static List<String> mongoFieldNames(RelDataType rowType) {
+ List<String> renamed = rowType.getFieldNames().stream()
+ .map(name -> name.startsWith("$") ? "_" + name.substring(2) : name)
+ .collect(Collectors.toList());
+ return SqlValidatorUtil.uniquify(renamed, true);
+ }
+
+ public static String maybeQuote(String s) {
+ if (!needsQuote(s)) {
+ return s;
+ }
+ return quote(s);
+ }
+
+ public static String quote(String s) {
+ return "'" + s + "'";
+ }
+
+ private static boolean needsQuote(String s) {
+ for (int i = 0, n = s.length(); i < n; i++) {
+ char c = s.charAt(i);
+ if (!Character.isJavaIdentifierPart(c)
+ || c == '$') {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<Bson> getAggregateOperations(Aggregate aggregate, RelDataType rowType) {
+ List<String> inNames = mongoFieldNames(rowType);
+ List<String> outNames = mongoFieldNames(aggregate.getRowType());
+ Object id;
+ if (aggregate.getGroupSet().cardinality() == 1) {
+ String inName = inNames.get(aggregate.getGroupSet().nth(0));
+ id = "$" + inName;
+ } else {
+ List<BsonElement> elements =
+ StreamSupport.stream(aggregate.getGroupSet().spliterator(), false)
+ .map(inNames::get)
+ .map(inName -> new BsonElement(inName, new BsonString("$" + inName)))
+ .collect(Collectors.toList());
+ id = new BsonDocument(elements);
+ }
+ int outNameIndex = aggregate.getGroupSet().cardinality();
+ List<BsonField> accumList = new ArrayList<>();
+ for (AggregateCall aggCall : aggregate.getAggCallList()) {
+ accumList.add(bsonAggregate(inNames, outNames.get(outNameIndex++), aggCall));
+ }
+ List<Bson> operationsList = new ArrayList<>();
+ operationsList.add(Aggregates.group(id, accumList).toBsonDocument());
+ List<BsonElement> projectFields = new ArrayList<>();
+ if (aggregate.getGroupSet().cardinality() == 1) {
+ for (int index = 0; index < outNames.size(); index++) {
+ String outName = outNames.get(index);
+ projectFields.add(new BsonElement(maybeQuote(outName),
+ new BsonString("$" + (index == 0 ? "_id" : outName))));
+ }
+ } else {
+ projectFields.add(new BsonElement("_id", new BsonInt32(0)));
+ for (int group : aggregate.getGroupSet()) {
+ projectFields.add(
+ new BsonElement(maybeQuote(outNames.get(group)),
+ new BsonString("$_id." + outNames.get(group))));
+ }
+ outNameIndex = aggregate.getGroupSet().cardinality();
+ for (AggregateCall ignored : aggregate.getAggCallList()) {
+ String outName = outNames.get(outNameIndex++);
+ projectFields.add(new BsonElement(maybeQuote(outName), new BsonString("$" + outName)));
+ }
+ }
+ if (!aggregate.getGroupSet().isEmpty()) {
+ operationsList.add(Aggregates.project(new BsonDocument(projectFields)).toBsonDocument());
+ }
+
+ return operationsList;
+ }
+
+ private static BsonField bsonAggregate(List<String> inNames, String outName, AggregateCall aggCall) {
+ String aggregationName = aggCall.getAggregation().getName();
+ List<Integer> args = aggCall.getArgList();
+ if (aggregationName.equals(SqlStdOperatorTable.COUNT.getName())) {
+ Object expr;
+ if (args.size() == 0) {
+ // count(*) case
+ expr = 1;
+ } else {
+ assert args.size() == 1;
+ String inName = inNames.get(args.get(0));
+ expr = new BsonDocument(MongoOp.COND.getCompareOp(),
+ new BsonArray(Arrays.asList(
+ new Document(MongoOp.EQUAL.getCompareOp(),
+ new BsonArray(Arrays.asList(
+ new BsonString(quote(inName)),
+ BsonNull.VALUE))).toBsonDocument(),
+ new BsonInt32(0),
+ new BsonInt32(1)
+ ))
+ );
+ }
+ return Accumulators.sum(maybeQuote(outName), expr);
+ } else {
+ BiFunction<String, Object, BsonField> mongoAccumulator = mongoAccumulator(aggregationName);
+ if (mongoAccumulator != null) {
+ return mongoAccumulator.apply(maybeQuote(outName), "$" + inNames.get(args.get(0)));
+ }
+ }
+ return null;
+ }
+
+ private static <T> BiFunction<String, T, BsonField> mongoAccumulator(String aggregationName) {
+ if (aggregationName.equals(SqlStdOperatorTable.SUM.getName())
+ || aggregationName.equals(SqlStdOperatorTable.SUM0.getName())) {
+ return Accumulators::sum;
+ } else if (aggregationName.equals(SqlStdOperatorTable.MIN.getName())) {
+ return Accumulators::min;
+ } else if (aggregationName.equals(SqlStdOperatorTable.MAX.getName())) {
+ return Accumulators::max;
+ } else if (aggregationName.equals(SqlStdOperatorTable.AVG.getName())) {
+ return Accumulators::avg;
+ } else if (aggregationName.equals(SqlStdOperatorTable.FIRST.getName())) {
+ return Accumulators::first;
+ } else if (aggregationName.equals(SqlStdOperatorTable.LAST.getName())) {
+ return Accumulators::last;
+ } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV.getName())
+ || aggregationName.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())) {
+ return Accumulators::stdDevSamp;
+ } else if (aggregationName.equals(SqlStdOperatorTable.STDDEV_POP.getName())) {
+ return Accumulators::stdDevPop;
+ }
+ return null;
+ }
+
+ public static boolean supportsAggregation(AggregateCall aggregateCall) {
+ String name = aggregateCall.getAggregation().getName();
+ return name.equals(SqlStdOperatorTable.COUNT.getName())
+ || name.equals(SqlStdOperatorTable.SUM.getName())
+ || name.equals(SqlStdOperatorTable.SUM0.getName())
+ || name.equals(SqlStdOperatorTable.MIN.getName())
+ || name.equals(SqlStdOperatorTable.MAX.getName())
+ || name.equals(SqlStdOperatorTable.AVG.getName())
+ || name.equals(SqlStdOperatorTable.FIRST.getName())
+ || name.equals(SqlStdOperatorTable.LAST.getName())
+ || name.equals(SqlStdOperatorTable.STDDEV.getName())
+ || name.equals(SqlStdOperatorTable.STDDEV_SAMP.getName())
+ || name.equals(SqlStdOperatorTable.STDDEV_POP.getName());
+ }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 10720e4..a463ad5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -17,66 +17,52 @@
*/
package org.apache.drill.exec.store.mongo;
-import java.io.IOException;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.drill.common.FunctionNames;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class MongoFilterBuilder extends
- AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
+ AbstractExprVisitor<Document, Void, RuntimeException> implements
DrillMongoConstants {
- private static final Logger logger = LoggerFactory
- .getLogger(MongoFilterBuilder.class);
- final MongoGroupScan groupScan;
- final LogicalExpression le;
+
+ private final LogicalExpression le;
private boolean allExpressionsConverted = true;
- public MongoFilterBuilder(MongoGroupScan groupScan,
- LogicalExpression conditionExp) {
- this.groupScan = groupScan;
+ public MongoFilterBuilder(LogicalExpression conditionExp) {
this.le = conditionExp;
}
- public MongoScanSpec parseTree() {
- MongoScanSpec parsedSpec = le.accept(this, null);
- if (parsedSpec != null) {
- parsedSpec = mergeScanSpecs(FunctionNames.AND, this.groupScan.getScanSpec(),
- parsedSpec);
- }
- return parsedSpec;
+ public Document parseTree() {
+ return le.accept(this, null);
}
- private MongoScanSpec mergeScanSpecs(String functionName,
- MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
+ private Document mergeFilters(String functionName,
+ Document left, Document right) {
Document newFilter = new Document();
switch (functionName) {
case FunctionNames.AND:
- if (leftScanSpec.getFilters() != null
- && rightScanSpec.getFilters() != null) {
- newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
- rightScanSpec.getFilters());
- } else if (leftScanSpec.getFilters() != null) {
- newFilter = leftScanSpec.getFilters();
+ if (left != null && right != null) {
+ newFilter = MongoUtils.andFilterAtIndex(left, right);
+ } else if (left != null) {
+ newFilter = left;
} else {
- newFilter = rightScanSpec.getFilters();
+ newFilter = right;
}
break;
case FunctionNames.OR:
- newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(),
- rightScanSpec.getFilters());
+ newFilter = MongoUtils.orFilterAtIndex(left, right);
}
- return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
- .getScanSpec().getCollectionName(), newFilter);
+ return newFilter;
}
public boolean isAllExpressionsConverted() {
@@ -84,41 +70,41 @@ public class MongoFilterBuilder extends
}
@Override
- public MongoScanSpec visitUnknown(LogicalExpression e, Void value)
+ public Document visitUnknown(LogicalExpression e, Void value)
throws RuntimeException {
allExpressionsConverted = false;
return null;
}
@Override
- public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+ public Document visitBooleanOperator(BooleanOperator op, Void value) {
List<LogicalExpression> args = op.args();
- MongoScanSpec nodeScanSpec = null;
+ Document condition = null;
String functionName = op.getName();
- for (int i = 0; i < args.size(); ++i) {
+ for (LogicalExpression arg : args) {
switch (functionName) {
- case FunctionNames.AND:
- case FunctionNames.OR:
- if (nodeScanSpec == null) {
- nodeScanSpec = args.get(i).accept(this, null);
- } else {
- MongoScanSpec scanSpec = args.get(i).accept(this, null);
- if (scanSpec != null) {
- nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+ case FunctionNames.AND:
+ case FunctionNames.OR:
+ if (condition == null) {
+ condition = arg.accept(this, null);
} else {
- allExpressionsConverted = false;
+ Document scanSpec = arg.accept(this, null);
+ if (scanSpec != null) {
+ condition = mergeFilters(functionName, condition, scanSpec);
+ } else {
+ allExpressionsConverted = false;
+ }
}
- }
- break;
+ break;
}
}
- return nodeScanSpec;
+ return condition;
}
@Override
- public MongoScanSpec visitFunctionCall(FunctionCall call, Void value)
+ public Document visitFunctionCall(FunctionCall call, Void value)
throws RuntimeException {
- MongoScanSpec nodeScanSpec = null;
+ Document functionCall = null;
String functionName = call.getName();
List<LogicalExpression> args = call.args();
@@ -127,7 +113,7 @@ public class MongoFilterBuilder extends
.process(call);
if (processor.isSuccess()) {
try {
- nodeScanSpec = createMongoScanSpec(processor.getFunctionName(),
+ functionCall = createFunctionCall(processor.getFunctionName(),
processor.getPath(), processor.getValue());
} catch (Exception e) {
logger.error(" Failed to creare Filter ", e);
@@ -138,79 +124,76 @@ public class MongoFilterBuilder extends
switch (functionName) {
case FunctionNames.AND:
case FunctionNames.OR:
- MongoScanSpec leftScanSpec = args.get(0).accept(this, null);
- MongoScanSpec rightScanSpec = args.get(1).accept(this, null);
- if (leftScanSpec != null && rightScanSpec != null) {
- nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
- rightScanSpec);
+ Document left = args.get(0).accept(this, null);
+ Document right = args.get(1).accept(this, null);
+ if (left != null && right != null) {
+ functionCall = mergeFilters(functionName, left, right);
} else {
allExpressionsConverted = false;
if (FunctionNames.AND.equals(functionName)) {
- nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+ functionCall = left == null ? right : left;
}
}
break;
}
}
- if (nodeScanSpec == null) {
+ if (functionCall == null) {
allExpressionsConverted = false;
}
- return nodeScanSpec;
+ return functionCall;
}
- private MongoScanSpec createMongoScanSpec(String functionName,
- SchemaPath field, Object fieldValue) throws ClassNotFoundException,
- IOException {
+ private Document createFunctionCall(String functionName,
+ SchemaPath field, Object fieldValue) {
// extract the field name
String fieldName = field.getRootSegmentPath();
- MongoCompareOp compareOp = null;
+ MongoOp compareOp = null;
switch (functionName) {
case FunctionNames.EQ:
- compareOp = MongoCompareOp.EQUAL;
+ compareOp = MongoOp.EQUAL;
break;
case FunctionNames.NE:
- compareOp = MongoCompareOp.NOT_EQUAL;
+ compareOp = MongoOp.NOT_EQUAL;
break;
case FunctionNames.GE:
- compareOp = MongoCompareOp.GREATER_OR_EQUAL;
+ compareOp = MongoOp.GREATER_OR_EQUAL;
break;
case FunctionNames.GT:
- compareOp = MongoCompareOp.GREATER;
+ compareOp = MongoOp.GREATER;
break;
case FunctionNames.LE:
- compareOp = MongoCompareOp.LESS_OR_EQUAL;
+ compareOp = MongoOp.LESS_OR_EQUAL;
break;
case FunctionNames.LT:
- compareOp = MongoCompareOp.LESS;
+ compareOp = MongoOp.LESS;
break;
case FunctionNames.IS_NULL:
case "isNull":
case "is null":
- compareOp = MongoCompareOp.IFNULL;
+ compareOp = MongoOp.IFNULL;
break;
case FunctionNames.IS_NOT_NULL:
case "isNotNull":
case "is not null":
- compareOp = MongoCompareOp.IFNOTNULL;
+ compareOp = MongoOp.IFNOTNULL;
break;
}
if (compareOp != null) {
Document queryFilter = new Document();
- if (compareOp == MongoCompareOp.IFNULL) {
+ if (compareOp == MongoOp.IFNULL) {
queryFilter.put(fieldName,
- new Document(MongoCompareOp.EQUAL.getCompareOp(), null));
- } else if (compareOp == MongoCompareOp.IFNOTNULL) {
+ new Document(MongoOp.EQUAL.getCompareOp(), null));
+ } else if (compareOp == MongoOp.IFNOTNULL) {
queryFilter.put(fieldName,
- new Document(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+ new Document(MongoOp.NOT_EQUAL.getCompareOp(), null));
} else {
queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
fieldValue));
}
- return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan
- .getScanSpec().getCollectionName(), queryFilter);
+ return queryFilter;
}
return null;
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 8b57012..b26b410 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.mongodb.client.MongoClient;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -51,6 +52,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
+import org.apache.drill.exec.store.mongo.MongoSubScan.ShardedMongoSubScanSpec;
import org.apache.drill.exec.store.mongo.common.ChunkInfo;
import org.bson.Document;
import org.bson.codecs.BsonTypeClassMap;
@@ -58,8 +60,6 @@ import org.bson.codecs.DocumentCodec;
import org.bson.conversions.Bson;
import org.bson.types.MaxKey;
import org.bson.types.MinKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
@@ -75,17 +75,16 @@ import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
+@Slf4j
@JsonTypeName("mongo-scan")
public class MongoGroupScan extends AbstractGroupScan implements
DrillMongoConstants {
- private static final Integer select = 1;
+ private static final int SELECT = 1;
- private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
+ private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
- private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
-
- private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+ private static final Comparator<List<BaseMongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
private MongoStoragePlugin storagePlugin;
@@ -95,9 +94,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
private List<SchemaPath> columns;
- private int maxRecords;
-
- private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping;
+ private Map<Integer, List<BaseMongoSubScanSpec>> endpointFragmentMapping;
// Sharding with replica sets contains all the replica server addresses for
// each chunk.
@@ -107,7 +104,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
private final Stopwatch watch = Stopwatch.createUnstarted();
- private boolean filterPushedDown = false;
+ private boolean useAggregate;
@JsonCreator
public MongoGroupScan(
@@ -115,21 +112,21 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
@JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry,
- @JsonProperty("maxRecords") int maxRecords) throws IOException {
+ @JsonProperty("useAggregate") boolean useAggregate,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
this(userName,
pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
- scanSpec, columns, maxRecords);
+ scanSpec, columns, useAggregate);
}
public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
- MongoScanSpec scanSpec, List<SchemaPath> columns, int maxRecords) throws IOException {
+ MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) throws IOException {
super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.scanSpec = scanSpec;
this.columns = columns;
- this.maxRecords = maxRecords;
+ this.useAggregate = useAggregate;
init();
}
@@ -148,18 +145,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
this.chunksMapping = that.chunksMapping;
this.chunksInverseMapping = that.chunksInverseMapping;
this.endpointFragmentMapping = that.endpointFragmentMapping;
- this.filterPushedDown = that.filterPushedDown;
- this.maxRecords = that.maxRecords;
- }
-
- @JsonIgnore
- public boolean isFilterPushedDown() {
- return filterPushedDown;
- }
-
- @JsonIgnore
- public void setFilterPushedDown(boolean filterPushedDown) {
- this.filterPushedDown = filterPushedDown;
+ this.useAggregate = that.useAggregate;
}
private boolean isShardedCluster(MongoClient client) {
@@ -179,7 +165,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
MongoClient client = storagePlugin.getClient();
chunksMapping = Maps.newHashMap();
chunksInverseMapping = Maps.newLinkedHashMap();
- if (isShardedCluster(client)) {
+ if (useAggregate && isShardedCluster(client)) {
+ handleUnshardedCollection(getPrimaryShardInfo());
+ } else if (isShardedCluster(client)) {
MongoDatabase db = client.getDatabase(CONFIG);
MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS);
Document filter = new Document();
@@ -190,9 +178,9 @@ public class MongoGroupScan extends AbstractGroupScan implements
+ this.scanSpec.getCollectionName());
Document projection = new Document();
- projection.put(SHARD, select);
- projection.put(MIN, select);
- projection.put(MAX, select);
+ projection.put(SHARD, SELECT);
+ projection.put(MIN, SELECT);
+ projection.put(MAX, SELECT);
FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection);
MongoCursor<Document> iterator = chunkCursor.iterator();
@@ -200,7 +188,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
MongoCollection<Document> shardsCollection = db.getCollection(SHARDS);
projection = new Document();
- projection.put(HOST, select);
+ projection.put(HOST, SELECT);
boolean hasChunks = false;
while (iterator.hasNext()) {
@@ -292,7 +280,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
//Identify the primary shard of the queried database.
MongoCollection<Document> collection = database.getCollection(DATABASES);
Bson filter = new Document(ID, this.scanSpec.getDbName());
- Bson projection = new Document(PRIMARY, select);
+ Bson projection = new Document(PRIMARY, SELECT);
Document document = Objects.requireNonNull(collection.find(filter).projection(projection).first());
String shardName = document.getString(PRIMARY);
Preconditions.checkNotNull(shardName);
@@ -300,7 +288,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
//Identify the host(s) on which this shard resides.
MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
filter = new Document(ID, shardName);
- projection = new Document(HOST, select);
+ projection = new Document(HOST, SELECT);
Document hostInfo = Objects.requireNonNull(shardsCol.find(filter).projection(projection).first());
String hostEntry = hostInfo.getString(HOST);
Preconditions.checkNotNull(hostEntry);
@@ -361,7 +349,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
public GroupScan clone(int maxRecords) {
MongoGroupScan clone = new MongoGroupScan(this);
- clone.maxRecords = maxRecords;
+ clone.useAggregate = true;
+ clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords));
return clone;
}
@@ -409,7 +398,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
if (slots != null) {
for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
Integer slotIndex = slots.poll();
- List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping
+ List<BaseMongoSubScanSpec> subScanSpecList = endpointFragmentMapping
.get(slotIndex);
subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo));
slots.offer(slotIndex);
@@ -418,11 +407,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
}
- PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<>(
+ PriorityQueue<List<BaseMongoSubScanSpec>> minHeap = new PriorityQueue<>(
numSlots, LIST_SIZE_COMPARATOR);
- PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<>(
+ PriorityQueue<List<BaseMongoSubScanSpec>> maxHeap = new PriorityQueue<>(
numSlots, LIST_SIZE_COMPARATOR_REV);
- for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ for (List<BaseMongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
if (listOfScan.size() < minPerEndpointSlot) {
minHeap.offer(listOfScan);
} else if (listOfScan.size() > minPerEndpointSlot) {
@@ -433,7 +422,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
if (chunksToAssignSet.size() > 0) {
for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) {
for (ChunkInfo chunkInfo : chunkEntry.getValue()) {
- List<MongoSubScanSpec> smallestList = minHeap.poll();
+ List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
smallestList.add(buildSubScanSpecAndGet(chunkInfo));
minHeap.offer(smallestList);
}
@@ -441,8 +430,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
- List<MongoSubScanSpec> smallestList = minHeap.poll();
- List<MongoSubScanSpec> largestList = maxHeap.poll();
+ List<BaseMongoSubScanSpec> smallestList = minHeap.poll();
+ List<BaseMongoSubScanSpec> largestList = maxHeap.poll();
smallestList.add(largestList.remove(largestList.size() - 1));
if (largestList.size() > minPerEndpointSlot) {
maxHeap.offer(largestList);
@@ -458,15 +447,23 @@ public class MongoGroupScan extends AbstractGroupScan implements
endpointFragmentMapping.toString());
}
- private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
- return new MongoSubScanSpec()
+ private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+ if (useAggregate) {
+ return MongoSubScanSpec.builder()
+ .setOperations(scanSpec.getOperations())
+ .setDbName(scanSpec.getDbName())
+ .setCollectionName(scanSpec.getCollectionName())
+ .setHosts(chunkInfo.getChunkLocList())
+ .build();
+ }
+ return ShardedMongoSubScanSpec.builder()
+ .setMinFilters(chunkInfo.getMinFilters())
+ .setMaxFilters(chunkInfo.getMaxFilters())
+ .setFilter(scanSpec.getFilters())
.setDbName(scanSpec.getDbName())
.setCollectionName(scanSpec.getCollectionName())
.setHosts(chunkInfo.getChunkLocList())
- .setMinFilters(chunkInfo.getMinFilters())
- .setMaxFilters(chunkInfo.getMaxFilters())
- .setMaxRecords(maxRecords)
- .setFilter(scanSpec.getFilters());
+ .build();
}
@Override
@@ -487,18 +484,11 @@ public class MongoGroupScan extends AbstractGroupScan implements
@Override
public ScanStats getScanStats() {
- long recordCount;
try{
MongoClient client = storagePlugin.getClient();
MongoDatabase db = client.getDatabase(scanSpec.getDbName());
MongoCollection<Document> collection = db.getCollection(scanSpec.getCollectionName());
- long numDocs = collection.estimatedDocumentCount();
-
- if (maxRecords > 0 && numDocs > 0) {
- recordCount = Math.min(maxRecords, numDocs);
- } else {
- recordCount = numDocs;
- }
+ long recordCount = collection.estimatedDocumentCount();
float approxDiskCost = 0;
if (recordCount != 0) {
@@ -563,9 +553,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
@Override
public GroupScan applyLimit(int maxRecords) {
- if (maxRecords == this.maxRecords) {
- return null;
- }
return clone(maxRecords);
}
@@ -585,26 +572,33 @@ public class MongoGroupScan extends AbstractGroupScan implements
return storagePluginConfig;
}
- @JsonProperty("maxRecords")
- public int getMaxRecords() { return maxRecords; }
-
@JsonIgnore
public MongoStoragePlugin getStoragePlugin() {
return storagePlugin;
}
+ @JsonProperty("useAggregate")
+ public void setUseAggregate(boolean useAggregate) {
+ this.useAggregate = useAggregate;
+ }
+
+ @JsonProperty("useAggregate")
+ public boolean isUseAggregate() {
+ return useAggregate;
+ }
+
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("MongoScanSpec", scanSpec)
.field("columns", columns)
- .field("maxRecords", maxRecords)
+ .field("useAggregate", useAggregate)
.toString();
}
@VisibleForTesting
MongoGroupScan() {
- super((String)null);
+ super((String) null);
}
@JsonIgnore
@@ -621,7 +615,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonIgnore
@VisibleForTesting
- void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
+ void setInverseChunksMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) {
this.chunksInverseMapping = chunksInverseMapping;
}
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
deleted file mode 100644
index 5e57890..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo;
-
-import java.io.IOException;
-import java.util.Collections;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rex.RexNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
- private static final Logger logger = LoggerFactory
- .getLogger(MongoPushDownFilterForScan.class);
- public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownFilterForScan();
-
- private MongoPushDownFilterForScan() {
- super(
- RelOptHelper.some(Filter.class, RelOptHelper.any(DrillScanRelBase.class)),
- "MongoPushDownFilterForScan");
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final DrillScanRelBase scan = call.rel(1);
- final Filter filter = call.rel(0);
- final RexNode condition = filter.getCondition();
-
- MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan();
- if (groupScan.isFilterPushedDown()) {
- return;
- }
-
- LogicalExpression conditionExp = DrillOptiq.toDrill(
- new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
- MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan,
- conditionExp);
- MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree();
- if (newScanSpec == null) {
- return; // no filter pushdown so nothing to apply.
- }
-
- MongoGroupScan newGroupsScan;
- try {
- newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- throw new DrillRuntimeException(e.getMessage(), e);
- }
- newGroupsScan.setFilterPushedDown(true);
-
- RelNode newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan);
-
- if (mongoFilterBuilder.isAllExpressionsConverted()) {
- /*
- * Since we could convert the entire filter condition expression into an
- * Mongo filter, we can eliminate the filter operator altogether.
- */
- call.transformTo(newScanPrel);
- } else {
- call.transformTo(filter.copy(filter.getTraitSet(),
- Collections.singletonList(newScanPrel)));
- }
-
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- DrillScanRelBase scan = call.rel(1);
- return scan.getGroupScan() instanceof MongoGroupScan && super.matches(call);
- }
-}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index b06fe36..a26a98a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.mongo;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -25,6 +26,9 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.mongodb.client.MongoIterable;
+import com.mongodb.client.model.Aggregates;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -40,6 +44,7 @@ import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.Document;
+import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +69,7 @@ public class MongoRecordReader extends AbstractRecordReader {
private VectorContainerWriter writer;
private Document filters;
+ private List<Bson> operations;
private final Document fields;
private final FragmentContext fragmentContext;
@@ -75,9 +81,8 @@ public class MongoRecordReader extends AbstractRecordReader {
private final boolean readNumbersAsDouble;
private boolean unionEnabled;
private final boolean isBsonRecordReader;
- private final int maxRecords;
- public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+ public MongoRecordReader(BaseMongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, MongoStoragePlugin plugin) {
fields = new Document();
@@ -87,15 +92,19 @@ public class MongoRecordReader extends AbstractRecordReader {
fragmentContext = context;
this.plugin = plugin;
filters = new Document();
- Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
- subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+ if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
+ operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+ } else {
+ MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
+ Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+ shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
- buildFilters(subScanSpec.getFilter(), mergedFilters);
+ buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+ }
enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
- maxRecords = subScanSpec.getMaxRecords();
logger.debug("BsonRecordReader is enabled? " + isBsonRecordReader);
init(subScanSpec);
}
@@ -138,7 +147,7 @@ public class MongoRecordReader extends AbstractRecordReader {
}
}
- private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
+ private void init(BaseMongoSubScanSpec subScanSpec) {
List<String> hosts = subScanSpec.getHosts();
List<ServerAddress> addresses = Lists.newArrayList();
for (String host : hosts) {
@@ -176,13 +185,15 @@ public class MongoRecordReader extends AbstractRecordReader {
logger.debug("Filters Applied : " + filters);
logger.debug("Fields Selected :" + fields);
- // Add limit to Mongo query
- if (maxRecords > 0) {
- logger.debug("Limit applied: {}", maxRecords);
- cursor = collection.find(filters).projection(fields).limit(maxRecords).batchSize(100).iterator();
+ MongoIterable<BsonDocument> projection;
+ if (CollectionUtils.isNotEmpty(operations)) {
+ List<Bson> operations = new ArrayList<>(this.operations);
+ operations.add(Aggregates.project(fields));
+ projection = collection.aggregate(operations);
} else {
- cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+ projection = collection.find(filters).projection(fields);
}
+ cursor = projection.batchSize(plugin.getConfig().getBatchSize()).iterator();
}
writer.allocate();
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index f97e771..350cb28 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -42,22 +42,20 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = new LinkedList<>();
- List<SchemaPath> columns = null;
- for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
- .getChunkScanSpecList()) {
+ for (BaseMongoSubScanSpec scanSpec : subScan.getChunkScanSpecList()) {
try {
- if ((columns = subScan.getColumns()) == null) {
+ List<SchemaPath> columns = subScan.getColumns();
+ if (columns == null) {
columns = GroupScan.ALL_COLUMNS;
}
readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin()));
} catch (Exception e) {
- logger.error("MongoRecordReader creation failed for subScan: "
- + subScan + ".");
+ logger.error("MongoRecordReader creation failed for subScan: {}.", subScan);
logger.error(e.getMessage(), e);
throw new ExecutionSetupException(e);
}
}
- logger.info("Number of record readers initialized : " + readers.size());
+ logger.info("Number of record readers initialized : {}", readers.size());
return new ScanBatch(subScan, context, readers);
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 5c56fcc..7459bfb 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -17,47 +17,33 @@
*/
package org.apache.drill.exec.store.mongo;
-import org.bson.Document;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+import org.bson.Document;
+
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.List;
+@AllArgsConstructor
+@Getter
+@ToString
public class MongoScanSpec {
- private String dbName;
- private String collectionName;
+ private final String dbName;
+ private final String collectionName;
private Document filters;
+ private List<Bson> operations = new ArrayList<>();
+
@JsonCreator
public MongoScanSpec(@JsonProperty("dbName") String dbName,
@JsonProperty("collectionName") String collectionName) {
this.dbName = dbName;
this.collectionName = collectionName;
}
-
- public MongoScanSpec(String dbName, String collectionName,
- Document filters) {
- this.dbName = dbName;
- this.collectionName = collectionName;
- this.filters = filters;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public String getCollectionName() {
- return collectionName;
- }
-
- public Document getFilters() {
- return filters;
- }
-
- @Override
- public String toString() {
- return "MongoScanSpec [dbName=" + dbName + ", collectionName="
- + collectionName + ", filters=" + filters + "]";
- }
-
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index da55907..0bf1e1f 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -25,18 +25,23 @@ import com.mongodb.client.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClients;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.mongo.plan.MongoPluginImplementor;
import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
import org.apache.drill.exec.store.security.HadoopCredentialsProvider;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -45,13 +50,13 @@ import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalNotification;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URLEncoder;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -64,11 +69,12 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
private final MongoSchemaFactory schemaFactory;
private final Cache<MongoCnxnKey, MongoClient> addressClientMap;
private final ConnectionString clientURI;
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
public MongoStoragePlugin(
MongoStoragePluginConfig mongoConfig,
DrillbitContext context,
- String name) throws ExecutionSetupException {
+ String name) {
super(context, name);
this.mongoConfig = mongoConfig;
String connection = addCredentialsFromCredentialsProvider(this.mongoConfig.getConnection(), name);
@@ -78,6 +84,21 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
.removalListener(new AddressCloser())
.build();
this.schemaFactory = new MongoSchemaFactory(this, name);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name, mongoConfig);
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name, MongoStoragePluginConfig mongoConfig) {
+ Convention convention = new Convention.Impl("MONGO." + name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention, MongoPluginImplementor::new))
+ .supportsProjectPushdown(mongoConfig.getPluginOptimizations().isSupportsProjectPushdown())
+ .supportsSortPushdown(mongoConfig.getPluginOptimizations().isSupportsSortPushdown())
+ .supportsAggregatePushdown(mongoConfig.getPluginOptimizations().isSupportsAggregatePushdown())
+ .supportsFilterPushdown(mongoConfig.getPluginOptimizations().isSupportsFilterPushdown())
+ .supportsLimitPushdown(mongoConfig.getPluginOptimizations().isSupportsLimitPushdown())
+ .supportsUnionPushdown(mongoConfig.getPluginOptimizations().isSupportsUnionPushdown())
+ .convention(convention)
+ .build();
}
private String addCredentialsFromCredentialsProvider(String connection, String name) {
@@ -120,7 +141,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
schemaFactory.registerSchemas(schemaConfig, parent);
}
@@ -133,14 +154,27 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {
});
- return new MongoGroupScan(userName, this, mongoScanSpec, null, -1);
+ return new MongoGroupScan(userName, this, mongoScanSpec, null, false);
}
@Override
- public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE);
+ public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
}
+ public Convention convention() {
+ return storagePluginRulesSupplier.convention();
+ }
private static class AddressCloser implements
RemovalListener<MongoCnxnKey, MongoClient> {
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index 2ceb0d8..be410ae 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -24,11 +24,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.mongodb.ConnectionString;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
import org.apache.drill.common.logical.security.CredentialsProvider;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
@JsonTypeName(MongoStoragePluginConfig.NAME)
+@EqualsAndHashCode(of = "connection", callSuper = false)
public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig {
public static final String NAME = "mongo";
@@ -38,29 +43,24 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
@JsonIgnore
private final ConnectionString clientURI;
+ private final MongoPluginOptimizations pluginOptimizations;
+
+ private final int batchSize;
+
@JsonCreator
public MongoStoragePluginConfig(@JsonProperty("connection") String connection,
- @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+ @JsonProperty("pluginOptimizations") MongoPluginOptimizations pluginOptimizations,
+ @JsonProperty("batchSize") Integer batchSize,
+ @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
super(getCredentialsProvider(credentialsProvider), credentialsProvider == null);
this.connection = connection;
this.clientURI = new ConnectionString(connection);
+ this.pluginOptimizations = ObjectUtils.defaultIfNull(pluginOptimizations, new MongoPluginOptimizations());
+ this.batchSize = batchSize != null ? batchSize : 100;
}
- @Override
- public boolean equals(Object that) {
- if (this == that) {
- return true;
- } else if (that == null || getClass() != that.getClass()) {
- return false;
- }
- MongoStoragePluginConfig thatConfig = (MongoStoragePluginConfig) that;
- return this.connection.equals(thatConfig.connection);
-
- }
-
- @Override
- public int hashCode() {
- return this.connection != null ? this.connection.hashCode() : 0;
+ public MongoPluginOptimizations getPluginOptimizations() {
+ return pluginOptimizations;
}
@JsonIgnore
@@ -72,7 +72,28 @@ public class MongoStoragePluginConfig extends AbstractSecuredStoragePluginConfig
return connection;
}
+ public int getBatchSize() {
+ return batchSize;
+ }
+
private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
}
+
+ @Getter
+ @Setter
+ public static class MongoPluginOptimizations {
+
+ private boolean supportsProjectPushdown = true;
+
+ private boolean supportsFilterPushdown = true;
+
+ private boolean supportsAggregatePushdown = true;
+
+ private boolean supportsSortPushdown = true;
+
+ private boolean supportsUnionPushdown = true;
+
+ private boolean supportsLimitPushdown = true;
+ }
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index af13eb5..975dc85 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -23,7 +23,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.drill.common.PlanStringBuilder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -32,7 +35,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.bson.Document;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -40,6 +42,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.bson.Document;
+import org.bson.conversions.Bson;
@JsonTypeName("mongo-shard-read")
public class MongoSubScan extends AbstractBase implements SubScan {
@@ -52,14 +56,14 @@ public class MongoSubScan extends AbstractBase implements SubScan {
private final MongoStoragePlugin mongoStoragePlugin;
private final List<SchemaPath> columns;
- private final List<MongoSubScanSpec> chunkScanSpecList;
+ private final List<BaseMongoSubScanSpec> chunkScanSpecList;
@JsonCreator
public MongoSubScan(
@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
- @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
+ @JsonProperty("chunkScanSpecList") LinkedList<BaseMongoSubScanSpec> chunkScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns)
throws ExecutionSetupException {
super(userName);
@@ -72,7 +76,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
MongoStoragePluginConfig storagePluginConfig,
- List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+ List<BaseMongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
super(userName);
this.mongoStoragePlugin = storagePlugin;
this.mongoPluginConfig = storagePluginConfig;
@@ -100,13 +104,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return columns;
}
- public List<MongoSubScanSpec> getChunkScanSpecList() {
+ public List<BaseMongoSubScanSpec> getChunkScanSpecList() {
return chunkScanSpecList;
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
- throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
chunkScanSpecList, columns);
@@ -122,112 +125,33 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return Collections.emptyIterator();
}
- public static class MongoSubScanSpec {
-
- protected String dbName;
- protected String collectionName;
- protected List<String> hosts;
- protected Map<String, Object> minFilters;
- protected Map<String, Object> maxFilters;
- protected int maxRecords;
-
- protected Document filter;
-
- @JsonCreator
- public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
- @JsonProperty("collectionName") String collectionName,
- @JsonProperty("hosts") List<String> hosts,
- @JsonProperty("minFilters") Map<String, Object> minFilters,
- @JsonProperty("maxFilters") Map<String, Object> maxFilters,
- @JsonProperty("filters") Document filters,
- @JsonProperty("maxRecords") int maxRecords) {
- this.dbName = dbName;
- this.collectionName = collectionName;
- this.hosts = hosts;
- this.minFilters = minFilters;
- this.maxFilters = maxFilters;
- this.filter = filters;
- this.maxRecords = maxRecords;
- }
-
- MongoSubScanSpec() {
-
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public MongoSubScanSpec setDbName(String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public String getCollectionName() {
- return collectionName;
- }
-
- public MongoSubScanSpec setCollectionName(String collectionName) {
- this.collectionName = collectionName;
- return this;
- }
-
- public List<String> getHosts() {
- return hosts;
- }
-
- public MongoSubScanSpec setHosts(List<String> hosts) {
- this.hosts = hosts;
- return this;
- }
-
- public int getMaxRecords() { return maxRecords; }
-
- public MongoSubScanSpec setMaxRecords (int maxRecords) {
- this.maxRecords = maxRecords;
- return this;
- }
-
- public Map<String, Object> getMinFilters() {
- return minFilters;
- }
-
- public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
- this.minFilters = minFilters;
- return this;
- }
-
- public Map<String, Object> getMaxFilters() {
- return maxFilters;
- }
-
- public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
- this.maxFilters = maxFilters;
- return this;
- }
-
- public Document getFilter() {
- return filter;
- }
-
- public MongoSubScanSpec setFilter(Document filter) {
- this.filter = filter;
- return this;
- }
-
- @Override
- public String toString() {
- return new PlanStringBuilder(this)
- .field("dbName", dbName)
- .field("collectionName", collectionName)
- .field("hosts", hosts)
- .field("minFilters", minFilters)
- .field("maxFilters", maxFilters)
- .field("filter", filter)
- .field("maxRecords", maxRecords)
- .toString();
-
- }
+ /**
+ * Sub-scan spec variant that carries {@code minFilters}, {@code maxFilters} and a
+ * {@code filter} document on top of whatever {@link BaseMongoSubScanSpec} declares.
+ * Accessors, {@code toString} and the builder are generated by Lombok; builder methods
+ * use the {@code set} prefix and Jackson deserializes instances through that builder.
+ * NOTE(review): min/max filters presumably describe the chunk bounds of a shard - confirm
+ * against the group scan that builds these specs.
+ */
+ @JsonTypeName("ShardedMongoSubScanSpec")
+ @Getter
+ @ToString
+ @SuperBuilder(setterPrefix = "set")
+ @JsonDeserialize(builder = ShardedMongoSubScanSpec.ShardedMongoSubScanSpecBuilder.class)
+ public static class ShardedMongoSubScanSpec extends BaseMongoSubScanSpec {
+
+ // serialized as JSON property "minFilters"
+ @JsonProperty
+ private final Map<String, Object> minFilters;
+
+ // serialized as JSON property "maxFilters"
+ @JsonProperty
+ private final Map<String, Object> maxFilters;
+
+ // serialized as JSON property "filter"
+ @JsonProperty
+ private final Document filter;
+
+ }
+
+ /**
+ * Sub-scan spec variant that carries a list of {@link Bson} operations on top of
+ * whatever {@link BaseMongoSubScanSpec} declares.
+ * Accessors, {@code toString} and the builder are generated by Lombok; builder methods
+ * use the {@code set} prefix and Jackson deserializes instances through that builder.
+ * NOTE(review): the operations are presumably the aggregation pipeline stages assembled
+ * during planning - confirm against the code that builds this spec.
+ */
+ @JsonTypeName("MongoSubScanSpec")
+ @Getter
+ @ToString
+ @SuperBuilder(setterPrefix = "set")
+ @JsonDeserialize(builder = MongoSubScanSpec.MongoSubScanSpecBuilder.class)
+ public static class MongoSubScanSpec extends BaseMongoSubScanSpec {
+
+ // serialized as JSON property "operations"
+ @JsonProperty
+ private final List<Bson> operations;
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
similarity index 67%
copy from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
copy to contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
index ef89bfb..55f8cc5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoOp.java
@@ -17,14 +17,30 @@
*/
package org.apache.drill.exec.store.mongo.common;
-public enum MongoCompareOp {
- EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
- "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
- "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
- "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
- private String compareOp;
+public enum MongoOp {
+ EQUAL("$eq"),
+ NOT_EQUAL("$ne"),
+ GREATER_OR_EQUAL("$gte"),
+ GREATER("$gt"),
+ LESS_OR_EQUAL("$lte"),
+ LESS("$lt"),
+ IN("$in"),
+ AND("$and"),
+ OR("$or"),
+ NOT("$not"),
+ REGEX("$regex"),
+ OPTIONS("$options"),
+ PROJECT("$project"),
+ COND("$cond"),
+ IFNULL("$ifNull"),
+ IFNOTNULL("$ifNotNull"),
+ SUM("$sum"),
+ GROUP_BY("$group"),
+ EXISTS("$exists");
- MongoCompareOp(String compareOp) {
+ private final String compareOp;
+
+ MongoOp(String compareOp) {
this.compareOp = compareOp;
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
new file mode 100644
index 0000000..9176fe2
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
+import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
+import org.apache.drill.exec.store.mongo.MongoGroupScan;
+import org.apache.drill.exec.store.mongo.MongoScanSpec;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.exec.util.Utilities;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link PluginImplementor} for Mongo.
+ * This class tries to convert operators to use {@link com.mongodb.client.MongoCollection#find}
+ * if only simple project and filter expressions are present,
+ * otherwise {@link com.mongodb.client.MongoCollection#aggregate} is used to obtain data from Mongo.
+ */
+public class MongoPluginImplementor extends AbstractPluginImplementor {
+
+ private MongoGroupScan groupScan;
+
+ private List<Bson> operations;
+
+ private Document filters;
+
+ private List<SchemaPath> columns;
+
+ private boolean runAggregate;
+
+  /**
+   * Appends the Mongo stages for an aggregate to the pipeline.
+   * The input is visited first, so the stages of the input precede the stages of the aggregate.
+   * Afterwards the scan columns are replaced by the output fields of the aggregate.
+   *
+   * @param aggregate aggregate to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    // raised before descending, so filters and projects below are emitted as pipeline stages
+    runAggregate = true;
+
+    RelNode input = aggregate.getInput();
+    visitChild(input);
+
+    operations.addAll(MongoAggregateUtils.getAggregateOperations(aggregate, input.getRowType()));
+
+    List<SchemaPath> outputColumns = new ArrayList<>();
+    for (String fieldName : MongoAggregateUtils.mongoFieldNames(aggregate.getRowType())) {
+      outputColumns.add(SchemaPath.getSimplePath(fieldName));
+    }
+    columns = outputColumns;
+  }
+
+  /**
+   * Converts the filter condition to a Mongo filter document.
+   * When the aggregate pipeline is in use the document is appended as a {@code $match} stage,
+   * otherwise it becomes the filter of the scan spec.
+   *
+   * @param filter filter to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    RelNode input = filter.getInput();
+    visitChild(input);
+
+    DrillParseContext parseContext =
+        new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner()));
+    LogicalExpression condition = DrillOptiq.toDrill(parseContext, input, filter.getCondition());
+    Document mongoFilter = new MongoFilterBuilder(condition).parseTree();
+
+    if (!runAggregate) {
+      // NOTE(review): this replaces any filter taken over from the scan spec - confirm intended
+      filters = mongoFilter;
+      return;
+    }
+    Bson matchStage = Aggregates.match(mongoFilter).toBsonDocument();
+    operations.add(matchStage);
+  }
+
+  /**
+   * Appends {@code $skip} and {@code $limit} stages for the offset and fetch of the limit.
+   * Either stage is omitted when the corresponding value is absent.
+   *
+   * @param limit limit to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    // skip/limit exist only as pipeline stages here, so the aggregate path is forced
+    runAggregate = true;
+    visitChild(limit.getInput());
+
+    RexNode offset = limit.getOffset();
+    if (offset != null) {
+      int skipped = rexLiteralIntValue((RexLiteral) offset);
+      operations.add(Aggregates.skip(skipped).toBsonDocument());
+    }
+
+    RexNode fetch = limit.getFetch();
+    if (fetch != null) {
+      int fetched = rexLiteralIntValue((RexLiteral) fetch);
+      operations.add(Aggregates.limit(fetched).toBsonDocument());
+    }
+  }
+
+  /**
+   * Converts a project.
+   * If the aggregate pipeline is in use (already requested, requested by the input, or required
+   * because some projected expression is not a plain input reference), a {@code $project} stage
+   * is appended. In every case the scan columns become the output fields of the project.
+   *
+   * @param project project to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    if (!runAggregate) {
+      // anything other than a plain column reference has to be computed by the pipeline
+      runAggregate = project.getProjects().stream()
+          .anyMatch(expression -> !expression.isA(SqlKind.INPUT_REF));
+    }
+
+    visitChild(project.getInput());
+
+    // re-read after the visit: the input may have switched the aggregate path on
+    if (runAggregate) {
+      RexToMongoTranslator translator = new RexToMongoTranslator(
+          (JavaTypeFactory) project.getCluster().getTypeFactory(),
+          MongoAggregateUtils.mongoFieldNames(project.getInput().getRowType()));
+
+      List<BsonElement> projectedFields = new ArrayList<>();
+      for (Pair<RexNode, String> namedProject : project.getNamedProjects()) {
+        String outputName = namedProject.right;
+        BsonValue translated = namedProject.left.accept(translator);
+        // a field projected under its own name is written as a plain inclusion (1)
+        boolean sameField = translated.equals(new BsonString("$" + outputName));
+        BsonValue projection = sameField ? new BsonInt32(1) : translated;
+        projectedFields.add(
+            new BsonElement(MongoAggregateUtils.maybeQuote(outputName), projection));
+      }
+      BsonDocument projectStage =
+          Aggregates.project(new BsonDocument(projectedFields)).toBsonDocument();
+      operations.add(projectStage);
+    }
+
+    columns = MongoAggregateUtils.mongoFieldNames(project.getRowType()).stream()
+        .map(SchemaPath::getSimplePath)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Appends a {@code $sort} stage for the collation of the sort (when it has any field),
+   * followed by {@code $skip} and {@code $limit} stages for its offset and fetch (when present).
+   *
+   * @param sort sort to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    runAggregate = true;
+    visitChild(sort.getInput());
+
+    List<RelFieldCollation> collations = sort.collation.getFieldCollations();
+    if (!collations.isEmpty()) {
+      List<RelDataTypeField> rowFields = sort.getRowType().getFieldList();
+      BsonDocument sortSpec = new BsonDocument();
+      for (RelFieldCollation collation : collations) {
+        String fieldName = rowFields.get(collation.getFieldIndex()).getName();
+        // 1 sorts ascending, -1 descending
+        sortSpec.append(fieldName, new BsonInt32(direction(collation)));
+      }
+      operations.add(Aggregates.sort(sortSpec).toBsonDocument());
+    }
+
+    RexNode offset = sort.offset;
+    if (offset != null) {
+      operations.add(Aggregates.skip(rexLiteralIntValue((RexLiteral) offset)).toBsonDocument());
+    }
+
+    RexNode fetch = sort.fetch;
+    if (fetch != null) {
+      operations.add(Aggregates.limit(rexLiteralIntValue((RexLiteral) fetch)).toBsonDocument());
+    }
+  }
+
+  /**
+   * Returns the value of a numeric literal (an offset or a fetch) as {@code int}.
+   *
+   * @param offset literal whose value is a {@link BigDecimal}
+   * @return literal value as int
+   * @throws ArithmeticException if the value has a fractional part or does not fit into an int;
+   *         a silently wrapped value would otherwise end up in the {@code $skip}/{@code $limit} stage
+   */
+  private int rexLiteralIntValue(RexLiteral offset) {
+    BigDecimal value = (BigDecimal) offset.getValue();
+    // intValue() keeps only the low-order 32 bits; fail loudly instead of pushing down a wrong bound
+    return value.intValueExact();
+  }
+
+  /**
+   * Converts a union: the first input is implemented by this implementor, every further input
+   * is implemented by a helper implementor and attached as a {@code $unionWith} stage that
+   * names the collection of that input and carries its pipeline.
+   *
+   * @param union union to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    runAggregate = true;
+
+    // one helper serves all remaining inputs; its scan state is re-initialized
+    // each time a visit reaches implement(StoragePluginTableScan)
+    MongoPluginImplementor branchImplementor = new MongoPluginImplementor();
+    branchImplementor.runAggregate = true;
+
+    List<RelNode> inputs = union.getInputs();
+    for (int i = 0; i < inputs.size(); i++) {
+      RelNode input = inputs.get(i);
+      if (i == 0) {
+        visitChild(input);
+        continue;
+      }
+      branchImplementor.visitChild(input);
+      String collectionName = branchImplementor.groupScan.getScanSpec().getCollectionName();
+      operations.add(
+          Aggregates.unionWith(collectionName, branchImplementor.operations).toBsonDocument());
+    }
+  }
+
+  /**
+   * Initializes the implementor state from the group scan of the scanned table:
+   * a mutable copy of the operations of its scan spec, its filters and its columns.
+   *
+   * @param scan table scan to convert
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public void implement(StoragePluginTableScan scan) throws IOException {
+    groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+
+    MongoScanSpec scanSpec = groupScan.getScanSpec();
+    // copied, so stages appended later do not modify the list held by the spec
+    operations = new ArrayList<>(scanSpec.getOperations());
+    filters = scanSpec.getFilters();
+    columns = groupScan.getColumns();
+  }
+
+  /**
+   * An aggregate can be pushed down when it uses simple grouping, has no distinct
+   * aggregate call, and every aggregate call is supported by {@link MongoAggregateUtils}.
+   *
+   * @param aggregate aggregate to check
+   * @return true if the aggregate can be converted
+   */
+  @Override
+  public boolean canImplement(Aggregate aggregate) {
+    if (aggregate.getGroupType() != Aggregate.Group.SIMPLE) {
+      return false;
+    }
+    List<AggregateCall> calls = aggregate.getAggCallList();
+    for (AggregateCall call : calls) {
+      if (call.isDistinct()) {
+        return false;
+      }
+    }
+    for (AggregateCall call : calls) {
+      if (!MongoAggregateUtils.supportsAggregation(call)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * A filter can be pushed down when {@link MongoFilterBuilder} reports that
+   * all expressions of its condition were converted.
+   *
+   * @param filter filter to check
+   * @return true if the whole condition can be converted
+   */
+  @Override
+  public boolean canImplement(Filter filter) {
+    DrillParseContext parseContext =
+        new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner()));
+    LogicalExpression condition =
+        DrillOptiq.toDrill(parseContext, filter.getInput(), filter.getCondition());
+
+    MongoFilterBuilder builder = new MongoFilterBuilder(condition);
+    // the resulting document is discarded; parsing runs before the builder is asked
+    // whether everything was converted
+    builder.parseTree();
+    return builder.isAllExpressionsConverted();
+  }
+
+ /**
+ * Accepts every limit for pushdown; no property of the limit is inspected.
+ * NOTE(review): implement(PluginLimitRel) casts offset and fetch to RexLiteral, so they are
+ * presumably always literals at this point - confirm.
+ */
+ @Override
+ public boolean canImplement(DrillLimitRelBase limit) {
+ return true;
+ }
+
+  /**
+   * A project can be pushed down when every projected expression
+   * is supported by {@link RexToMongoTranslator}.
+   *
+   * @param project project to check
+   * @return true if all expressions can be translated
+   */
+  @Override
+  public boolean canImplement(Project project) {
+    for (RexNode expression : project.getProjects()) {
+      if (!RexToMongoTranslator.supportsExpression(expression)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+ /**
+ * Accepts every sort for pushdown; no property of the sort is inspected.
+ * NOTE(review): implement(PluginSortRel) casts offset and fetch to RexLiteral, so they are
+ * presumably always literals at this point - confirm.
+ */
+ @Override
+ public boolean canImplement(Sort sort) {
+ return true;
+ }
+
+ /**
+ * Accepts a union for pushdown only when it is a UNION ALL ({@code union.all} is set).
+ */
+ @Override
+ public boolean canImplement(Union union) {
+ // allow converting for union all only, since Drill adds extra aggregation for union distinct,
+ // so we will convert both union all and aggregation later
+ return union.all;
+ }
+
+ /**
+ * Accepts every table scan; no property of the scan is inspected.
+ */
+ @Override
+ public boolean canImplement(TableScan scan) {
+ return true;
+ }
+
+  /**
+   * Builds the group scan that carries everything collected while visiting the plan:
+   * a new scan spec (same database and collection, collected filters and operations),
+   * the resulting columns and the flag telling whether the aggregate pipeline is used.
+   *
+   * @return group scan for the converted plan
+   * @throws IOException declared by the implementor contract
+   */
+  @Override
+  public GroupScan getPhysicalOperator() throws IOException {
+    MongoScanSpec sourceSpec = groupScan.getScanSpec();
+    MongoScanSpec pushedDownSpec = new MongoScanSpec(
+        sourceSpec.getDbName(),
+        sourceSpec.getCollectionName(),
+        filters,
+        operations);
+    return new MongoGroupScan(
+        groupScan.getUserName(),
+        groupScan.getStoragePlugin(),
+        pushedDownSpec,
+        columns,
+        runAggregate);
+  }
+
+  /**
+   * Maps the direction of a field collation to the number used as a sort key value.
+   *
+   * @param fieldCollation collation of a single sort field
+   * @return -1 for the descending directions, 1 for every other direction
+   */
+  public static int direction(RelFieldCollation fieldCollation) {
+    int sortOrder = 1;
+    switch (fieldCollation.getDirection()) {
+      case DESCENDING:
+      case STRICTLY_DESCENDING:
+        sortOrder = -1;
+        break;
+      default:
+        // ascending directions and anything else keep the ascending order
+        break;
+    }
+    return sortOrder;
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
new file mode 100644
index 0000000..cc97c82
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/RexToMongoTranslator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Translator from {@link RexNode} to {@link BsonValue} expressions in MongoDB's aggregation expression language.
+ */
+class RexToMongoTranslator extends RexVisitorImpl<BsonValue> {
+
+ private final JavaTypeFactory typeFactory;
+
+ private final List<String> inFields;
+
+  /**
+   * SQL operators that are translated by name: the call becomes a document whose key is the
+   * mapped Mongo operator and whose value is the array of translated operands.
+   * NOTE(review): several Mongo operators listed here are documented with a document argument
+   * rather than an array ({@code $trim}, {@code $replaceAll}) or only for window stages
+   * ({@code $denseRank}); confirm that the array form produced for them is accepted.
+   */
+  private static final Map<SqlOperator, String> MONGO_OPERATORS = ImmutableMap.<SqlOperator, String>builder()
+      // arithmetic and math
+      .put(SqlStdOperatorTable.PLUS, "$add")
+      .put(SqlStdOperatorTable.MINUS, "$subtract")
+      .put(SqlStdOperatorTable.MULTIPLY, "$multiply")
+      .put(SqlStdOperatorTable.DIVIDE, "$divide")
+      .put(SqlStdOperatorTable.MOD, "$mod")
+      .put(SqlStdOperatorTable.ABS, "$abs")
+      .put(SqlStdOperatorTable.CEIL, "$ceil")
+      .put(SqlStdOperatorTable.FLOOR, "$floor")
+      .put(SqlStdOperatorTable.ROUND, "$round")
+      .put(SqlStdOperatorTable.TRUNCATE, "$trunc")
+      .put(SqlStdOperatorTable.EXP, "$exp")
+      .put(SqlStdOperatorTable.LN, "$ln")
+      .put(SqlStdOperatorTable.LOG10, "$log10")
+      .put(SqlStdOperatorTable.POWER, "$pow")
+      .put(SqlStdOperatorTable.SQRT, "$sqrt")
+      .put(SqlStdOperatorTable.RAND, "$rand")
+      // trigonometry
+      .put(SqlStdOperatorTable.ACOS, "$acos")
+      .put(SqlStdOperatorTable.ASIN, "$asin")
+      .put(SqlStdOperatorTable.ATAN, "$atan")
+      .put(SqlStdOperatorTable.ATAN2, "$atan2")
+      .put(SqlStdOperatorTable.COS, "$cos")
+      .put(SqlStdOperatorTable.SIN, "$sin")
+      .put(SqlStdOperatorTable.TAN, "$tan")
+      .put(SqlStdOperatorTable.RADIANS, "$degreesToRadians")
+      .put(SqlStdOperatorTable.DEGREES, "$radiansToDegrees")
+      // date parts
+      // YEAR is the calendar year; $isoWeekYear differs from it around year boundaries
+      .put(SqlStdOperatorTable.YEAR, "$year")
+      .put(SqlStdOperatorTable.MONTH, "$month")
+      .put(SqlStdOperatorTable.WEEK, "$isoWeek")
+      .put(SqlStdOperatorTable.DAYOFYEAR, "$dayOfYear")
+      .put(SqlStdOperatorTable.DAYOFMONTH, "$dayOfMonth")
+      // DAYOFWEEK counts from Sunday = 1, which is $dayOfWeek; $isoDayOfWeek counts from Monday = 1
+      .put(SqlStdOperatorTable.DAYOFWEEK, "$dayOfWeek")
+      .put(SqlStdOperatorTable.HOUR, "$hour")
+      .put(SqlStdOperatorTable.MINUTE, "$minute")
+      .put(SqlStdOperatorTable.SECOND, "$second")
+      // strings
+      .put(SqlStdOperatorTable.CONCAT, "$concat")
+      .put(SqlStdOperatorTable.REPLACE, "$replaceAll")
+      .put(SqlStdOperatorTable.SUBSTRING, "$substr")
+      // operator names need the "$" prefix, a bare "trim" would be read as a field name
+      .put(SqlStdOperatorTable.TRIM, "$trim")
+      // ranking
+      .put(SqlStdOperatorTable.DENSE_RANK, "$denseRank")
+      // boolean logic and comparison
+      .put(SqlStdOperatorTable.AND, MongoOp.AND.getCompareOp())
+      .put(SqlStdOperatorTable.OR, MongoOp.OR.getCompareOp())
+      .put(SqlStdOperatorTable.NOT, MongoOp.NOT.getCompareOp())
+      .put(SqlStdOperatorTable.EQUALS, MongoOp.EQUAL.getCompareOp())
+      .put(SqlStdOperatorTable.NOT_EQUALS, MongoOp.NOT_EQUAL.getCompareOp())
+      .put(SqlStdOperatorTable.GREATER_THAN, MongoOp.GREATER.getCompareOp())
+      .put(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, MongoOp.GREATER_OR_EQUAL.getCompareOp())
+      .put(SqlStdOperatorTable.LESS_THAN, MongoOp.LESS.getCompareOp())
+      .put(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, MongoOp.LESS_OR_EQUAL.getCompareOp())
+      .build();
+
+
+ /**
+ * Creates a translator.
+ *
+ * @param typeFactory type factory passed on when literals are translated
+ * @param inFields names of the input fields; an input reference is resolved
+ * to the name stored at its index
+ */
+ protected RexToMongoTranslator(JavaTypeFactory typeFactory,
+ List<String> inFields) {
+ // true is the "deep" flag of RexVisitorImpl
+ super(true);
+ this.typeFactory = typeFactory;
+ this.inFields = inFields;
+ }
+
+  /**
+   * Translates a literal: a null value becomes BSON null, any other value is rendered
+   * by {@link RexToLixTranslator} and wrapped into a {@code $literal} document.
+   *
+   * @param literal literal to translate
+   * @return BSON null or the parsed {@code $literal} document
+   */
+  @Override
+  public BsonValue visitLiteral(RexLiteral literal) {
+    if (literal.getValue() != null) {
+      Object rendered = RexToLixTranslator.translateLiteral(
+          literal, literal.getType(), typeFactory, RexImpTable.NullAs.NOT_POSSIBLE);
+      String json = String.format("{$literal: %s}", rendered);
+      return BsonDocument.parse(json);
+    }
+    return BsonNull.VALUE;
+  }
+
+  /**
+   * Translates an input reference to a field path: {@code $} followed by
+   * the name of the input field at the referenced index.
+   *
+   * @param inputRef input reference to translate
+   * @return field path as BSON string
+   */
+  @Override
+  public BsonValue visitInputRef(RexInputRef inputRef) {
+    String fieldName = inFields.get(inputRef.getIndex());
+    return new BsonString("$" + fieldName);
+  }
+
+  /**
+   * Translates a call. In order of precedence:
+   * an item access recognized by {@link #isItem}, a cast (the operand is returned unchanged),
+   * an operator listed in {@code MONGO_OPERATORS}, an item access with an integer or
+   * character literal key, and a CASE expression.
+   *
+   * @param call call to translate
+   * @return translated expression
+   * @throws IllegalArgumentException if none of the cases above applies
+   */
+  @Override
+  public BsonValue visitCall(RexCall call) {
+    String itemName = isItem(call);
+    if (itemName != null) {
+      // NOTE(review): the value keeps literal single quotes around the field path,
+      // unlike visitInputRef - confirm this is intended
+      return new BsonString("'$" + itemName + "'");
+    }
+
+    // all operands are translated up front, before the operator is looked at
+    List<BsonValue> operands = call.operands.stream()
+        .map(operand -> operand.accept(this))
+        .collect(Collectors.toList());
+
+    if (call.getKind() == SqlKind.CAST) {
+      return operands.get(0);
+    }
+
+    SqlOperator operator = call.getOperator();
+    String mongoOperator = MONGO_OPERATORS.get(operator);
+    if (mongoOperator != null) {
+      return new BsonDocument(mongoOperator, new BsonArray(operands));
+    }
+
+    if (operator == SqlStdOperatorTable.ITEM) {
+      BsonValue item = translateItem(call, operands);
+      if (item != null) {
+        return item;
+      }
+    }
+    if (operator == SqlStdOperatorTable.CASE) {
+      return translateCase(operands);
+    }
+    throw new IllegalArgumentException("Translation of " + call + " is not supported by MongoProject");
+  }
+
+  /**
+   * Translates an item access with a literal key: an integer key becomes {@code $arrayElemAt},
+   * a character key is appended to the path of the first operand.
+   * Returns null when the key is not a literal of one of these two types.
+   */
+  private static BsonValue translateItem(RexCall call, List<BsonValue> operands) {
+    RexNode key = call.operands.get(1);
+    if (!(key instanceof RexLiteral)) {
+      return null;
+    }
+    RexLiteral literalKey = (RexLiteral) key;
+    SqlTypeName keyType = key.getType().getSqlTypeName();
+    if (keyType == SqlTypeName.INTEGER) {
+      return new BsonDocument("$arrayElemAt", new BsonArray(
+          Arrays.asList(operands.get(0), new BsonInt32(literalKey.getValueAs(Integer.class)))));
+    }
+    if (keyType == SqlTypeName.CHAR) {
+      return new BsonString(operands.get(0).asString().getValue() + "." + literalKey.getValueAs(String.class));
+    }
+    return null;
+  }
+
+  /**
+   * Builds nested {@code $cond} documents from the translated operands of a CASE call:
+   * case(a, b, c) becomes $cond:[a, b, c],
+   * case(a, b, c, d) becomes $cond:[a, b, $cond:[c, d, null]],
+   * case(a, b, c, d, e) becomes $cond:[a, b, $cond:[c, d, e]].
+   */
+  private static BsonDocument translateCase(List<BsonValue> operands) {
+    BsonDocument root = new BsonDocument();
+    BsonArray current = new BsonArray();
+    root.put("$cond", current);
+
+    int size = operands.size();
+    int index = 0;
+    while (index < size) {
+      current.add(operands.get(index));
+      current.add(operands.get(index + 1));
+
+      // operands left, counting the condition/value pair just added
+      int remaining = size - index;
+      if (remaining == 3) {
+        // the last operand is the explicit else value
+        current.add(operands.get(index + 2));
+        break;
+      }
+      if (remaining == 2) {
+        // no else value
+        current.add(BsonNull.VALUE);
+        break;
+      }
+
+      // further branches follow: continue inside a nested $cond placed at the else position
+      BsonArray nested = new BsonArray();
+      BsonDocument nestedCond = new BsonDocument();
+      nestedCond.put("$cond", nested);
+      current.add(nestedCond);
+      current = nested;
+      index += 2;
+    }
+    return root;
+  }
+
+
+  /**
+   * Recognizes an item access with a string key on the input field with index 0.
+   *
+   * @param call call to inspect
+   * @return the key if the call is ITEM applied to input reference 0 with a literal
+   *         whose value is a string, null otherwise
+   */
+  public static String isItem(RexCall call) {
+    if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+      return null;
+    }
+    RexNode target = call.operands.get(0);
+    RexNode key = call.operands.get(1);
+
+    boolean firstInputField = target instanceof RexInputRef
+        && ((RexInputRef) target).getIndex() == 0;
+    if (!firstInputField || !(key instanceof RexLiteral)) {
+      return null;
+    }
+    Object keyValue = ((RexLiteral) key).getValue2();
+    return keyValue instanceof String ? (String) keyValue : null;
+  }
+
+  /**
+   * Tells whether an expression can be translated by this translator.
+   *
+   * @param expr expression to check
+   * @return true only if the checker explicitly approves the expression
+   */
+  public static boolean supportsExpression(RexNode expr) {
+    // the checker overrides only some visit methods; for the others the result may be null,
+    // which must mean "not supported" instead of failing on unboxing
+    return Boolean.TRUE.equals(expr.accept(new RexMongoChecker()));
+  }
+
+  /**
+   * Visitor that tells whether an expression can be translated by {@link RexToMongoTranslator}.
+   * Literals and input references are always accepted. A call is accepted when its operator
+   * is one the translator handles and, because the translator translates every operand
+   * before looking at the operator, when all of its operands are accepted as well.
+   */
+  private static class RexMongoChecker extends RexVisitorImpl<Boolean> {
+
+    protected RexMongoChecker() {
+      super(true);
+    }
+
+    @Override
+    public Boolean visitLiteral(RexLiteral literal) {
+      return true;
+    }
+
+    @Override
+    public Boolean visitInputRef(RexInputRef inputRef) {
+      return true;
+    }
+
+    @Override
+    public Boolean visitCall(RexCall call) {
+      if (isItem(call) != null) {
+        // translated from the literal key alone, operands are not visited in this case
+        return true;
+      }
+
+      SqlOperator operator = call.getOperator();
+      boolean supportedOperator = call.getKind() == SqlKind.CAST
+          || operator == SqlStdOperatorTable.CASE
+          || MONGO_OPERATORS.containsKey(operator)
+          || isLiteralItemAccess(call);
+
+      return supportedOperator && operandsSupported(call);
+    }
+
+    /**
+     * Tells whether the call is ITEM with an integer or character literal as key.
+     */
+    private static boolean isLiteralItemAccess(RexCall call) {
+      if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+        return false;
+      }
+      RexNode key = call.operands.get(1);
+      if (!(key instanceof RexLiteral)) {
+        return false;
+      }
+      SqlTypeName keyType = key.getType().getSqlTypeName();
+      return keyType == SqlTypeName.INTEGER || keyType == SqlTypeName.CHAR;
+    }
+
+    /**
+     * Tells whether every operand of the call is accepted by this checker;
+     * a null result of a visit counts as not accepted.
+     */
+    private boolean operandsSupported(RexCall call) {
+      for (RexNode operand : call.operands) {
+        if (!Boolean.TRUE.equals(operand.accept(this))) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
index da52984..4249725 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoDatabaseSchema.java
@@ -32,8 +32,7 @@ import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory.MongoSchema;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public class MongoDatabaseSchema extends AbstractSchema {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
- .getLogger(MongoDatabaseSchema.class);
+
private final MongoSchema mongoSchema;
private final Set<String> tableNames;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index cc071a4..29f14ce 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -29,13 +29,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractSchemaFactory;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,7 +195,7 @@ public class MongoSchemaFactory extends AbstractSchemaFactory {
DrillTable getDrillTable(String dbName, String collectionName) {
MongoScanSpec mongoScanSpec = new MongoScanSpec(schemaNameMap.get(dbName), collectionName);
- return new DynamicDrillTable(plugin, getName(), null, mongoScanSpec);
+ return new PluginDrillTable(plugin, getName(), null, mongoScanSpec, plugin.convention());
}
@Override
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
index d9cff73..8224470 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
@@ -39,7 +39,7 @@ public class MongoTestBase extends ClusterTest implements MongoTestConstants {
private static void initMongoStoragePlugin(String connectionURI) throws Exception {
MongoStoragePluginConfig storagePluginConfig = new MongoStoragePluginConfig(connectionURI,
- PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ null, 100, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
storagePluginConfig.setEnabled(true);
pluginRegistry.put(MongoStoragePluginConfig.NAME, storagePluginConfig);
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
index e4c71b1..eb706d6 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
@@ -155,7 +155,7 @@ public class TestMongoChunkAssignment extends BaseTest {
mongoGroupScan = new MongoGroupScan();
mongoGroupScan.setChunksMapping(chunksMapping);
- mongoGroupScan.setInverseChunsMapping(chunksInverseMapping);
+ mongoGroupScan.setInverseChunksMapping(chunksInverseMapping);
MongoScanSpec scanSpec = new MongoScanSpec(dbName, collectionName);
mongoGroupScan.setScanSpec(scanSpec);
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
index 569cf5c..2344df7 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoLimitPushDown.java
@@ -30,31 +30,32 @@ public class TestMongoLimitPushDown extends MongoTestBase {
public void testLimit() throws Exception {
String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4";
queryBuilder()
- .sql(sql)
- .planMatcher()
- .include("Limit", "maxRecords=4")
- .match();
+ .sql(sql)
+ .planMatcher()
+ .exclude("Limit\\(")
+ .include("MongoGroupScan.*\"\\$limit\": 4")
+ .match();
}
@Test
public void testLimitWithOrderBy() throws Exception {
- // Limit should not be pushed down for this example due to the sort
String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` ORDER BY employee_id LIMIT 4";
queryBuilder()
.sql(sql)
.planMatcher()
- .include("Limit", "maxRecords=-1")
+ .exclude("Limit")
+ .include("MongoGroupScan.*\"\\$sort\": \\{\"employee_id\": 1}", "\"\\$limit\": 4")
.match();
}
@Test
public void testLimitWithOffset() throws Exception {
- // Limit should be pushed down and include the offset
String sql = "SELECT `employee_id` FROM mongo.employee.`empinfo` LIMIT 4 OFFSET 5";
queryBuilder()
.sql(sql)
.planMatcher()
- .include("Limit", "maxRecords=9")
+ .exclude("Limit")
+ .include("\"\\$skip\": 5", "\"\\$limit\": 4")
.match();
}
@@ -64,7 +65,8 @@ public class TestMongoLimitPushDown extends MongoTestBase {
queryBuilder()
.sql(sql)
.planMatcher()
- .include("Limit", "maxRecords=4")
+ .exclude("Limit")
+ .include("\"\\$limit\": 4", "\"\\$eq\": 52\\.17")
.match();
}
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
index 48d13ce..a691443 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
@@ -23,21 +23,21 @@ import static org.apache.drill.test.TestBuilder.mapOf;
import org.apache.drill.categories.MongoStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ExecConstants;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Ignore("DRILL-3775")
@Category({SlowTest.class, MongoStorageTest.class})
public class TestMongoProjectPushDown extends MongoTestBase {
- /**
- *
- * @throws Exception
- */
@Test
public void testComplexProjectPushdown() throws Exception {
+ queryBuilder()
+ .sql("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from mongo.employee.schema_change t")
+ .planMatcher()
+ .include("MongoGroupScan.*\"\\$project\": \\{\"col_1\": \"\\$field_4.inner_3\", \"col_2\": \"\\$field_4\"\\}")
+ .match();
+
try {
testBuilder()
.sqlQuery("select t.field_4.inner_3 as col_1, t.field_4 as col_2 from mongo.employee.schema_change t")
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index 4b20ebc..2c79dd8 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -23,6 +23,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+
@Category({SlowTest.class, MongoStorageTest.class})
public class TestMongoQueries extends MongoTestBase {
@@ -42,6 +44,15 @@ public class TestMongoQueries extends MongoTestBase {
}
@Test
+ public void testSerDe() throws Exception {
+ String plan = queryBuilder()
+ .sql(String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE1, EMPLOYEE_DB, EMPINFO_COLLECTION))
+ .explainJson();
+ // Re-run the plan from its JSON form to exercise ser/de; JUnit expects assertEquals(expected, actual).
+ assertEquals(11, queryBuilder().physical(plan).run().recordCount());
+ }
+
+ @Test
public void testWithANDOperator() throws Exception {
testBuilder()
.sqlQuery(String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE3, EMPLOYEE_DB, EMPINFO_COLLECTION))
@@ -105,4 +116,192 @@ public class TestMongoQueries extends MongoTestBase {
.expectsNumRecords(5)
.go();
}
+
+ @Test
+ public void testCountColumnPushDown() throws Exception {
+ String query = "select count(t.name) as c from mongo.%s.`%s` t";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("Agg\\(")
+ .include("MongoGroupScan.*group")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(5)
+ .go();
+ }
+
+ @Test
+ public void testSumColumnPushDown() throws Exception {
+ String query = "select sum(t.sales) as s from mongo.%s.`%s` t";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("Agg\\(")
+ .include("MongoGroupScan.*group")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("s")
+ .baselineValues(1194)
+ .go();
+ }
+
+ @Test
+ public void testCountGroupByPushDown() throws Exception {
+ String query = "select count(t.id) as c, t.type from mongo.%s.`%s` t group by t.type";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("Agg\\(")
+ .include("MongoGroupScan.*group")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("c", "type")
+ .baselineValues(5, "donut")
+ .go();
+ }
+
+ @Test
+ public void testSumGroupByPushDown() throws Exception {
+ String query = "select sum(t.sales) s, t.type from mongo.%s.`%s` t group by t.type";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("Agg\\(")
+ .include("MongoGroupScan.*group")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("s", "type")
+ .baselineValues(1194, "donut")
+ .go();
+ }
+
+ @Test
+ public void testCountColumnPushDownWithFilter() throws Exception {
+ String query = "select count(t.id) as c from mongo.%s.`%s` t where t.name = 'Cake'";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("Agg\\(", "Filter")
+ .include("MongoGroupScan.*group")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(1)
+ .go();
+ }
+
+ @Test
+ public void testUnionAll() throws Exception {
+ String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union all " +
+ "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("UnionAll\\(")
+ .include("MongoGroupScan.*\\$unionWith")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("0001", "Cake")
+ .baselineValues("0001", "Cake")
+ .baselineValues("0002", "Raised")
+ .baselineValues("0003", "Old Fashioned")
+ .baselineValues("0004", "Filled")
+ .baselineValues("0005", "Apple Fritter")
+ .go();
+ }
+
+ @Test
+ public void testUnionDistinct() throws Exception {
+ String query = "select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union " +
+ "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2 ";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .exclude("UnionAll\\(", "Agg\\(")
+ .include("MongoGroupScan.*\\$unionWith")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("0001", "Cake")
+ .baselineValues("0002", "Raised")
+ .baselineValues("0003", "Old Fashioned")
+ .baselineValues("0004", "Filled")
+ .baselineValues("0005", "Apple Fritter")
+ .go();
+ }
+
+ @Test
+ public void testProjectPushDown() throws Exception {
+ String query = "select t.sales * t.sales as c, t.name from mongo.%s.`%s` t";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .include("MongoGroupScan.*project.*multiply")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("c", "name")
+ .baselineValues(196, "Filled")
+ .baselineValues(1225, "Cake")
+ .baselineValues(21025, "Raised")
+ .baselineValues(90000, "Old Fashioned")
+ .baselineValues(490000, "Apple Fritter")
+ .go();
+ }
+
+ @Test
+ public void testProjectPushDownWithCase() throws Exception {
+ String query = "select case when t.sales >= 700 then 2 when t.sales > 145 then 1 else 0 end as c, t.name from mongo.%s.`%s` t";
+
+ queryBuilder()
+ .sql(query, DONUTS_DB, DONUTS_COLLECTION)
+ .planMatcher()
+ .include("MongoGroupScan.*project.*cond.*\\$gt")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION)
+ .unOrdered()
+ .baselineColumns("c", "name")
+ .baselineValues(0, "Filled")
+ .baselineValues(0, "Cake")
+ .baselineValues(0, "Raised")
+ .baselineValues(1, "Old Fashioned")
+ .baselineValues(2, "Apple Fritter")
+ .go();
+ }
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
index d1fd63f..298744b 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoStoragePluginUsesCredentialsStore.java
@@ -35,7 +35,7 @@ public class TestMongoStoragePluginUsesCredentialsStore extends BaseTest {
private void test(String expectedUserName, String expectedPassword, String connection, String name) throws ExecutionSetupException {
MongoStoragePlugin plugin = new MongoStoragePlugin(
- new MongoStoragePluginConfig(connection, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
+ new MongoStoragePluginConfig(connection, null, 100, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER),
null, name);
MongoClientImpl client = (MongoClientImpl) plugin.getClient();
MongoCredential cred = client.getSettings().getCredential();
diff --git a/docs/dev/MongoStoragePluginCredentials.md b/docs/dev/MongoStoragePluginCredentials.md
new file mode 100644
index 0000000..9819a52
--- /dev/null
+++ b/docs/dev/MongoStoragePluginCredentials.md
@@ -0,0 +1,58 @@
+# Mongo storage plugin credentials
+
+Apache Drill provides the ability to connect to a MongoDB cluster that has
+authentication enabled and to submit queries to it.
+
+## Configuring storage plugin
+
+Drill offers several ways to provide the credentials
+that will be used for connecting to MongoDB.
+
+### Providing username and password with the connection string
+
+The simplest way to provide credentials is to specify them within the Mongo connection string:
+
+```json
+{
+ "type": "mongo",
+ "connection": "mongodb://user1:user1Pass@mongoHost:27017/",
+ "batchSize": 100,
+ "enabled": true
+}
+```
+
+where
+ - `user1` - name of the user
+ - `user1Pass` - user password
+ - `mongoHost` - mongo host
+
+This way of providing username and password takes precedence over all other methods.
+
+### Providing username and password with Credentials Provider
+
+The Mongo storage plugin is integrated with the Credentials Provider, so credentials can also be specified through it.
+
+The Credentials Provider creates a connection string with the provided username and password,
+similar to the one in the previous section, so Drill will use this connection string when connecting to MongoDB.
+
+Here is an example of using the Plain Credentials Provider with the Mongo storage plugin; any
+other Credentials Provider implementation (including custom ones) can be used in a similar manner:
+
+```json
+{
+ "type": "mongo",
+ "connection": "mongodb://mongoHost:27017/",
+ "batchSize": 100,
+ "credentialsProvider": {
+ "credentialsProviderType": "PlainCredentialsProvider",
+ "credentials": {
+ "username": "user1",
+ "password": "user1Pass"
+ }
+ },
+ "enabled": true
+}
+```
+
+Please refer to [Plugin credentials provider](https://github.com/apache/drill/blob/master/docs/dev/PluginCredentialsProvider.md#developer-notes)
+for more details related to Credentials Provider.
diff --git a/docs/dev/StoragePluginPushdowns.md b/docs/dev/StoragePluginPushdowns.md
new file mode 100644
index 0000000..0b97317
--- /dev/null
+++ b/docs/dev/StoragePluginPushdowns.md
@@ -0,0 +1,31 @@
+# Generic storage plugin pushdown framework
+
+Storage plugins may support a specific set of operations that have corresponding Drill operators.
+The goal of this framework is to simplify the process of creating conversions from Drill operators to
+plugin-specific operations (pushdowns).
+
+## How it works
+
+A plugin that uses this framework defines a list of pushdowns it supports.
+The framework adds all rules required by this list to the planner,
+so if at any point of the planning process the planner finds that it is possible to apply
+a specific optimization, the corresponding part of the plan is marked as a plugin operation.
+
+## How to use it
+
+A storage plugin should define the logic for converting specific operators to plugin expressions by implementing
+the `org.apache.drill.exec.store.plan.PluginImplementor` interface
+(or extending the `org.apache.drill.exec.store.plan.AbstractPluginImplementor` class).
+This implementation should have both `canImplement(RelNode)` and `implement(PluginRel)` methods
+to be able to support a specific pushdown. The first one checks whether it is possible to convert
+the operation to a plugin-specific expression, and the second one is responsible for the conversion itself.
+
+The storage plugin should instantiate `org.apache.drill.exec.store.StoragePluginRulesSupplier` and specify for it
+the list of optimizations it wants to enable (this list may be made configurable), the previously defined
+implementation of `org.apache.drill.exec.store.plan.PluginImplementor`, and an `org.apache.calcite.plan.Convention`
+that includes the plugin name.
+This `org.apache.drill.exec.store.StoragePluginRulesSupplier` should be used in
+the overridden `AbstractStoragePlugin.getOptimizerRules()` method by calling
+`org.apache.drill.exec.store.StoragePluginRulesSupplier.getOptimizerRules()`.
+
+Please use `MongoStoragePlugin` as an example; it supports almost all of the available optimizations.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index cc549b1..6779d0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.exec.metastore.analyze.AnalyzeInfoProvider;
import org.apache.drill.metastore.metadata.TableMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
@@ -92,6 +93,11 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
return getScanStats();
}
+ @Override
+ public ScanStats getScanStats(RelMetadataQuery mq) {
+ return getScanStats();
+ }
+
@JsonIgnore
public ScanStats getScanStats() {
throw new UnsupportedOperationException("This should be implemented.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 9eaa043..2c6ff4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.base;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Collection;
import java.util.List;
+
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -88,6 +90,9 @@ public interface GroupScan extends Scan, HasAffinity {
@JsonIgnore
ScanStats getScanStats(PlannerSettings settings);
+ @JsonIgnore
+ ScanStats getScanStats(RelMetadataQuery mq);
+
/**
* Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index fe67709..a307f93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.planner.common;
import java.io.IOException;
import java.util.List;
+
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillTable;
@@ -87,5 +89,5 @@ public abstract class DrillScanRelBase extends TableScan implements DrillRelNode
return planner.getCostFactory().makeCost(dRows, dCpu, dIo);
}
- public abstract DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan);
+ public abstract DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
new file mode 100644
index 0000000..75b0c1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillSortRelBase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Base class for logical and physical Sort implemented in Drill.
+ */
+public abstract class DrillSortRelBase extends Sort implements DrillRelNode, OrderedRel {
+
+ protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
+ super(cluster, traits, input, collation);
+ }
+
+ protected DrillSortRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, input, collation, offset, fetch);
+ }
+
+ @Override
+ public RexNode getOffset() {
+ return offset;
+ }
+
+ @Override
+ public RexNode getFetch() {
+ return fetch;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 26ef4ea..bcd9792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -193,7 +193,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
}
@Override
- public DrillScanRel copy(RelTraitSet traitSet, GroupScan scan) {
- return new DrillScanRel(getCluster(), getTraitSet(), getTable(), scan, getRowType(), getColumns(), partitionFilterPushdown());
+ public DrillScanRel copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+ return new DrillScanRel(getCluster(), getTraitSet(), getTable(), scan, rowType, getColumns(), partitionFilterPushdown());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
index 1e380cf..0fb7b8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -25,13 +25,12 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
import org.apache.drill.exec.planner.torel.ConversionContext;
-import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
@@ -42,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
/**
* Sort implemented in Drill.
*/
-public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
+public class DrillSortRel extends DrillSortRelBase implements DrillRel {
/** Creates a DrillSortRel. */
public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
@@ -100,16 +99,6 @@ public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
}
@Override
- public RexNode getOffset() {
- return offset;
- }
-
- @Override
- public RexNode getFetch() {
- return fetch;
- }
-
- @Override
public boolean canBeDropped() {
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 50996b9..1e0bdf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -60,8 +60,8 @@ public class ScanPrel extends DrillScanRelBase implements LeafPrel, HasDistribut
}
@Override
- public ScanPrel copy(RelTraitSet traitSet, GroupScan scan) {
- return new ScanPrel(getCluster(), traitSet, scan, getRowType(), getTable());
+ public ScanPrel copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+ return new ScanPrel(getCluster(), traitSet, scan, rowType, getTable());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 7e415b2..3e68d7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -21,13 +21,13 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -41,7 +41,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
-public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel,Prel {
+public class SortPrel extends DrillSortRelBase implements Prel {
private final boolean isRemovable;
/** Creates a DrillSortRel. */
@@ -154,16 +154,6 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Ordere
}
@Override
- public RexNode getOffset() {
- return offset;
- }
-
- @Override
- public RexNode getFetch() {
- return fetch;
- }
-
- @Override
public boolean canBeDropped() {
return isRemovable;
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
similarity index 58%
copy from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
index ef89bfb..635b7c6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProvider.java
@@ -15,20 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.mongo.common;
-
-public enum MongoCompareOp {
- EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
- "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
- "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
- "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
- private String compareOp;
-
- MongoCompareOp(String compareOp) {
- this.compareOp = compareOp;
- }
-
- public String getCompareOp() {
- return compareOp;
- }
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.RelOptRule;
+
+import java.util.List;
+
+/**
+ * Provides rules required for adding support of specific operator pushdown for storage plugin.
+ */
+public interface PluginRulesProvider {
+
+ List<RelOptRule> sortRules();
+
+ List<RelOptRule> limitRules();
+
+ List<RelOptRule> filterRules();
+
+ List<RelOptRule> projectRules();
+
+ List<RelOptRule> aggregateRules();
+
+ List<RelOptRule> unionRules();
+
+ List<RelOptRule> joinRules();
+
+ RelOptRule vertexRule();
+
+ RelOptRule prelConverterRule();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
new file mode 100644
index 0000000..b91b47e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRulesProviderImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rule.PluginAggregateRule;
+import org.apache.drill.exec.store.plan.rule.PluginFilterRule;
+import org.apache.drill.exec.store.plan.rule.PluginIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.plan.rule.PluginJoinRule;
+import org.apache.drill.exec.store.plan.rule.PluginLimitRule;
+import org.apache.drill.exec.store.plan.rule.PluginProjectRule;
+import org.apache.drill.exec.store.plan.rule.PluginSortRule;
+import org.apache.drill.exec.store.plan.rule.PluginUnionRule;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class PluginRulesProviderImpl implements PluginRulesProvider {
+
+ private final Supplier<PluginImplementor> implementorSupplier;
+ private final PluginImplementor pluginImplementor;
+ private final Convention convention;
+
+ public PluginRulesProviderImpl(Convention convention,
+ Supplier<PluginImplementor> implementorSupplier) {
+ this.convention = convention;
+ this.implementorSupplier = implementorSupplier;
+ this.pluginImplementor = implementorSupplier.get();
+ }
+
+ @Override
+ public List<RelOptRule> sortRules() {
+ return Arrays.asList(
+ new PluginSortRule(Convention.NONE, convention, pluginImplementor),
+ new PluginSortRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public List<RelOptRule> limitRules() {
+ return Arrays.asList(
+ new PluginLimitRule(Convention.NONE, convention, pluginImplementor),
+ new PluginLimitRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public List<RelOptRule> filterRules() {
+ return Arrays.asList(
+ new PluginFilterRule(Convention.NONE, convention, pluginImplementor),
+ new PluginFilterRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public List<RelOptRule> projectRules() {
+ return Arrays.asList(
+ new PluginProjectRule(Convention.NONE, convention, pluginImplementor),
+ new PluginProjectRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public List<RelOptRule> aggregateRules() {
+ return Arrays.asList(
+ new PluginAggregateRule(Convention.NONE, convention, pluginImplementor),
+ new PluginAggregateRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public List<RelOptRule> unionRules() {
+ return Arrays.asList(
+ new PluginUnionRule(Convention.NONE, convention, pluginImplementor),
+ new PluginUnionRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public List<RelOptRule> joinRules() {
+ return Arrays.asList(
+ new PluginJoinRule(Convention.NONE, convention, pluginImplementor),
+ new PluginJoinRule(DrillRel.DRILL_LOGICAL, convention, pluginImplementor)
+ );
+ }
+
+ @Override
+ public RelOptRule vertexRule() {
+ return new VertexDrelConverterRule(convention);
+ }
+
+ @Override
+ public RelOptRule prelConverterRule() {
+ return new PluginIntermediatePrelConverterRule(convention, implementorSupplier);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRulesSupplier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRulesSupplier.java
new file mode 100644
index 0000000..2dc14ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRulesSupplier.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Helper class that collects the optimizer rules for the push-down operators a specific
+ * plugin supports, as configured using {@link StoragePluginRulesSupplierBuilder}.
+ */
+public class StoragePluginRulesSupplier {
+
+ private final StoragePluginRulesSupplierBuilder storagePluginRulesSupplierBuilder; // the builder itself is retained and consulted on each call, not copied
+
+ private StoragePluginRulesSupplier(StoragePluginRulesSupplierBuilder storagePluginRulesSupplierBuilder) { // private: instances come from the builder's build()
+ this.storagePluginRulesSupplierBuilder = storagePluginRulesSupplierBuilder;
+ }
+
+ public Set<? extends RelOptRule> getOptimizerRules() { // returns an immutable set assembled from the enabled rule groups
+ ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
+ PluginRulesProvider rulesProvider = storagePluginRulesSupplierBuilder.rulesProvider(); // dereferenced below, so it must have been set on the builder
+ if (storagePluginRulesSupplierBuilder.supportsProjectPushdown()) {
+ builder.addAll(rulesProvider.projectRules());
+ }
+ if (storagePluginRulesSupplierBuilder.supportsFilterPushdown()) {
+ builder.addAll(rulesProvider.filterRules());
+ }
+ if (storagePluginRulesSupplierBuilder.supportsSortPushdown()) {
+ builder.addAll(rulesProvider.sortRules());
+ }
+ if (storagePluginRulesSupplierBuilder.supportsUnionPushdown()) {
+ builder.addAll(rulesProvider.unionRules());
+ }
+ if (storagePluginRulesSupplierBuilder.supportsJoinPushdown()) {
+ builder.addAll(rulesProvider.joinRules());
+ }
+ if (storagePluginRulesSupplierBuilder.supportsAggregatePushdown()) {
+ builder.addAll(rulesProvider.aggregateRules());
+ }
+ if (storagePluginRulesSupplierBuilder.supportsLimitPushdown()) {
+ builder.addAll(rulesProvider.limitRules());
+ }
+ builder.add(rulesProvider.vertexRule()); // vertex and prel converter rules are added regardless of the flags
+ builder.add(rulesProvider.prelConverterRule());
+ return builder.build();
+ }
+
+ public Convention convention() { // returns whatever convention was set on the builder (null if never set)
+ return storagePluginRulesSupplierBuilder.convention();
+ }
+
+ public static StoragePluginRulesSupplierBuilder builder() { // entry point: returns a fresh builder with all flags off
+ return new StoragePluginRulesSupplierBuilder();
+ }
+
+ public static class StoragePluginRulesSupplierBuilder { // fluent builder: each setter returns this; same-named no-arg methods are the getters
+
+ private boolean supportsProjectPushdown; // every push-down flag defaults to false (no initializer)
+
+ private boolean supportsFilterPushdown;
+
+ private boolean supportsAggregatePushdown;
+
+ private boolean supportsSortPushdown;
+
+ private boolean supportsUnionPushdown;
+
+ private boolean supportsJoinPushdown;
+
+ private boolean supportsLimitPushdown;
+
+ private PluginRulesProvider rulesProvider; // no default; getOptimizerRules() fails with a NullPointerException if left unset
+
+ private Convention convention; // no default; returned as-is by convention()
+
+ public boolean supportsProjectPushdown() {
+ return supportsProjectPushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsProjectPushdown(boolean supportsProjectPushdown) {
+ this.supportsProjectPushdown = supportsProjectPushdown;
+ return this;
+ }
+
+ public boolean supportsFilterPushdown() {
+ return supportsFilterPushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsFilterPushdown(boolean supportsFilterPushdown) {
+ this.supportsFilterPushdown = supportsFilterPushdown;
+ return this;
+ }
+
+ public boolean supportsAggregatePushdown() {
+ return supportsAggregatePushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsAggregatePushdown(boolean supportsAggregatePushdown) {
+ this.supportsAggregatePushdown = supportsAggregatePushdown;
+ return this;
+ }
+
+ public boolean supportsSortPushdown() {
+ return supportsSortPushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsSortPushdown(boolean supportsSortPushdown) {
+ this.supportsSortPushdown = supportsSortPushdown;
+ return this;
+ }
+
+ public boolean supportsUnionPushdown() {
+ return supportsUnionPushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsUnionPushdown(boolean supportsUnionPushdown) {
+ this.supportsUnionPushdown = supportsUnionPushdown;
+ return this;
+ }
+
+ public boolean supportsJoinPushdown() {
+ return supportsJoinPushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsJoinPushdown(boolean supportsJoinPushdown) {
+ this.supportsJoinPushdown = supportsJoinPushdown;
+ return this;
+ }
+
+ public boolean supportsLimitPushdown() {
+ return supportsLimitPushdown;
+ }
+
+ public StoragePluginRulesSupplierBuilder supportsLimitPushdown(boolean supportsLimitPushdown) {
+ this.supportsLimitPushdown = supportsLimitPushdown;
+ return this;
+ }
+
+ public PluginRulesProvider rulesProvider() {
+ return rulesProvider;
+ }
+
+ public StoragePluginRulesSupplierBuilder rulesProvider(PluginRulesProvider rulesProvider) {
+ this.rulesProvider = rulesProvider;
+ return this;
+ }
+
+ public Convention convention() {
+ return convention;
+ }
+
+ public StoragePluginRulesSupplierBuilder convention(Convention convention) {
+ this.convention = convention;
+ return this;
+ }
+
+ public StoragePluginRulesSupplier build() { // performs no validation of rulesProvider or convention
+ return new StoragePluginRulesSupplier(this); // the supplier keeps a reference to this builder, so later setter calls remain visible to it
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
new file mode 100644
index 0000000..7c3639b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Abstract base implementation of {@link PluginImplementor} for plugin implementors that support only
+ * a subset of the operations: every canImplement returns false and every implement throws by default.
+ */
+public abstract class AbstractPluginImplementor implements PluginImplementor {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractPluginImplementor.class);
+
+ @Override
+ public void implement(PluginAggregateRel aggregate) throws IOException { // default: always throws an unsupported-operation UserException
+ throw getUnsupported("aggregate");
+ }
+
+ @Override
+ public void implement(PluginFilterRel filter) throws IOException { // default: always throws
+ throw getUnsupported("filter");
+ }
+
+ @Override
+ public void implement(PluginLimitRel limit) throws IOException { // default: always throws
+ throw getUnsupported("limit");
+ }
+
+ @Override
+ public void implement(PluginProjectRel project) throws IOException { // default: always throws
+ throw getUnsupported("project");
+ }
+
+ @Override
+ public void implement(PluginSortRel sort) throws IOException { // default: always throws
+ throw getUnsupported("sort");
+ }
+
+ @Override
+ public void implement(PluginUnionRel union) throws IOException { // default: always throws
+ throw getUnsupported("union");
+ }
+
+ @Override
+ public void implement(PluginJoinRel join) throws IOException { // default: always throws
+ throw getUnsupported("join");
+ }
+
+ @Override
+ public void implement(StoragePluginTableScan scan) throws IOException { // default: always throws
+ throw getUnsupported("scan");
+ }
+
+ @Override
+ public boolean canImplement(Aggregate aggregate) { // default: not supported; subclasses override what they can push down
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(Filter filter) { // default: not supported
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(DrillLimitRelBase limit) { // default: not supported
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(Project project) { // default: not supported
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(Sort sort) { // default: not supported
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(Union union) { // default: not supported
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(TableScan scan) { // default: not supported
+ return false;
+ }
+
+ @Override
+ public boolean canImplement(Join join) { // default: not supported (parameter was misnamed "scan")
+ return false;
+ }
+
+ private UserException getUnsupported(String rel) { // rel: operator name inserted into the error message
+ return UserException.unsupportedError()
+ .message("Plugin implementor doesn't support push down for %s", rel)
+ .build(logger); // builds the exception with this class's logger; the caller throws it
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
new file mode 100644
index 0000000..db46467
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+
+/**
+ * Callback for the implementation process that checks whether a specific operator
+ * can be converted and converts a tree of {@link PluginRel} nodes into expressions
+ * that can be consumed by the storage plugin.
+ */
+public interface PluginImplementor {
+
+ void implement(PluginAggregateRel aggregate) throws IOException; // implement(...) overloads: one per plugin rel type
+
+ void implement(PluginFilterRel filter) throws IOException;
+
+ void implement(PluginLimitRel limit) throws IOException;
+
+ void implement(PluginProjectRel project) throws IOException;
+
+ void implement(PluginSortRel sort) throws IOException;
+
+ void implement(PluginUnionRel union) throws IOException;
+
+ void implement(PluginJoinRel join) throws IOException;
+
+ void implement(StoragePluginTableScan scan) throws IOException;
+
+ boolean canImplement(Aggregate aggregate); // canImplement(...) overloads take the Calcite/Drill base rel types
+
+ boolean canImplement(Filter filter);
+
+ boolean canImplement(DrillLimitRelBase limit);
+
+ boolean canImplement(Project project);
+
+ boolean canImplement(Sort sort);
+
+ boolean canImplement(Union union);
+
+ boolean canImplement(Join join); // parameter was misnamed "scan"
+
+ boolean canImplement(TableScan scan);
+
+ GroupScan getPhysicalOperator() throws IOException; // presumably the group scan resulting from the implement(...) calls; confirm against implementations
+
+ default void visitChild(RelNode input) throws IOException { // dispatches to the child's own implement(PluginImplementor)
+ ((PluginRel) input).implement(this); // unchecked cast: a non-PluginRel input fails with ClassCastException
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
new file mode 100644
index 0000000..02885e9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Aggregate rel for Drill plugins; hands itself to a {@link PluginImplementor} for push-down checks and conversion.
+ */
+public class PluginAggregateRel extends DrillAggregateRelBase implements PluginRel {
+
+ public PluginAggregateRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
+ ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
+ assert getConvention() == input.getConvention(); // evaluated only when JVM assertions are enabled
+ }
+
+ @Override
+ public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ return new PluginAggregateRel(getCluster(), traitSet, input, groupSet, groupSets, aggCalls); // same cluster, everything else from the arguments
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1); // scales the inherited cost by 0.1; presumably to favor push-down, confirm
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this); // resolves to the implement(PluginAggregateRel) overload
+ }
+
+ @Override
+ public boolean canImplement(PluginImplementor implementor) {
+ return implementor.canImplement(this); // resolves to the canImplement(Aggregate) overload
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
new file mode 100644
index 0000000..7528d99
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginDrillTable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.Utilities;
+
+import java.io.IOException;
+
+/**
+ * Table implementation based on {@link DynamicDrillTable} to be used by Drill plugins; converts to a {@link StoragePluginTableScan}.
+ */
+public class PluginDrillTable extends DynamicDrillTable implements TranslatableTable {
+ private final Convention convention; // used as the trait of the scan created in toRel
+
+ public PluginDrillTable(StoragePlugin plugin, String storageEngineName,
+ String userName, Object selection, Convention convention) {
+ super(plugin, storageEngineName, userName, selection);
+ this.convention = convention;
+ }
+
+ @Override
+ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
+ DrillTable drillTable = Utilities.getDrillTable(table); // group scan is taken from this table, not from "this"
+ try {
+ return new StoragePluginTableScan(context.getCluster(),
+ context.getCluster().traitSetOf(convention),
+ drillTable.getGroupScan(),
+ table,
+ table.getRowType());
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e); // rethrown unchecked with the IOException as cause
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
new file mode 100644
index 0000000..cb63571
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginFilterRel.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Filter rel for Drill plugins; hands itself to a {@link PluginImplementor} for push-down checks and conversion.
+ */
+public class PluginFilterRel extends DrillFilterRelBase implements PluginRel {
+
+ public PluginFilterRel(Convention convention, RelOptCluster cluster,
+ RelTraitSet traitSet, RelNode child, RexNode condition) {
+ super(convention, cluster, traitSet, child, condition);
+ assert getConvention() == child.getConvention(); // evaluated only when JVM assertions are enabled
+ }
+
+ @Override public PluginFilterRel copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new PluginFilterRel(getConvention(), getCluster(), traitSet, input, condition); // convention and cluster are carried over from this rel
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1); // scales the inherited cost by 0.1; presumably to favor push-down, confirm
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this); // resolves to the implement(PluginFilterRel) overload
+ }
+
+ @Override
+ public boolean canImplement(PluginImplementor implementor) {
+ return implementor.canImplement(this); // resolves to the canImplement(Filter) overload
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrel.java
new file mode 100644
index 0000000..8c9f9bb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginIntermediatePrel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.SinglePrel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Prel used to represent a Plugin Conversion within an expression tree.
+ * This Prel will be replaced with a full {@link PluginPrel} before execution can happen.
+ */
+public class PluginIntermediatePrel extends SinglePrel implements PrelFinalizable {
+
+ private final Supplier<PluginImplementor> implementorFactory; // queried on every getPluginImplementor() call
+
+ public PluginIntermediatePrel(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, Supplier<PluginImplementor> implementorFactory) {
+ super(cluster, traits, child);
+ this.implementorFactory = implementorFactory;
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) { // not available on the intermediate node; see finalizeRel()
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PluginIntermediatePrel(getCluster(), traitSet, inputs.get(0), implementorFactory); // honor the supplied input instead of reusing the current child
+ }
+
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return copy(getTraitSet(), getInputs()); // clone is a copy with unchanged traits and inputs
+ }
+
+ @Override
+ public SelectionVectorMode getEncoding() {
+ return SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public Prel finalizeRel() {
+ return new PluginPrel(getCluster(), this); // replaces this placeholder with the full PluginPrel
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) {
+ throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+
+ public PluginImplementor getPluginImplementor() {
+ return implementorFactory.get(); // whether this is a fresh instance per call depends on the supplier
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
new file mode 100644
index 0000000..4bf767f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginJoinRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Join rel for Drill plugins; hands itself to a {@link PluginImplementor} for push-down checks and conversion.
+ */
+public class PluginJoinRel extends DrillJoinRelBase implements PluginRel {
+
+ public PluginJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ RexNode condition, JoinRelType joinType) {
+ super(cluster, traits, left, right, condition, joinType); // unlike the sibling plugin rels, no convention assertion on the inputs
+ }
+
+ @Override
+ public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right,
+ JoinRelType joinType, boolean semiJoinDone) {
+ return new PluginJoinRel(getCluster(), traitSet, left, right, conditionExpr, joinType); // semiJoinDone is not forwarded
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this); // resolves to the implement(PluginJoinRel) overload
+ }
+
+ @Override
+ public boolean canImplement(PluginImplementor implementor) {
+ return implementor.canImplement(this); // resolves to the canImplement(Join) overload
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
new file mode 100644
index 0000000..6902ce2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginLimitRel.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Limit rel for Drill plugins; hands itself to a {@link PluginImplementor} for push-down checks and conversion.
+ */
+public class PluginLimitRel extends DrillLimitRelBase implements PluginRel {
+
+ public PluginLimitRel(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode child, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, child, offset, fetch);
+ assert getConvention() == child.getConvention(); // evaluated only when JVM assertions are enabled
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.05); // scales the inherited cost by 0.05 (filter/aggregate use 0.1)
+ }
+
+ @Override
+ public PluginLimitRel copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PluginLimitRel(getCluster(), traitSet, inputs.get(0), offset, fetch); // first supplied input becomes the child; offset/fetch are reused
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this); // resolves to the implement(PluginLimitRel) overload
+ }
+
+ @Override
+ public boolean canImplement(PluginImplementor implementor) {
+ return implementor.canImplement(this); // resolves to the canImplement(DrillLimitRelBase) overload
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
new file mode 100644
index 0000000..cc96426
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Physical rel that holds the group scan produced by a {@link PluginImplementor}
+ * once the plugin-specific child nodes have been pushed down into it.
+ */
+public class PluginPrel extends AbstractRelNode implements Prel {
+  private final GroupScan groupScan;
+  private final RelDataType rowType;
+
+  /**
+   * Applies {@link SubsetRemover} to the input of {@code intermediatePrel}, hands the resulting
+   * plugin rel to the plugin implementor and keeps the group scan the implementor returns.
+   *
+   * @throws DrillRuntimeException if implementing the plugin rel fails with an {@link IOException}
+   */
+  public PluginPrel(RelOptCluster cluster, PluginIntermediatePrel intermediatePrel) {
+    super(cluster, intermediatePrel.getTraitSet());
+    this.rowType = intermediatePrel.getRowType();
+    PluginRel pluginRoot = (PluginRel) intermediatePrel.getInput().accept(SubsetRemover.INSTANCE);
+    try {
+      PluginImplementor pluginImplementor = intermediatePrel.getPluginImplementor();
+      pluginRoot.implement(pluginImplementor);
+      this.groupScan = pluginImplementor.getPhysicalOperator();
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    return rowType;
+  }
+
+  /**
+   * Reports the record count taken from the scan statistics of the wrapped group scan.
+   */
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return groupScan.getScanStats(mq).getRecordCount();
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    RelWriter writer = super.explainTerms(pw);
+    return writer.item("groupScan", groupScan);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+    return creator.addMetadata(this, groupScan);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  /**
+   * This node exposes no child prels, so the iterator is always empty.
+   */
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
new file mode 100644
index 0000000..b646ef7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginProjectRel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Project relational expression that uses a plugin-specific calling convention.
+ */
+public class PluginProjectRel extends DrillProjectRelBase implements PluginRel {
+
+  /**
+   * Creates a project on top of {@code input}; the input is expected
+   * to have the same convention as this node.
+   */
+  public PluginProjectRel(Convention convention, RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(convention, cluster, traitSet, input, projects, rowType);
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override
+  public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    Convention ownConvention = getConvention();
+    return new PluginProjectRel(ownConvention, getCluster(), traitSet, input, projects, rowType);
+  }
+
+  /**
+   * Returns the cost computed by the parent class scaled by a factor of 0.1.
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    RelOptCost parentCost = super.computeSelfCost(planner, mq);
+    return parentCost.multiplyBy(0.1);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
similarity index 58%
rename from contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
index ef89bfb..55b54f7 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/common/MongoCompareOp.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginRel.java
@@ -15,20 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.mongo.common;
+package org.apache.drill.exec.store.plan.rel;
-public enum MongoCompareOp {
- EQUAL("$eq"), NOT_EQUAL("$ne"), GREATER_OR_EQUAL("$gte"), GREATER("$gt"), LESS_OR_EQUAL(
- "$lte"), LESS("$lt"), IN("$in"), AND("$and"), OR("$or"), REGEX("$regex"), OPTIONS(
- "$options"), PROJECT("$project"), COND("$cond"), IFNULL("$ifNull"), IFNOTNULL(
- "$ifNotNull"), SUM("$sum"), GROUP_BY("$group"), EXISTS("$exists");
- private String compareOp;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.store.plan.PluginImplementor;
- MongoCompareOp(String compareOp) {
- this.compareOp = compareOp;
- }
+import java.io.IOException;
- public String getCompareOp() {
- return compareOp;
- }
+/**
+ * Relational expression that uses a plugin-specific calling convention.
+ */
+public interface PluginRel extends RelNode {
+
+  /**
+   * Asks whether the given implementor is able to handle this node.
+   *
+   * @param implementor plugin implementor to check against
+   * @return {@code true} if the implementor can implement this node
+   */
+  boolean canImplement(PluginImplementor implementor);
+
+  /**
+   * Passes this node to the given implementor.
+   *
+   * @param implementor plugin implementor that processes this node
+   * @throws IOException if the implementor fails while processing the node
+   */
+  void implement(PluginImplementor implementor) throws IOException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
new file mode 100644
index 0000000..3e2968a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginSortRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillSortRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Sort implementation for Drill plugins.
+ */
+public class PluginSortRel extends DrillSortRelBase implements PluginRel {
+
+  /**
+   * Creates a sort on top of {@code child}; the child is expected
+   * to have the same convention as this node.
+   */
+  public PluginSortRel(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, collation, offset, fetch);
+    assert getConvention() == child.getConvention();
+  }
+
+  /**
+   * Returns the cost computed by the parent class scaled by a factor of 0.1.
+   */
+  @Override
+  public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  /**
+   * Creates a copy of this sort that uses the supplied traits, input, collation, offset and fetch.
+   */
+  @Override
+  public PluginSortRel copy(RelTraitSet traitSet, RelNode input,
+      RelCollation newCollation, RexNode offset, RexNode fetch) {
+    // honour the collation requested by the caller instead of reusing this node's own collation
+    return new PluginSortRel(getCluster(), traitSet, input, newCollation, offset, fetch);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+
+  /**
+   * Always reports that this sort can be dropped.
+   */
+  @Override
+  public boolean canBeDropped() {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
new file mode 100644
index 0000000..3d61ea5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginUnionRel.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Union relational expression that uses a plugin-specific calling convention.
+ */
+public class PluginUnionRel extends DrillUnionRelBase implements PluginRel {
+
+  public PluginUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all, boolean checkCompatibility) throws InvalidRelException {
+    super(cluster, traits, inputs, all, checkCompatibility);
+  }
+
+  /**
+   * Creates a copy with the compatibility check enabled.
+   *
+   * @throws DrillRuntimeException if the copy cannot be constructed because of an {@link InvalidRelException}
+   */
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    final boolean checkCompatibility = true;
+    try {
+      return new PluginUnionRel(getCluster(), traitSet, inputs, all, checkCompatibility);
+    } catch (InvalidRelException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the cost computed by the parent class scaled by a factor of 0.1.
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    RelOptCost parentCost = super.computeSelfCost(planner, mq);
+    return parentCost.multiplyBy(.1);
+  }
+
+  @Override
+  public boolean canImplement(PluginImplementor implementor) {
+    return implementor.canImplement(this);
+  }
+
+  @Override
+  public void implement(PluginImplementor implementor) throws IOException {
+    implementor.implement(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
new file mode 100644
index 0000000..68e4b1f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.io.IOException;
+
+/**
+ * Storage plugin table scan rel implementation.
+ */
+public class StoragePluginTableScan extends DrillScanRelBase implements PluginRel {
+
+ private final RelDataType rowType;
+
+ public StoragePluginTableScan(RelOptCluster cluster, RelTraitSet traits, GroupScan grpScan,
+ RelOptTable table, RelDataType rowType) {
+ super(cluster, traits, grpScan, table);
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void implement(PluginImplementor implementor) throws IOException {
+ implementor.implement(this);
+ }
+
+ @Override
+ public DrillScanRelBase copy(RelTraitSet traitSet, GroupScan scan, RelDataType rowType) {
+ return new StoragePluginTableScan(getCluster(), traitSet, scan, getTable(), rowType);
+ }
+
+ @Override
+ public double estimateRowCount(RelMetadataQuery mq) {
+ return getGroupScan().getScanStats(mq).getRecordCount();
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ return this.rowType;
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw).item("groupScan", getGroupScan().getDigest());
+ }
+
+ @Override
+ protected String computeDigest() {
+ return super.computeDigest();
+ }
+
+ @Override
+ public boolean canImplement(PluginImplementor implementor) {
+ return implementor.canImplement(this);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
new file mode 100644
index 0000000..2ebf66e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginAggregateRule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+
+/**
+ * Converter rule that turns an {@link Aggregate} into a {@link PluginAggregateRel}
+ * in the target plugin convention.
+ */
+public class PluginAggregateRule extends PluginConverterRule {
+
+  public PluginAggregateRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Aggregate.class, in, out, "PluginAggregateRule", pluginImplementor);
+  }
+
+  /**
+   * Builds the plugin aggregate from the aggregate's own group sets and aggregate calls,
+   * after converting its input to the simplified target-convention traits.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Aggregate aggregate = (Aggregate) rel;
+    Convention target = getOutConvention();
+    return new PluginAggregateRel(
+        rel.getCluster(),
+        aggregate.getTraitSet().replace(target),
+        convert(aggregate.getInput(), aggregate.getTraitSet().replace(target).simplify()),
+        aggregate.getGroupSet(),
+        aggregate.getGroupSets(),
+        aggregate.getAggCallList());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
new file mode 100644
index 0000000..377eba3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginConverterRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+
+import java.util.function.Predicate;
+
+/**
+ * Base class for rules that convert an operator into its plugin-specific implementation.
+ * A rule only matches when the associated {@link PluginImplementor} reports
+ * that it can implement the matched operator.
+ */
+public abstract class PluginConverterRule extends ConverterRule {
+
+  private final PluginImplementor pluginImplementor;
+
+  protected PluginConverterRule(Class<? extends RelNode> clazz,
+      RelTrait in, Convention out, String description, PluginImplementor pluginImplementor) {
+    // the predicate accepts every node; the cast pins the lambda to java.util.function.Predicate
+    super(clazz, (Predicate<RelNode>) input -> true, in, out, DrillRelFactories.LOGICAL_BUILDER, description);
+    this.pluginImplementor = pluginImplementor;
+  }
+
+  public PluginImplementor getPluginImplementor() {
+    return pluginImplementor;
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    RelNode node = call.rel(0);
+    if (!isSupportedByPlugin(node)) {
+      return false;
+    }
+    return super.matches(call);
+  }
+
+  /**
+   * Dispatches to the {@code canImplement} overload that corresponds to the node's kind;
+   * nodes of any other kind are reported as unsupported.
+   */
+  private boolean isSupportedByPlugin(RelNode node) {
+    // cannot use visitor pattern here, since RelShuttle supports only logical rel implementations
+    if (node instanceof Aggregate) {
+      return pluginImplementor.canImplement((Aggregate) node);
+    }
+    if (node instanceof Filter) {
+      return pluginImplementor.canImplement((Filter) node);
+    }
+    if (node instanceof DrillLimitRelBase) {
+      return pluginImplementor.canImplement((DrillLimitRelBase) node);
+    }
+    if (node instanceof Project) {
+      return pluginImplementor.canImplement((Project) node);
+    }
+    if (node instanceof Sort) {
+      return pluginImplementor.canImplement((Sort) node);
+    }
+    if (node instanceof Union) {
+      return pluginImplementor.canImplement((Union) node);
+    }
+    if (node instanceof Join) {
+      return pluginImplementor.canImplement((Join) node);
+    }
+    return false;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
new file mode 100644
index 0000000..e5e02e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+
+/**
+ * Converter rule that turns a {@link Filter} into a {@link PluginFilterRel}
+ * in the target plugin convention.
+ */
+public class PluginFilterRule extends PluginConverterRule {
+
+  public PluginFilterRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Filter.class, in, out, "PluginFilterRule", pluginImplementor);
+  }
+
+  /**
+   * Builds the plugin filter with the original condition,
+   * after converting its input to the target-convention traits.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Filter source = (Filter) rel;
+    Convention target = getOutConvention();
+    return new PluginFilterRel(
+        target,
+        rel.getCluster(),
+        source.getTraitSet().replace(target),
+        convert(source.getInput(), source.getTraitSet().replace(target)),
+        source.getCondition());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
new file mode 100644
index 0000000..279241e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginIntermediatePrel;
+
+import java.util.function.Supplier;
+
+/**
+ * Rule that replaces a {@link VertexDrel} placed on top of a rel in the given plugin convention
+ * with a {@link PluginIntermediatePrel} carrying the physical trait.
+ */
+public class PluginIntermediatePrelConverterRule extends RelOptRule {
+  private final Supplier<PluginImplementor> implementorFactory;
+  private final RelTrait inTrait;
+  private final RelTrait outTrait;
+
+  /**
+   * @param convention plugin convention the vertex input must have
+   * @param implementorFactory factory handed over to the created {@link PluginIntermediatePrel}
+   */
+  public PluginIntermediatePrelConverterRule(Convention convention,
+      Supplier<PluginImplementor> implementorFactory) {
+    super(
+        RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
+            RelOptHelper.any(RelNode.class, convention)),
+        // describe the rule by its own name rather than the enumerable rule it was modelled on
+        DrillRelFactories.LOGICAL_BUILDER, "PluginIntermediatePrelConverterRule" + convention);
+    this.implementorFactory = implementorFactory;
+    this.inTrait = DrillRel.DRILL_LOGICAL;
+    this.outTrait = Prel.DRILL_PHYSICAL;
+  }
+
+  /**
+   * Transforms the matched vertex into an intermediate prel over the vertex's first input.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    VertexDrel in = call.rel(0);
+    RelNode intermediatePrel = new PluginIntermediatePrel(
+        in.getCluster(),
+        in.getTraitSet().replace(outTrait),
+        in.getInput(0),
+        implementorFactory);
+    call.transformTo(intermediatePrel);
+  }
+
+  /**
+   * Matches only when the parent check passes and the vertex carries the logical trait.
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return super.matches(call) && call.rel(0).getTraitSet().contains(inTrait);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
new file mode 100644
index 0000000..a71060b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+
+/**
+ * The rule that converts provided join operator to plugin-specific implementation.
+ */
+public class PluginJoinRule extends PluginConverterRule {
+
+  public PluginJoinRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    // the description names this rule; it previously reused the "PluginProjectRule" description
+    super(Join.class, in, out, "PluginJoinRule", pluginImplementor);
+  }
+
+  /**
+   * Builds the plugin join with the original condition and join type,
+   * after converting both inputs to the target-convention traits.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+    RelTraitSet traits = join.getTraitSet().replace(getOutConvention());
+    return new PluginJoinRel(
+        join.getCluster(),
+        traits,
+        convert(join.getLeft(), traits),
+        convert(join.getRight(), traits),
+        join.getCondition(),
+        join.getJoinType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
new file mode 100644
index 0000000..96a9fcc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginLimitRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+
+/**
+ * Converter rule that turns a {@link DrillLimitRelBase} into a {@link PluginLimitRel}
+ * in the target plugin convention.
+ */
+public class PluginLimitRule extends PluginConverterRule {
+
+  public PluginLimitRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(DrillLimitRelBase.class, in, out, "PluginLimitRule", pluginImplementor);
+  }
+
+  /**
+   * Builds the plugin limit with the original offset and fetch, after converting
+   * its input to the input's own traits with the convention replaced and simplified.
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    DrillLimitRelBase limit = (DrillLimitRelBase) rel;
+    Convention target = getOutConvention();
+    RelNode convertedInput = convert(limit.getInput(), limit.getInput().getTraitSet().replace(target).simplify());
+    return new PluginLimitRel(
+        rel.getCluster(),
+        limit.getTraitSet().replace(target),
+        convertedInput,
+        limit.getOffset(),
+        limit.getFetch());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
new file mode 100644
index 0000000..6968cc6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginProjectRule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+
+/**
+ * The rule that converts provided project operator to plugin-specific implementation.
+ */
+public class PluginProjectRule extends PluginConverterRule {
+
+  public PluginProjectRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Project.class, in, out, "PluginProjectRule", pluginImplementor);
+  }
+
+  /**
+   * Creates a {@link PluginProjectRel} with the same projections and row type as the given project.
+   *
+   * @param rel project operator to convert; cast to {@link Project}
+   * @return plugin-specific project whose traits use this rule's output convention
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Project project = (Project) rel;
+    RelNode input = project.getInput();
+    // Request the input with the input's own traits (convention replaced), not the project's:
+    // traits of the project describe its output columns and must not be demanded from its input.
+    // This is the same form the limit and sort rules use for their inputs.
+    RelNode convertedInput = convert(input, input.getTraitSet().replace(getOutConvention()).simplify());
+    return new PluginProjectRel(
+        getOutConvention(),
+        project.getCluster(),
+        project.getTraitSet().replace(getOutConvention()),
+        convertedInput,
+        project.getProjects(),
+        project.getRowType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
new file mode 100644
index 0000000..c9d9823
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginSortRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+
+/**
+ * Converter rule that turns a {@link Sort} operator into the plugin-specific {@link PluginSortRel}.
+ */
+public class PluginSortRule extends PluginConverterRule {
+
+  public PluginSortRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Sort.class, in, out, "PluginSortRule", pluginImplementor);
+  }
+
+  /**
+   * Creates a {@link PluginSortRel} that keeps the collation, offset and fetch of the given sort.
+   *
+   * @param rel sort operator to convert; cast to {@link Sort}
+   * @return plugin-specific sort whose traits use this rule's output convention and the sort's collation
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Sort original = (Sort) rel;
+    RelNode child = original.getInput();
+    // The child keeps its own traits; only the convention is swapped for the output one.
+    RelNode convertedChild = convert(child, child.getTraitSet().replace(getOutConvention()).simplify());
+    return new PluginSortRel(
+        rel.getCluster(),
+        original.getTraitSet().replace(getOutConvention()).replace(original.getCollation()),
+        convertedChild,
+        original.getCollation(),
+        original.offset,
+        original.fetch);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
new file mode 100644
index 0000000..fa1722a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginUnionRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plan.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The rule that converts provided union operator to plugin-specific implementation.
+ */
+public class PluginUnionRule extends PluginConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(PluginUnionRule.class);
+
+  public PluginUnionRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
+    super(Union.class, in, out, "PluginUnionRule", pluginImplementor);
+  }
+
+  /**
+   * Creates a {@link PluginUnionRel} over the union's inputs converted to this rule's output convention.
+   *
+   * @param rel union operator to convert; cast to {@link Union}
+   * @return plugin-specific union, or {@code null} when constructing it throws {@link InvalidRelException}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    Union union = (Union) rel;
+    try {
+      return new PluginUnionRel(
+          rel.getCluster(),
+          union.getTraitSet().replace(getOutConvention()),
+          convertList(union.getInputs(), getOutConvention()),
+          union.all,
+          true); // NOTE(review): presumably the compatibility-check flag; confirm against PluginUnionRel
+    } catch (InvalidRelException e) {
+      // Pass the exception as the trailing argument so its stack trace is logged,
+      // and prefix the message so the log entry says which conversion failed.
+      logger.warn("Unable to convert union to plugin-specific implementation: {}", e.getMessage(), e);
+      return null;
+    }
+  }
+}