You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/06/27 13:35:16 UTC

drill git commit: DRILL-5538: Create TopProject with validatedNodeType after PHYSICAL phase

Repository: drill
Updated Branches:
  refs/heads/master 9cf6faa7a -> 6446e56f2


DRILL-5538: Create TopProject with validatedNodeType after PHYSICAL phase

closes #844


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6446e56f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6446e56f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6446e56f

Branch: refs/heads/master
Commit: 6446e56f292a5905d646462c618c056839ad5198
Parents: 9cf6faa
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Jun 15 16:01:54 2017 +0300
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jun 27 13:03:56 2017 +0300

----------------------------------------------------------------------
 .../physical/visitor/TopProjectVisitor.java     | 142 +++++++++++++++++++
 .../sql/handlers/CreateTableHandler.java        |  14 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |  71 ++++++----
 .../planner/sql/handlers/ExplainHandler.java    |   6 +-
 .../java/org/apache/drill/TestUnionAll.java     |   1 -
 5 files changed, 195 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
new file mode 100644
index 0000000..587b006
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.WriterPrel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Adds non-trivial top project to ensure the final output field names are preserved.
+ * Such non-trivial project is needed due to Calcite's behavior of ProjectRemoveRule.
+ * It will be added under Screen/Writer operator in the physical plan
+ * if there are no other Projects under these operators,
+ * in cases like * column expansion or partition by column processing.
+ */
+public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
+
+  private final RelDataType validatedRowType;
+
+  public TopProjectVisitor(RelDataType validatedRowType) {
+    this.validatedRowType = validatedRowType; // supplies the field names used for the top project
+  }
+
+  /**
+   * Entry point: runs a new {@link TopProjectVisitor} over the passed physical relational node and its children
+   * so that a top project is added under screen or writer when needed to preserve final output field names.
+   *
+   * @param prel physical relational node to start the traversal from
+   * @param validatedRowType final output row type, its field names are used for the top project
+   * @return physical relational node returned by the visitor, with added project if necessary
+   */
+  public static Prel insertTopProject(Prel prel, RelDataType validatedRowType){
+    return prel.accept(new TopProjectVisitor(validatedRowType), null);
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    List<RelNode> children = new ArrayList<>(); // generic node: no project is added here, only children are visited
+    for (Prel child : prel){
+      child = child.accept(this, null); // visit each child so that screen / writer below this node are handled
+      children.add(child);
+    }
+
+    return (Prel) prel.copy(prel.getTraitSet(), children); // copy of this node over the visited children
+  }
+
+  @Override
+  public Prel visitScreen(ScreenPrel prel, Void value) {
+    // insert project under screen only if we don't have writer anywhere underneath
+    if (containsWriter(prel)) {
+      return prel; // screen is returned as is, its subtree is not traversed by this visitor
+    }
+
+    Prel newChild = ((Prel) prel.getInput()).accept(this, value); // visit the input first, then wrap the result
+    return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+  }
+
+  @Override
+  public Prel visitWriter(WriterPrel prel, Void value) {
+    Prel newChild = ((Prel) prel.getInput()).accept(this, value); // visit the input first, then wrap the result
+    return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+  }
+
+  /**
+   * Recursively checks if at least one descendant of passed physical relational node is writer (node itself is not tested).
+   *
+   * @param prel physical relational node
+   * @return true if writer operator was found
+   */
+  private boolean containsWriter(Prel prel) {
+    for (Prel child : prel){
+      if (child instanceof WriterPrel || containsWriter(child)) { // direct child first, then its own subtree
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Adds top project to ensure final output field names are preserved.
+   * In case of duplicated column names, will rename duplicates.
+   * Top project is not added if field counts of the passed node and validated row type differ,
+   * or if the passed node is already a project and the top project would be trivial.
+   *
+   * @param prel physical relational node
+   * @param validatedRowType final output row type
+   * @return physical relational node with top project if necessary
+   */
+  private Prel addTopProjectPrel(Prel prel, RelDataType validatedRowType) {
+    RelDataType rowType = prel.getRowType();
+    if (rowType.getFieldCount() != validatedRowType.getFieldCount()) {
+      return prel; // field counts differ: leave the node as is
+    }
+
+    RexBuilder rexBuilder = prel.getCluster().getRexBuilder();
+    List<RexNode> projections = new ArrayList<>();
+    int projectCount = rowType.getFieldList().size();
+
+    for (int i = 0; i < projectCount; i++) {
+      projections.add(rexBuilder.makeInputRef(prel, i)); // reference every input column by its position
+    }
+
+    List<String> fieldNames = SqlValidatorUtil.uniquify(
+        validatedRowType.getFieldNames(),
+        SqlValidatorUtil.F_SUGGESTER2,
+        prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive()); // names come from validated row type
+
+    RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null);
+    ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType);
+
+    return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ? prel : topProject; // skip only a trivial project over a project
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 72444ca..d232a71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -121,7 +121,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
                                  RelDataType queryRowType,
                                  StorageStrategy storageStrategy)
       throws RelConversionException, SqlUnsupportedException {
-    final DrillRel convertedRelNode = convertToDrel(relNode);
+    final DrillRel convertedRelNode = convertToRawDrel(relNode);
 
     // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
     // Only insert project when the field count from the child is same as that of the queryRowType.
@@ -136,7 +136,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
 
   private Prel convertToPrel(RelNode drel, RelDataType inputRowType, List<String> partitionColumns)
       throws RelConversionException, SqlUnsupportedException {
-    Prel prel = convertToPrel(drel);
+    Prel prel = convertToPrel(drel, inputRowType);
 
     prel = prel.accept(new ProjectForWriterVisitor(inputRowType, partitionColumns), null);
 
@@ -188,7 +188,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
       final RelOptCluster cluster = prel.getCluster();
 
       final List<RexNode> exprs = Lists.newArrayListWithExpectedSize(queryRowType.getFieldCount() + 1);
-      final List<String> fieldnames = new ArrayList<String>(queryRowType.getFieldNames());
+      final List<String> fieldNames = new ArrayList<>(queryRowType.getFieldNames());
 
       for (final RelDataTypeField field : queryRowType.getFieldList()) {
         exprs.add(RexInputRef.of(field.getIndex(), queryRowType));
@@ -199,7 +199,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
         final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster,
             cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, queryRowType);
 
-        return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
+        return prel.copy(projectUnderWriter.getTraitSet(),
             Collections.singletonList( (RelNode) projectUnderWriter));
       } else {
         // find list of partition columns.
@@ -217,19 +217,19 @@ public class CreateTableHandler extends DefaultSqlHandler {
         }
 
         // Add partition column comparator to Project's field name list.
-        fieldnames.add(WriterPrel.PARTITION_COMPARATOR_FIELD);
+        fieldNames.add(WriterPrel.PARTITION_COMPARATOR_FIELD);
 
         // Add partition column comparator to Project's expression list.
         final RexNode partionColComp = createPartitionColComparator(prel.getCluster().getRexBuilder(), partitionColumnExprs);
         exprs.add(partionColComp);
 
 
-        final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldnames);
+        final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldNames, null);
 
         final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster,
             cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, rowTypeWithPCComp);
 
