Posted to commits@drill.apache.org by dz...@apache.org on 2022/12/19 08:43:44 UTC

[drill] 07/10: DRILL-8190: Fix mongo project pushdown for queries with joins (#2652)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.20
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 12e47c8dcba8414b3b8e995e9757f877a390c6f0
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Mon Sep 26 14:27:27 2022 +0800

    DRILL-8190: Fix mongo project pushdown for queries with joins (#2652)
---
 .../jdbc/JdbcIntermediatePrelConverterRule.java    |  3 +-
 .../store/mongo/plan/MongoPluginImplementor.java   |  5 +--
 .../exec/store/mongo/TestMongoProjectPushDown.java | 22 ++++++++++
 .../PhoenixIntermediatePrelConverterRule.java      |  3 +-
 .../exec/planner/cost/DrillRelMdRowCount.java      | 12 +++++-
 .../enumerable/ColumnConverterFactoryProvider.java |  2 +
 .../enumerable/DynamicTypeResolverBuilder.java     | 47 ++++++++++++++++++++++
 .../store/enumerable/EnumerableBatchCreator.java   |  2 +-
 .../exec/store/enumerable/EnumerableSubScan.java   |  2 +-
 .../EnumerableIntermediatePrelConverterRule.java   |  3 +-
 .../exec/store/enumerable/plan/VertexDrel.java     | 18 +++++++++
 .../exec/store/plan/rel/PluginAggregateRel.java    |  2 +-
 .../store/plan/rel/StoragePluginTableScan.java     | 20 +++++++++
 .../rule/PluginIntermediatePrelConverterRule.java  |  3 +-
 .../java/org/apache/drill/exec/util/Utilities.java | 30 +++++++-------
 15 files changed, 147 insertions(+), 27 deletions(-)

diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
index a66888f2df..81cf232342 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
@@ -48,7 +49,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0));
     call.transformTo(jdbcIntermediatePrel);
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index b55fa6b173..774a23265b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -55,7 +55,6 @@ import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
 import org.apache.drill.exec.store.plan.rel.PluginSortRel;
 import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
 import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
-import org.apache.drill.exec.util.Utilities;
 import org.bson.BsonDocument;
 import org.bson.BsonElement;
 import org.bson.BsonInt32;
@@ -220,8 +219,8 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
   }
 
   @Override
-  public void implement(StoragePluginTableScan scan) throws IOException {
-    groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (MongoGroupScan) scan.getGroupScan();
     operations = this.groupScan.getScanSpec().getOperations().stream()
       .map(BsonDocument::parse)
       .collect(Collectors.toList());
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
index a691443311..372ec6d5e4 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
@@ -101,4 +101,26 @@ public class TestMongoProjectPushDown extends MongoTestBase {
         .go();
   }
 
+  @Test // DRILL-8190
+  public void testProjectWithJoin() throws Exception {
+    String query = "SELECT sum(s1.sales) s1_sales,\n" +
+      "sum(s2.sales) s2_sales\n" +
+      "FROM mongo.%s.`%s` s1\n" +
+      "JOIN mongo.%s.`%s` s2 ON s1._id = s2._id";
+
+    queryBuilder()
+      .sql(query, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION)
+      .planMatcher()
+      .include("columns=\\[`_id`, `sales`]")
+      .exclude("columns=\\[`\\*\\*`")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION)
+      .unOrdered()
+      .baselineColumns("s1_sales", "s2_sales")
+      .baselineValues(1194L, 1194L)
+      .go();
+  }
+
 }
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
index c5eaaf1dda..7d6a88d0cd 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
@@ -48,7 +49,7 @@ final class PhoenixIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PhoenixIntermediatePrel(
       in.getCluster(),
-      in.getTraitSet().replace(outTrait),
+      in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
       in.getInput(0));
     call.transformTo(intermediatePrel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
index 0bfb70ae90..abcc83672a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
@@ -45,6 +46,8 @@ public class DrillRelMdRowCount extends RelMdRowCount{
 
   public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.ROW_COUNT.method, INSTANCE);
 
+  private static final Double DEFAULT_SCAN_ROW_COUNT = 1e9;
+
   @Override
   public Double getRowCount(Aggregate rel, RelMetadataQuery mq) {
     ImmutableBitSet groupKey = ImmutableBitSet.range(rel.getGroupCount());
@@ -96,7 +99,14 @@ public class DrillRelMdRowCount extends RelMdRowCount{
     PlannerSettings settings = PrelUtil.getSettings(rel.getCluster());
     // If guessing, return selectivity from RelMDRowCount
     if (DrillRelOptUtil.guessRows(rel)) {
-      return super.getRowCount(rel, mq);
+      if (rel instanceof DrillScanRelBase
+        || rel.getTable().unwrap(Table.class).getStatistic().getRowCount() != null) {
+        return super.getRowCount(rel, mq);
+      } else {
+        // if table doesn't have row count statistics, return large row count
+        // to make sure that limit will be pushed down
+        return DEFAULT_SCAN_ROW_COUNT;
+      }
     }
     // Return rowcount from statistics, if available. Otherwise, delegate to parent.
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
index 28aec71d0f..b91e4cbb13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
@@ -18,10 +18,12 @@
 package org.apache.drill.exec.store.enumerable;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonTypeResolver;
 import org.apache.drill.exec.record.ColumnConverterFactory;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
+@JsonTypeResolver(DynamicTypeResolverBuilder.class)
 public interface ColumnConverterFactoryProvider {
 
   ColumnConverterFactory getFactory(TupleMetadata schema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java
new file mode 100644
index 0000000000..64fa9864cf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.enumerable;
+
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
+import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
+import org.reflections.Reflections;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DynamicTypeResolverBuilder extends StdTypeResolverBuilder {
+
+  @Override
+  public TypeDeserializer buildTypeDeserializer(DeserializationConfig config,
+    JavaType baseType, Collection<NamedType> subtypes) {
+
+    Reflections reflections = new Reflections("org.apache.drill.exec.store");
+    @SuppressWarnings("unchecked")
+    Class<Object> rawClass = (Class<Object>) baseType.getRawClass();
+    List<NamedType> dynamicSubtypes = reflections.getSubTypesOf(rawClass).stream()
+      .map(NamedType::new)
+      .collect(Collectors.toList());
+    dynamicSubtypes.addAll(subtypes);
+
+    return super.buildTypeDeserializer(config, baseType, dynamicSubtypes);
+  }
+}
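
For illustration only, not part of this commit: the new DynamicTypeResolverBuilder lets Jackson discover implementations of the annotated base type on the classpath instead of requiring each one to be listed with @JsonSubTypes. Below is a minimal, self-contained sketch of the same pattern; the package name and the Provider, DefaultProvider, and ScanningResolverBuilder types are hypothetical stand-ins, not Drill code.

    package com.example.demo; // hypothetical package; the Reflections scan below must match it

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.DeserializationConfig;
    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.annotation.JsonTypeResolver;
    import com.fasterxml.jackson.databind.jsontype.NamedType;
    import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
    import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
    import org.reflections.Reflections;

    // Base type: no @JsonSubTypes registration; subtypes are discovered when deserializing.
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
    @JsonTypeResolver(ScanningResolverBuilder.class)
    interface Provider { }

    // A subtype the resolver finds via the classpath scan.
    class DefaultProvider implements Provider { }

    // Same idea as the committed DynamicTypeResolverBuilder, but scanning the demo package.
    class ScanningResolverBuilder extends StdTypeResolverBuilder {
      @Override
      public TypeDeserializer buildTypeDeserializer(DeserializationConfig config,
          JavaType baseType, Collection<NamedType> subtypes) {
        Reflections reflections = new Reflections("com.example.demo");
        List<NamedType> dynamicSubtypes = new ArrayList<>();
        for (Class<?> subtype : reflections.getSubTypesOf(baseType.getRawClass())) {
          dynamicSubtypes.add(new NamedType(subtype));
        }
        if (subtypes != null) {
          dynamicSubtypes.addAll(subtypes);
        }
        return super.buildTypeDeserializer(config, baseType, dynamicSubtypes);
      }
    }

    public class DynamicTypeResolverDemo {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // With Id.NAME the default discriminator property is "@type" and the default id
        // is the class's simple name, so this resolves to DefaultProvider.
        Provider provider = mapper.readValue("{\"@type\":\"DefaultProvider\"}", Provider.class);
        System.out.println(provider.getClass().getSimpleName());
      }
    }

Running the main method should print DefaultProvider, showing the subtype was resolved purely from the classpath scan rather than from an explicit registration.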
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
index 2dec45a61a..9930484a2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
@@ -61,7 +61,7 @@ public class EnumerableBatchCreator implements BatchCreator<EnumerableSubScan> {
     builder.providedSchema(subScan.getSchema());
 
     ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
-        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider());
+        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.getConverterFactoryProvider());
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
index 4476be8c53..0c7245b6cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
@@ -79,7 +79,7 @@ public class EnumerableSubScan extends AbstractSubScan {
     return schemaPath;
   }
 
-  public ColumnConverterFactoryProvider factoryProvider() {
+  public ColumnConverterFactoryProvider getConverterFactoryProvider() {
     return converterFactoryProvider;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
index 7272a36bf6..c5ede3a22e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 
 public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
@@ -48,7 +49,7 @@ public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new EnumerableIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0),
         context);
     call.transformTo(intermediatePrel);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
index edbc5912fb..d202ebdb89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
@@ -18,15 +18,22 @@
 package org.apache.drill.exec.store.enumerable.plan;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.util.Utilities;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
 
+import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;
+
 /**
  * The vertex simply holds the child nodes but contains its own traits.
  * Used for completing Drill logical planning when child nodes have some specific traits.
@@ -51,4 +58,15 @@ public class VertexDrel extends SingleRel implements DrillRel {
   public LogicalOperator implement(DrillImplementor implementor) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double rowCount = estimateRowCount(mq);
+    double columnCount = Utilities.isStarQuery(getRowType()) ? STAR_COLUMN_COST : getRowType().getFieldCount();
+    double valueCount = rowCount * columnCount;
+    // columns count is considered during cost calculation to make preferable plans
+    // with pushed plugin project operators since in the opposite case planner wouldn't consider
+    // a plan with additional plugin projection that reduces columns as better than a plan without it
+    return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1);
+  }
 }
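
For illustration only, not part of this commit: a back-of-the-envelope sketch of why adding the column count to the VertexDrel cost steers the planner toward plans where the plugin project has been pushed down. The numbers, including the stand-in for STAR_COLUMN_COST, are assumed values, not Drill's actual constants or full cost model.

    // Illustrative arithmetic only; not Drill planner code.
    public class VertexCostSketch {
      public static void main(String[] args) {
        double rowCount = 1_000;          // assumed row estimate for the vertex
        double starColumnCost = 10_000;   // stand-in for DrillScanRel.STAR_COLUMN_COST (a large constant)
        double projectedColumns = 2;      // e.g. `_id` and `sales` once the project is pushed down

        // Mirrors the shape of makeCost(rowCount, rowCount * columnCount, 0).multiplyBy(0.1)
        double costStarRowType = rowCount * starColumnCost * 0.1;
        double costNarrowRowType = rowCount * projectedColumns * 0.1;

        System.out.printf("star row type: %.0f, pushed-down project: %.0f%n",
            costStarRowType, costNarrowRowType);
        // The narrow row type is orders of magnitude cheaper, so the plan with the
        // pushed plugin projection wins during physical planning.
      }
    }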
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
index 02885e90ab..3c7f115843 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -51,7 +51,7 @@ public class PluginAggregateRel extends DrillAggregateRelBase implements PluginR
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+    return super.computeLogicalAggCost(planner, mq).multiplyBy(0.1);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
index ce8ce9ec98..3e7d07c5ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.plan.rel;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelWriter;
@@ -27,11 +29,14 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.util.Utilities;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;
+
 /**
  * Storage plugin table scan rel implementation.
  */
@@ -80,6 +85,21 @@ public class StoragePluginTableScan extends DrillScanRelBase implements PluginRe
     return implementor.canImplement(this);
   }
 
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    List<SchemaPath> columns = groupScan.getColumns();
+    // column count should be adjusted to consider the case of projecting nested columns,
+    // such a scan should be preferable compared to the scan where root columns are projected only
+    double columnCount = Utilities.isStarQuery(columns)
+      ? STAR_COLUMN_COST
+      : Math.pow(getRowType().getFieldCount(), 2) / Math.max(columns.size(), 1);
+
+    double rowCount = estimateRowCount(mq);
+    double valueCount = rowCount * columnCount;
+
+    return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1);
+  }
+
   private static List<SchemaPath> getColumns(RelDataType rowType) {
     return rowType.getFieldList().stream()
       .map(filed -> filed.isDynamicStar()
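
For illustration only, not part of this commit: a small worked example of the fieldCount squared over columns.size() adjustment above, showing why a scan that projects nested columns comes out cheaper than one that projects only their root column. The field and column counts are assumptions made for the sketch.

    // Illustrative arithmetic only; not Drill planner code.
    public class NestedColumnCostSketch {
      public static void main(String[] args) {
        // Projecting two nested columns, e.g. `a`.`b` and `a`.`c`: the row type exposes the
        // single root field `a`, while groupScan.getColumns() lists both nested paths.
        double nestedScan = Math.pow(1, 2) / Math.max(2, 1);   // 1 field, 2 columns -> 0.5

        // Projecting only the root column `a`: one field, one column.
        double rootScan = Math.pow(1, 2) / Math.max(1, 1);      // -> 1.0

        // The nested-column scan gets the smaller adjusted column count and therefore the
        // smaller cost, so the planner keeps the nested projection pushed into the scan.
        System.out.println(nestedScan < rootScan); // true
      }
    }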
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
index 279241efea..a13dc25e32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 import org.apache.drill.exec.store.plan.PluginImplementor;
@@ -53,7 +54,7 @@ public class PluginIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PluginIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0),
         implementorFactory);
     call.transformTo(intermediatePrel);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 87f2201804..1d160d07ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
@@ -33,8 +34,6 @@ import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 
 public class Utilities {
 
@@ -52,17 +51,13 @@ public class Utilities {
     int majorFragmentId = handle.getMajorFragmentId();
     int minorFragmentId = handle.getMinorFragmentId();
 
-    String fileName = String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag);
-
-    return fileName;
+    return String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag);
   }
 
   /**
    * Create {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given <i>defaultSchemaName</i>. Rest of the members of the
    * QueryContextInformation is derived from the current state of the process.
    *
-   * @param defaultSchemaName
-   * @param sessionId
    * @return A {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given <i>defaultSchemaName</i>.
    */
   public static QueryContextInformation createQueryContextInfo(final String defaultSchemaName,
@@ -82,22 +77,25 @@ public class Utilities {
    * @return The Drill version.
    */
   public static String getDrillVersion() {
-      String v = Utilities.class.getPackage().getImplementationVersion();
-      return v;
+    return Utilities.class.getPackage().getImplementationVersion();
   }
 
   /**
    * Return true if list of schema path has star column.
-   * @param projected
+   *
    * @return True if the list of {@link org.apache.drill.common.expression.SchemaPath}s has star column.
    */
   public static boolean isStarQuery(Collection<SchemaPath> projected) {
-    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
-      @Override
-      public boolean apply(SchemaPath path) {
-        return Preconditions.checkNotNull(path).equals(SchemaPath.STAR_COLUMN);
-      }
-    }).isPresent();
+    return Preconditions.checkNotNull(projected, COL_NULL_ERROR).stream()
+      .anyMatch(SchemaPath::isDynamicStar);
+  }
+
+  /**
+   * Return true if the row type has star column.
+   */
+  public static boolean isStarQuery(RelDataType projected) {
+    return projected.getFieldNames().stream()
+      .anyMatch(SchemaPath.DYNAMIC_STAR::equals);
   }
 
   /**