Posted to issues@drill.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/09/05 03:45:00 UTC

[jira] [Commented] (DRILL-7985) Support Mongo aggregate, union, project, limit, sort pushdowns

    [ https://issues.apache.org/jira/browse/DRILL-7985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17410094#comment-17410094 ] 

ASF GitHub Bot commented on DRILL-7985:
---------------------------------------

cgivre commented on a change in pull request #2289:
URL: https://github.com/apache/drill/pull/2289#discussion_r699784347



##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -458,15 +448,21 @@ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
         endpointFragmentMapping.toString());
   }
 
-  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    return new MongoSubScanSpec()
-        .setDbName(scanSpec.getDbName())
-        .setCollectionName(scanSpec.getCollectionName())
-        .setHosts(chunkInfo.getChunkLocList())
+  private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    if (useAggregate) {
+      return new MongoSubScan.MongoSubScanSpec()
+          .setOperations(scanSpec.getOperations())
+          .setDbName(scanSpec.getDbName())
+          .setCollectionName(scanSpec.getCollectionName())
+          .setHosts(chunkInfo.getChunkLocList());
+    }
+    return new ShardedMongoSubScanSpec()
         .setMinFilters(chunkInfo.getMinFilters())
         .setMaxFilters(chunkInfo.getMaxFilters())
-        .setMaxRecords(maxRecords)

Review comment:
       Also, can you please explain where the limit pushdown is happening?

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
##########
@@ -75,4 +84,67 @@ public String getConnection() {
   private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
     return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
+
+  public static class MongoPluginOptimizations {

Review comment:
       Out of curiosity, why did you decide to put this code here?  I would think this should be in its own class or in the `MongoStoragePlugin` class.  I'm not asking you to move it, just curious.

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
##########
@@ -87,15 +92,19 @@ public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaP
     fragmentContext = context;
     this.plugin = plugin;
     filters = new Document();
-    Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
-        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+    if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
+      operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+    } else {
+      MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
+      Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+          shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
 
-    buildFilters(subScanSpec.getFilter(), mergedFilters);
+      buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+    }
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
     isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
-    maxRecords = subScanSpec.getMaxRecords();

Review comment:
       Are we applying the limit somewhere else?

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoAggregateUtils.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo;
+
+import com.mongodb.client.model.Accumulators;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.drill.exec.store.mongo.common.MongoOp;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class MongoAggregateUtils {
+
+  public static List<String> mongoFieldNames(RelDataType rowType) {
+    List<String> renamed = rowType.getFieldNames().stream()
+        .map(name -> name.startsWith("$") ? "_" + name.substring(2) : name)
+        .collect(Collectors.toList());
+    return SqlValidatorUtil.uniquify(renamed, true);
+  }
+
+  public static String maybeQuote(String s) {
+    if (!needsQuote(s)) {
+      return s;
+    }
+    return quote(s);
+  }
+
+  public static String quote(String s) {
+    return "'" + s + "'";
+  }
+
+  private static boolean needsQuote(String s) {

Review comment:
       Why is this function private when all others are public?

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
##########
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.mongodb.ConnectionString;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;

Review comment:
       Would you want to use Lombok for this config class?

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
##########
@@ -176,13 +185,15 @@ public int next() {
       logger.debug("Filters Applied : " + filters);
       logger.debug("Fields Selected :" + fields);
 
-      // Add limit to Mongo query
-      if (maxRecords > 0) {
-        logger.debug("Limit applied: {}", maxRecords);
-        cursor = collection.find(filters).projection(fields).limit(maxRecords).batchSize(100).iterator();
+      MongoIterable<BsonDocument> projection;
+      if (CollectionUtils.isNotEmpty(operations)) {
+        List<Bson> operations = new ArrayList<>(this.operations);
+        operations.add(Aggregates.project(fields));
+        projection = collection.aggregate(operations);
       } else {
-        cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
+        projection = collection.find(filters).projection(fields);
       }
+      cursor = projection.batchSize(100).iterator();

Review comment:
       Is there a reason the `batchSize` is `100`?  Could we make this a constant or, alternatively, a configurable option?
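
For illustration only, a minimal sketch of a named constant with an optional override. Every name here (`MongoReaderDefaults`, `DEFAULT_BATCH_SIZE`, `resolveBatchSize`) is hypothetical and does not come from the PR; how a configured value would actually reach the reader (plugin config, session option) is left open.

```java
// Hypothetical sketch: none of these names come from the PR.
final class MongoReaderDefaults {

  // Named constant standing in for the literal 100 passed to batchSize(...).
  static final int DEFAULT_BATCH_SIZE = 100;

  private MongoReaderDefaults() {
  }

  // Returns the configured batch size when it is a positive number,
  // otherwise falls back to DEFAULT_BATCH_SIZE.
  static int resolveBatchSize(Integer configured) {
    return configured != null && configured > 0 ? configured : DEFAULT_BATCH_SIZE;
  }
}
```

With something like this, the reader call could become `projection.batchSize(MongoReaderDefaults.resolveBatchSize(configuredValue))`, where `configuredValue` is again an assumed name.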

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
##########
@@ -458,15 +448,21 @@ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
         endpointFragmentMapping.toString());
   }
 
-  private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
-    return new MongoSubScanSpec()
-        .setDbName(scanSpec.getDbName())
-        .setCollectionName(scanSpec.getCollectionName())
-        .setHosts(chunkInfo.getChunkLocList())
+  private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
+    if (useAggregate) {
+      return new MongoSubScan.MongoSubScanSpec()
+          .setOperations(scanSpec.getOperations())
+          .setDbName(scanSpec.getDbName())

Review comment:
       If there's an aggregate query, are we not pushing down filters?

##########
File path: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.plan;
+
+import com.mongodb.client.model.Aggregates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.mongo.MongoAggregateUtils;
+import org.apache.drill.exec.store.mongo.MongoFilterBuilder;
+import org.apache.drill.exec.store.mongo.MongoGroupScan;
+import org.apache.drill.exec.store.mongo.MongoScanSpec;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.exec.util.Utilities;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link PluginImplementor} for Mongo.
+ * This class tries to convert operators to use {@link com.mongodb.client.MongoCollection#find}
+ * if only simple project and filter expressions are present,
+ * otherwise {@link com.mongodb.client.MongoCollection#aggregate} is used to obtain data from Mongo.
+ */
+public class MongoPluginImplementor extends AbstractPluginImplementor {
+
+  private MongoGroupScan groupScan;
+
+  private List<Bson> operations;
+
+  private Document filters;
+
+  private List<SchemaPath> columns;
+
+  private boolean runAggregate;
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    runAggregate = true;
+    visitChild(aggregate.getInput());
+
+    operations.addAll(
+      MongoAggregateUtils.getAggregateOperations(aggregate, aggregate.getInput().getRowType()));
+    List<String> outNames = MongoAggregateUtils.mongoFieldNames(aggregate.getRowType());
+    columns = outNames.stream()
+      .map(SchemaPath::getSimplePath)
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      filter.getCondition());
+    MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(conditionExp);
+    if (runAggregate) {
+      Bson convertedFilterExpression = Aggregates.match(mongoFilterBuilder.parseTree()).toBsonDocument();
+      operations.add(convertedFilterExpression);
+    } else {
+      filters = mongoFilterBuilder.parseTree();
+    }
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    runAggregate = true;
+    visitChild(limit.getInput());
+

Review comment:
       I'm a little confused here.  What happens if the operation isn't an aggregate?  Do we still apply the limit somewhere?
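
For context on the question above: the hunk sets `runAggregate = true` before visiting the input, which suggests that a limit always routes the scan through `aggregate`, though the rest of the method is not shown here. In a MongoDB aggregation pipeline, an offset and a row limit correspond to the `$skip` and `$limit` stages. The sketch below is an assumption about how such stages could be assembled, not the PR's implementation: it models stages as plain maps rather than the driver's `Bson` objects, and `LimitStagesSketch` and `limitStages` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the PR's code: stages are modelled as plain maps
// instead of the Bson objects the Mongo driver would produce.
final class LimitStagesSketch {

  private LimitStagesSketch() {
  }

  // Builds the pipeline stages for "OFFSET offset FETCH fetch".
  // A null argument means the clause is absent.
  static List<Map<String, Long>> limitStages(Long offset, Long fetch) {
    List<Map<String, Long>> stages = new ArrayList<>();
    if (offset != null && offset > 0) {
      // $skip goes first so that $limit counts rows after the offset.
      stages.add(Collections.singletonMap("$skip", offset));
    }
    if (fetch != null && fetch > 0) {
      // $limit requires a positive value, so LIMIT 0 is not covered here
      // and would need separate handling.
      stages.add(Collections.singletonMap("$limit", fetch));
    }
    return stages;
  }
}
```

Under these assumptions, `LIMIT 10 OFFSET 5` yields a `$skip` of 5 followed by a `$limit` of 10, appended after any earlier stages in the pipeline.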






> Support Mongo aggregate, union, project, limit, sort pushdowns
> --------------------------------------------------------------
>
>                 Key: DRILL-7985
>                 URL: https://issues.apache.org/jira/browse/DRILL-7985
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.20.0
>            Reporter: Vova Vysotskyi
>            Assignee: Vova Vysotskyi
>            Priority: Major
>             Fix For: 1.20.0
>
>
> Add support for more mongo pushdowns:
> - aggregate
> - project (including expressions)
> - limit (including offset)
> - union / union all
> - sort