-        return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
+        return prel.copy(projectUnderWriter.getTraitSet(),
             Collections.singletonList( (RelNode) projectUnderWriter));
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index ce6cedf..e03a40c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -90,6 +90,7 @@ import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor
 import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
 import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
 import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor;
+import org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor;
 import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
@@ -165,8 +166,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
     final RelNode queryRelNode = convertedRelNode.getConvertedNode();
 
-    final DrillRel drel = convertToDrel(queryRelNode, validatedRowType);
-    final Prel prel = convertToPrel(drel);
+    final DrillRel drel = convertToDrel(queryRelNode);
+    final Prel prel = convertToPrel(drel, validatedRowType);
     logAndSetTextPlan("Drill Physical", prel, logger);
     final PhysicalOperator pop = convertToPop(prel);
     final PhysicalPlan plan = convertToPlan(pop);
@@ -199,13 +200,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
   }
 
   /**
-   *  Given a relNode tree for SELECT statement, convert to Drill Logical RelNode tree.
-   * @param relNode
-   * @return
+   * Given a relNode tree for SELECT statement, convert to Drill Logical RelNode tree.
+   *
+   * @param relNode relational node
+   * @return Drill Logical RelNode tree
    * @throws SqlUnsupportedException
    * @throws RelConversionException
    */
