Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/09/06 16:15:27 UTC

[GitHub] [drill] vvysotskyi commented on a change in pull request #2289: DRILL-7985: Support Mongo aggregate, union, project, limit, sort pushdowns

vvysotskyi commented on a change in pull request #2289:
URL: https://github.com/apache/drill/pull/2289#discussion_r702400742



##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -458,15 +448,21 @@ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
         endpointFragmentMapping.toString());
   }
 
-  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    return new MongoSubScanSpec()
-        .setDbName(scanSpec.getDbName())
-        .setCollectionName(scanSpec.getCollectionName())
-        .setHosts(chunkInfo.getChunkLocList())
+  private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    if (useAggregate) {
+      return new MongoSubScan.MongoSubScanSpec()
+          .setOperations(scanSpec.getOperations())
+          .setDbName(scanSpec.getDbName())
+          .setCollectionName(scanSpec.getCollectionName())
+          .setHosts(chunkInfo.getChunkLocList());
+    }
+    return new ShardedMongoSubScanSpec()
         .setMinFilters(chunkInfo.getMinFilters())
         .setMaxFilters(chunkInfo.getMaxFilters())
-        .setMaxRecords(maxRecords)

Review comment:
      If the query has a limit, the mongo aggregate pipeline will be used (see the previous `if` statement), so the limit will be present in the operations list.
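
For illustration, here is a minimal sketch of how a pushed-down limit travels in the operations list as a `$limit` pipeline stage rather than in a separate `maxRecords` field (the value 10 is just an example):

```java
import java.util.ArrayList;
import java.util.List;

import org.bson.conversions.Bson;

import com.mongodb.client.model.Aggregates;

public class LimitPushdownSketch {
  public static void main(String[] args) {
    // The pushed-down limit is carried as a pipeline stage.
    List<Bson> operations = new ArrayList<>();
    operations.add(Aggregates.limit(10)); // renders as {"$limit": 10}
    System.out.println(operations.get(0).toBsonDocument());
  }
}
```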

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
##########
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.ConnectionString;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;

Review comment:
       Done

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
##########
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.ConnectionString;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;

Review comment:
       These changes were made before adding Lombok to Drill 🙂
   

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
##########
@@ -75,4 +84,67 @@ public String getConnection() {
   private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
     return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
+
+  public static class MongoPluginOptimizations {

Review comment:
       This class is used as a field in `MongoStoragePluginConfig`, so I've added it here.
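
For reference, a minimal sketch of how a nested options holder like this is typically embedded in a Jackson-serialized plugin config; the flag names below are assumptions for illustration, not necessarily the exact options defined in the PR:

```java
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical shape of the nested optimizations holder; the real class
// in the PR defines the actual option set and defaults.
public class MongoPluginOptimizations {

  @JsonProperty
  private boolean supportsAggregatePushdown = true; // assumed flag name

  @JsonProperty
  private boolean supportsLimitPushdown = true; // assumed flag name

  public boolean isSupportsAggregatePushdown() {
    return supportsAggregatePushdown;
  }

  public boolean isSupportsLimitPushdown() {
    return supportsLimitPushdown;
  }
}
```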

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.client.model.Accumulators;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class MongoAggregateUtils {
+
+  public static List<String> mongoFieldNames(RelDataType rowType) {
+    List<String> renamed = rowType.getFieldNames().stream()
+        .map(name -> name.startsWith("$") ? "_" + name.substring(2) : name)
+        .collect(Collectors.toList());
+    return SqlValidatorUtil.uniquify(renamed, true);
+  }
+
+  public static String maybeQuote(String s) {
+    if (!needsQuote(s)) {
+      return s;
+    }
+    return quote(s);
+  }
+
+  public static String quote(String s) {
+    return "'" + s + "'";
+  }
+
+  private static boolean needsQuote(String s) {

Review comment:
       It is used only in this class, so it was made private.
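
The body of `needsQuote` is elided in the hunk above; a plausible implementation (this utility appears adapted from Calcite's `MongoRules`, so this is a sketch, not necessarily the exact code) quotes any string that is not a plain identifier:

```java
// Plausible sketch of the elided method: quote anything that contains
// a non-identifier character or a '$'.
private static boolean needsQuote(String s) {
  for (int i = 0, n = s.length(); i < n; i++) {
    char c = s.charAt(i);
    if (!Character.isJavaIdentifierPart(c) || c == '$') {
      return true;
    }
  }
  return false;
}
```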

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
##########
@@ -176,13 +185,15 @@ public int next() {
       logger.debug("Filters Applied : " + filters);
       logger.debug("Fields Selected :" + fields);
 
-      // Add limit to Mongo query
-      if (maxRecords > 0) {
-        logger.debug("Limit applied: {}", maxRecords);
-        cursor = collection.find(filters).projection(fields).limit(maxRecords).batchSize(100).iterator();
+      MongoIterable<BsonDocument> projection;
+      if (CollectionUtils.isNotEmpty(operations)) {
+        List<Bson> operations = new ArrayList<>(this.operations);
+        operations.add(Aggregates.project(fields));
+        projection = collection.aggregate(operations);
       } else {
-        cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+        projection = collection.find(filters).projection(fields);
       }
+      cursor = projection.batchSize(100).iterator();

Review comment:
      I haven't changed this value; it was initially set to 100. I've made it a configurable plugin option.
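
A sketch of what the configurable variant could look like, continuing from the hunk above (the config accessor name `getBatchSize()` is an assumption, not the actual option name):

```java
// Hypothetical: read the batch size from the plugin config instead of
// hard-coding 100. The accessor name getBatchSize() is assumed here.
int batchSize = plugin.getConfig().getBatchSize();
cursor = projection.batchSize(batchSize).iterator();
```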

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
+import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
+import org.apache.drill.exec.store.mongo.MongoGroupScan;
+import org.apache.drill.exec.store.mongo.MongoScanSpec;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.exec.util.Utilities;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link PluginImplementor} for Mongo.
+ * This class tries to convert operators to use {@link com.mongodb.client.MongoCollection#find}
+ * if only simple project and filter expressions are present,
+ * otherwise {@link com.mongodb.client.MongoCollection#aggregate} is used to obtain data from Mongo.
+ */
+public class MongoPluginImplementor extends AbstractPluginImplementor {
+
+  private MongoGroupScan groupScan;
+
+  private List<Bson> operations;
+
+  private Document filters;
+
+  private List<SchemaPath> columns;
+
+  private boolean runAggregate;
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    runAggregate = true;
+    visitChild(aggregate.getInput());
+
+    operations.addAll(
+      MongoAggregateUtils.getAggregateOperations(aggregate, aggregate.getInput().getRowType()));
+    List<String> outNames = MongoAggregateUtils.mongoFieldNames(aggregate.getRowType());
+    columns = outNames.stream()
+      .map(SchemaPath::getSimplePath)
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      filter.getCondition());
+    MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(conditionExp);
+    if (runAggregate) {
+      Bson convertedFilterExpression = Aggregates.match(mongoFilterBuilder.parseTree()).toBsonDocument();
+      operations.add(convertedFilterExpression);
+    } else {
+      filters = mongoFilterBuilder.parseTree();
+    }
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    runAggregate = true;
+    visitChild(limit.getInput());
+

Review comment:
      Yes, we do apply the limit. `runAggregate` here means that the mongo aggregate pipeline is used; it doesn't mean that any of the operations has to be an aggregate operation.
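
For context, a hedged sketch of how the elided method body likely continues, based on this discussion: the offset and fetch become `$skip` and `$limit` pipeline stages, reusing the `Aggregates` and `RexLiteral` imports already shown in the hunk above.

```java
// Sketch (assumed from the review discussion): offset and fetch are
// appended to the pipeline; neither is an aggregation in the SQL sense.
if (limit.getOffset() != null) {
  operations.add(Aggregates.skip(RexLiteral.intValue(limit.getOffset())));
}
if (limit.getFetch() != null) {
  operations.add(Aggregates.limit(RexLiteral.intValue(limit.getFetch())));
}
```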

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -458,15 +448,21 @@ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
         endpointFragmentMapping.toString());
   }
 
-  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    return new MongoSubScanSpec()
-        .setDbName(scanSpec.getDbName())
-        .setCollectionName(scanSpec.getCollectionName())
-        .setHosts(chunkInfo.getChunkLocList())
+  private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    if (useAggregate) {
+      return new MongoSubScan.MongoSubScanSpec()
+          .setOperations(scanSpec.getOperations())
+          .setDbName(scanSpec.getDbName())

Review comment:
      `useAggregate` in this class means that the mongo aggregate pipeline is used. In this case, all operations, including filters, will be present in the `operations` list, so the filter will be pushed down.
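
To make this concrete, a small sketch of what the `operations` list can carry when the aggregate pipeline is used: the pushed-down filter travels as a `$match` stage alongside other pushed-down stages (field name and values are illustrative):

```java
import java.util.Arrays;
import java.util.List;

import org.bson.conversions.Bson;

import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;

public class FilterPushdownSketch {
  public static void main(String[] args) {
    // Illustrative pipeline: a pushed-down filter plus a pushed-down limit.
    List<Bson> operations = Arrays.asList(
        Aggregates.match(Filters.eq("employee_id", 1)), // {"$match": ...}
        Aggregates.limit(10));                          // {"$limit": 10}
    operations.forEach(op -> System.out.println(op.toBsonDocument()));
  }
}
```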

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
##########
@@ -87,15 +92,19 @@ public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaP
     fragmentContext = context;
     this.plugin = plugin;
     filters = new Document();
-    Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
-        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+    if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
+      operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+    } else {
+      MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
+      Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+          shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
 
-    buildFilters(subScanSpec.getFilter(), mergedFilters);
+      buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+    }
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
     isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
-    maxRecords = subScanSpec.getMaxRecords();

Review comment:
       Yes, the limit is present in the operations list.