-  protected DrillRel convertToDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException {
+  protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException {
     if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) &&
         context.getPlannerSettings().isTypeInferenceEnabled() &&
         FindLimit0Visitor.containsLimit0(relNode)) {
@@ -279,20 +281,18 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
 
   /**
    * Return Drill Logical RelNode tree for a SELECT statement, when it is executed / explained directly.
+   * Adds screen operator on top of converted node.
    *
-   * @param relNode : root RelNode corresponds to Calcite Logical RelNode.
-   * @param validatedRowType : the rowType for the final field names. A rename project may be placed on top of the root.
-   * @return
+   * @param relNode root RelNode corresponding to the Calcite Logical RelNode.
+   * @return Drill Logical RelNode tree
    * @throws RelConversionException
    * @throws SqlUnsupportedException
    */
-  protected DrillRel convertToDrel(RelNode relNode, RelDataType validatedRowType) throws RelConversionException, SqlUnsupportedException {
-    final DrillRel convertedRelNode = convertToDrel(relNode);
+  protected DrillRel convertToDrel(RelNode relNode) throws RelConversionException, SqlUnsupportedException {
+    final DrillRel convertedRelNode = convertToRawDrel(relNode);
 
-    // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
-    DrillRel topPreservedNameProj = addRenamedProject(convertedRelNode, validatedRowType);
-    return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(),
-        topPreservedNameProj);
+    return new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(),
+        convertedRelNode);
   }
 
   /**
@@ -411,7 +411,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     return output;
   }
 
-  protected Prel convertToPrel(RelNode drel) throws RelConversionException, SqlUnsupportedException {
+  /**
+   * Applies physical rules and certain transformations to convert drill relational node into physical one.
+   *
+   * @param drel relational node
+   * @param validatedRowType final output row type
+   * @return physical relational node
+   * @throws RelConversionException
+   * @throws SqlUnsupportedException
+   */
+  protected Prel convertToPrel(RelNode drel, RelDataType validatedRowType) throws RelConversionException, SqlUnsupportedException {
     Preconditions.checkArgument(drel.getConvention() == DrillRel.DRILL_LOGICAL);
 
     final RelTraitSet traits = drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
@@ -459,7 +468,13 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     /* The order of the following transformations is important */
 
     /*
-     * 0.) For select * from join query, we need insert project on top of scan and a top project just
+     * 0.)
+     * Add top project under screen operator or writer to ensure that final output column names are preserved.
+     */
+    phyRelNode = TopProjectVisitor.insertTopProject(phyRelNode, validatedRowType);
+
+    /*
+     * 1.) For select * from join query, we need insert project on top of scan and a top project just
      * under screen operator. The project on top of scan will rename from * to T1*, while the top project
      * will rename T1* to *, before it output the final result. Only the top project will allow
      * duplicate columns, since user could "explicitly" ask for duplicate columns ( select *, col, *).
@@ -468,14 +483,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode);
 
     /*
-     * 1.)
+     * 2.)
      * Join might cause naming conflicts from its left and right child.
      * In such case, we have to insert Project to rename the conflicting names.
      */
     phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
 
     /*
-     * 1.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
+     * 2.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
      * We want to have smaller dataset on the right side, since hash table builds on right side.
      */
     if (context.getPlannerSettings().isHashJoinSwapEnabled()) {
@@ -490,20 +505,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     }
 
     /*
-     * 1.2) Break up all expressions with complex outputs into their own project operations
+     * 2.2) Break up all expressions with complex outputs into their own project operations
      */
     phyRelNode = phyRelNode.accept(
         new SplitUpComplexExpressions(config.getConverter().getTypeFactory(), context.getDrillOperatorTable(), context
             .getPlannerSettings().functionImplementationRegistry), null);
 
     /*
-     * 1.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
+     * 2.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
      */
     phyRelNode = phyRelNode.accept(
         new RewriteProjectToFlatten(config.getConverter().getTypeFactory(), context.getDrillOperatorTable()), null);
 
     /*
-     * 2.)
+     * 3.)
      * Since our operators work via names rather than indices, we have to make to reorder any
      * output before we return data to the user as we may have accidentally shuffled things.
      * This adds a trivial project to reorder columns prior to output.
@@ -511,14 +526,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode);
 
     /*
-     * 3.)
+     * 4.)
      * If two fragments are both estimated to be parallelization one, remove the exchange
      * separating them
      */
     phyRelNode = ExcessiveExchangeIdentifier.removeExcessiveEchanges(phyRelNode, targetSliceSize);
 
 
-    /* 4.)
+    /* 5.)
      * Add ProducerConsumer after each scan if the option is set
      * Use the configured queueSize
      */
@@ -530,7 +545,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     */
 
 
-    /* 5.)
+    /* 6.)
      * if the client does not support complex types (Map, Repeated)
      * insert a project which which would convert
      */
@@ -540,20 +555,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     }
 
 
-    /* 6.)
+    /* 7.)
      * Insert LocalExchange (mux and/or demux) nodes
      */
     phyRelNode = InsertLocalExchangeVisitor.insertLocalExchanges(phyRelNode, queryOptions);
 
 
-    /* 7.)
+    /* 8.)
      * Next, we add any required selection vector removers given the supported encodings of each
      * operator. This will ultimately move to a new trait but we're managing here for now to avoid
      * introducing new issues in planning before the next release
      */
     phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
 
-    /* 8.)
+    /* 9.)
      * Finally, Make sure that the no rels are repeats.
      * This could happen in the case of querying the same table twice as Optiq may canonicalize these.
      */

http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index b5b5f73..d62fb4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -58,14 +58,14 @@ public class ExplainHandler extends DefaultSqlHandler {
     final RelNode queryRelNode = convertedRelNode.getConvertedNode();
 
     log("Calcite", queryRelNode, logger, null);
-    DrillRel drel = convertToDrel(queryRelNode, validatedRowType);
+    DrillRel drel = convertToDrel(queryRelNode);
 
     if (mode == ResultMode.LOGICAL) {
       LogicalExplain logicalResult = new LogicalExplain(drel, level, context);
       return DirectPlan.createDirectPlan(context, logicalResult);
     }
 
-    Prel prel = convertToPrel(drel);
+    Prel prel = convertToPrel(drel, validatedRowType);
     logAndSetTextPlan("Drill Physical", prel, logger);
     PhysicalOperator pop = convertToPop(prel);
     PhysicalPlan plan = convertToPlan(pop);

http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 924486f..7700a1e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -1031,7 +1031,6 @@ public class TestUnionAll extends BaseTestQuery{
 
     // Validate the plan
     final String[] expectedPlan = {"UnionExchange.*\n",
-        ".*Project.*\n" +
         ".*UnionAll"};
     final String[] excludedPlan = {};