You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/06/27 13:35:16 UTC
drill git commit: DRILL-5538: Create TopProject with
validatedNodeType after PHYSICAL phase
Repository: drill
Updated Branches:
refs/heads/master 9cf6faa7a -> 6446e56f2
DRILL-5538: Create TopProject with validatedNodeType after PHYSICAL phase
closes #844
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6446e56f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6446e56f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6446e56f
Branch: refs/heads/master
Commit: 6446e56f292a5905d646462c618c056839ad5198
Parents: 9cf6faa
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Jun 15 16:01:54 2017 +0300
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jun 27 13:03:56 2017 +0300
----------------------------------------------------------------------
.../physical/visitor/TopProjectVisitor.java | 142 +++++++++++++++++++
.../sql/handlers/CreateTableHandler.java | 14 +-
.../planner/sql/handlers/DefaultSqlHandler.java | 71 ++++++----
.../planner/sql/handlers/ExplainHandler.java | 6 +-
.../java/org/apache/drill/TestUnionAll.java | 1 -
5 files changed, 195 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
new file mode 100644
index 0000000..587b006
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.WriterPrel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Adds non-trivial top project to ensure the final output field names are preserved.
+ * Such non-trivial project is needed due to Calcite's behavior of ProjectRemoveRule.
+ * It is added under the Screen/Writer operator in the physical plan unless the child
+ * is already a Project and the added project would be trivial; it is needed
+ * in cases like * column expansion or partition by column processing.
+ */
+public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
+
+ private final RelDataType validatedRowType;
+
+ public TopProjectVisitor(RelDataType validatedRowType) {
+ this.validatedRowType = validatedRowType;
+ }
+
+ /**
+ * Traverses passed physical relational node and its children and checks if top project
+ * should be added under screen or writer to preserve final output field names.
+ *
+ * @param prel physical relational node
+ * @param validatedRowType final output row type
+ * @return physical relational node with added project if necessary
+ */
+ public static Prel insertTopProject(Prel prel, RelDataType validatedRowType){
+ return prel.accept(new TopProjectVisitor(validatedRowType), null);
+ }
+
+ @Override
+ public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+ List<RelNode> children = new ArrayList<>();
+ for (Prel child : prel){
+ child = child.accept(this, null);
+ children.add(child);
+ }
+
+ return (Prel) prel.copy(prel.getTraitSet(), children);
+ }
+
+ @Override
+ public Prel visitScreen(ScreenPrel prel, Void value) {
+ // skip screen if a writer is underneath; its subtree (incl. the writer) is then not visited
+ if (containsWriter(prel)) {
+ return prel;
+ }
+
+ Prel newChild = ((Prel) prel.getInput()).accept(this, value);
+ return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+ }
+
+ @Override
+ public Prel visitWriter(WriterPrel prel, Void value) {
+ Prel newChild = ((Prel) prel.getInput()).accept(this, value);
+ return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+ }
+
+ /**
+ * Recursively checks whether any descendant of the passed physical relational node is a writer.
+ *
+ * @param prel physical relational node
+ * @return true if a writer operator was found
+ */
+ private boolean containsWriter(Prel prel) {
+ for (Prel child : prel){
+ if (child instanceof WriterPrel || containsWriter(child)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Adds top project to ensure final output field names are preserved.
+ * In case of duplicated column names, will rename duplicates.
+ * Top project is skipped (prel is returned as is) if the child's field count differs from
+ * validatedRowType, or if the child is already a Project and the new project is trivial.
+ *
+ * @param prel physical relational node
+ * @param validatedRowType final output row type
+ * @return physical relational node with top project if necessary
+ */
+ private Prel addTopProjectPrel(Prel prel, RelDataType validatedRowType) {
+ RelDataType rowType = prel.getRowType();
+ if (rowType.getFieldCount() != validatedRowType.getFieldCount()) {
+ return prel;
+ }
+
+ RexBuilder rexBuilder = prel.getCluster().getRexBuilder();
+ List<RexNode> projections = new ArrayList<>();
+ int projectCount = rowType.getFieldList().size();
+
+ for (int i = 0; i < projectCount; i++) {
+ projections.add(rexBuilder.makeInputRef(prel, i));
+ }
+
+ List<String> fieldNames = SqlValidatorUtil.uniquify(
+ validatedRowType.getFieldNames(),
+ SqlValidatorUtil.F_SUGGESTER2,
+ prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive());
+
+ RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null);
+ ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType);
+
+ return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ? prel : topProject;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 72444ca..d232a71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -121,7 +121,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
RelDataType queryRowType,
StorageStrategy storageStrategy)
throws RelConversionException, SqlUnsupportedException {
- final DrillRel convertedRelNode = convertToDrel(relNode);
+ final DrillRel convertedRelNode = convertToRawDrel(relNode);
// Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
// Only insert project when the field count from the child is same as that of the queryRowType.
@@ -136,7 +136,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
private Prel convertToPrel(RelNode drel, RelDataType inputRowType, List<String> partitionColumns)
throws RelConversionException, SqlUnsupportedException {
- Prel prel = convertToPrel(drel);
+ Prel prel = convertToPrel(drel, inputRowType);
prel = prel.accept(new ProjectForWriterVisitor(inputRowType, partitionColumns), null);
@@ -188,7 +188,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
final RelOptCluster cluster = prel.getCluster();
final List<RexNode> exprs = Lists.newArrayListWithExpectedSize(queryRowType.getFieldCount() + 1);
- final List<String> fieldnames = new ArrayList<String>(queryRowType.getFieldNames());
+ final List<String> fieldNames = new ArrayList<>(queryRowType.getFieldNames());
for (final RelDataTypeField field : queryRowType.getFieldList()) {
exprs.add(RexInputRef.of(field.getIndex(), queryRowType));
@@ -199,7 +199,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster,
cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, queryRowType);
- return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
+ return prel.copy(projectUnderWriter.getTraitSet(),
Collections.singletonList( (RelNode) projectUnderWriter));
} else {
// find list of partition columns.
@@ -217,19 +217,19 @@ public class CreateTableHandler extends DefaultSqlHandler {
}
// Add partition column comparator to Project's field name list.
- fieldnames.add(WriterPrel.PARTITION_COMPARATOR_FIELD);
+ fieldNames.add(WriterPrel.PARTITION_COMPARATOR_FIELD);
// Add partition column comparator to Project's expression list.
final RexNode partionColComp = createPartitionColComparator(prel.getCluster().getRexBuilder(), partitionColumnExprs);
exprs.add(partionColComp);
- final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldnames);
+ final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldNames, null);
final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster,
cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, rowTypeWithPCComp);
- return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
+ return prel.copy(projectUnderWriter.getTraitSet(),
Collections.singletonList( (RelNode) projectUnderWriter));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index ce6cedf..e03a40c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -90,6 +90,7 @@ import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor
import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor;
+import org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor;
import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
@@ -165,8 +166,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
final RelNode queryRelNode = convertedRelNode.getConvertedNode();
- final DrillRel drel = convertToDrel(queryRelNode, validatedRowType);
- final Prel prel = convertToPrel(drel);
+ final DrillRel drel = convertToDrel(queryRelNode);
+ final Prel prel = convertToPrel(drel, validatedRowType);
logAndSetTextPlan("Drill Physical", prel, logger);
final PhysicalOperator pop = convertToPop(prel);
final PhysicalPlan plan = convertToPlan(pop);
@@ -199,13 +200,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
}
/**
- * Given a relNode tree for SELECT statement, convert to Drill Logical RelNode tree.
- * @param relNode
- * @return
+ * Given a relNode tree for SELECT statement, convert to Drill Logical RelNode tree.
+ *
+ * @param relNode relational node
+ * @return Drill Logical RelNode tree
* @throws SqlUnsupportedException
* @throws RelConversionException
*/
- protected DrillRel convertToDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException {
+ protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException {
if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) &&
context.getPlannerSettings().isTypeInferenceEnabled() &&
FindLimit0Visitor.containsLimit0(relNode)) {
@@ -279,20 +281,18 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
/**
* Return Drill Logical RelNode tree for a SELECT statement, when it is executed / explained directly.
+ * Adds screen operator on top of converted node.
*
- * @param relNode : root RelNode corresponds to Calcite Logical RelNode.
- * @param validatedRowType : the rowType for the final field names. A rename project may be placed on top of the root.
- * @return
+ * @param relNode root RelNode corresponds to Calcite Logical RelNode.
+ * @return Drill Logical RelNode tree
* @throws RelConversionException
* @throws SqlUnsupportedException
*/
- protected DrillRel convertToDrel(RelNode relNode, RelDataType validatedRowType) throws RelConversionException, SqlUnsupportedException {
- final DrillRel convertedRelNode = convertToDrel(relNode);
+ protected DrillRel convertToDrel(RelNode relNode) throws RelConversionException, SqlUnsupportedException {
+ final DrillRel convertedRelNode = convertToRawDrel(relNode);
- // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
- DrillRel topPreservedNameProj = addRenamedProject(convertedRelNode, validatedRowType);
- return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(),
- topPreservedNameProj);
+ return new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(),
+ convertedRelNode);
}
/**
@@ -411,7 +411,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
return output;
}
- protected Prel convertToPrel(RelNode drel) throws RelConversionException, SqlUnsupportedException {
+ /**
+ * Applies physical rules and certain transformations to convert drill relational node into physical one.
+ *
+ * @param drel relational node
+ * @param validatedRowType final output row type
+ * @return physical relational node
+ * @throws RelConversionException
+ * @throws SqlUnsupportedException
+ */
+ protected Prel convertToPrel(RelNode drel, RelDataType validatedRowType) throws RelConversionException, SqlUnsupportedException {
Preconditions.checkArgument(drel.getConvention() == DrillRel.DRILL_LOGICAL);
final RelTraitSet traits = drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
@@ -459,7 +468,13 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
/* The order of the following transformations is important */
/*
- * 0.) For select * from join query, we need insert project on top of scan and a top project just
+ * 0.)
+ * Add top project before screen operator or writer to ensure that final output column names are preserved.
+ */
+ phyRelNode = TopProjectVisitor.insertTopProject(phyRelNode, validatedRowType);
+
+ /*
+ * 1.) For select * from join query, we need insert project on top of scan and a top project just
* under screen operator. The project on top of scan will rename from * to T1*, while the top project
* will rename T1* to *, before it output the final result. Only the top project will allow
* duplicate columns, since user could "explicitly" ask for duplicate columns ( select *, col, *).
@@ -468,14 +483,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode);
/*
- * 1.)
+ * 2.)
* Join might cause naming conflicts from its left and right child.
* In such case, we have to insert Project to rename the conflicting names.
*/
phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
/*
- * 1.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
+ * 2.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
* We want to have smaller dataset on the right side, since hash table builds on right side.
*/
if (context.getPlannerSettings().isHashJoinSwapEnabled()) {
@@ -490,20 +505,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
}
/*
- * 1.2) Break up all expressions with complex outputs into their own project operations
+ * 2.2) Break up all expressions with complex outputs into their own project operations
*/
phyRelNode = phyRelNode.accept(
new SplitUpComplexExpressions(config.getConverter().getTypeFactory(), context.getDrillOperatorTable(), context
.getPlannerSettings().functionImplementationRegistry), null);
/*
- * 1.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
+ * 2.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
*/
phyRelNode = phyRelNode.accept(
new RewriteProjectToFlatten(config.getConverter().getTypeFactory(), context.getDrillOperatorTable()), null);
/*
- * 2.)
+ * 3.)
* Since our operators work via names rather than indices, we have to make to reorder any
* output before we return data to the user as we may have accidentally shuffled things.
* This adds a trivial project to reorder columns prior to output.
@@ -511,14 +526,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode);
/*
- * 3.)
+ * 4.)
* If two fragments are both estimated to be parallelization one, remove the exchange
* separating them
*/
phyRelNode = ExcessiveExchangeIdentifier.removeExcessiveEchanges(phyRelNode, targetSliceSize);
- /* 4.)
+ /* 5.)
* Add ProducerConsumer after each scan if the option is set
* Use the configured queueSize
*/
@@ -530,7 +545,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
*/
- /* 5.)
+ /* 6.)
* if the client does not support complex types (Map, Repeated)
* insert a project which which would convert
*/
@@ -540,20 +555,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
}
- /* 6.)
+ /* 7.)
* Insert LocalExchange (mux and/or demux) nodes
*/
phyRelNode = InsertLocalExchangeVisitor.insertLocalExchanges(phyRelNode, queryOptions);
- /* 7.)
+ /* 8.)
* Next, we add any required selection vector removers given the supported encodings of each
* operator. This will ultimately move to a new trait but we're managing here for now to avoid
* introducing new issues in planning before the next release
*/
phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
- /* 8.)
+ /* 9.)
* Finally, Make sure that the no rels are repeats.
* This could happen in the case of querying the same table twice as Optiq may canonicalize these.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index b5b5f73..d62fb4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -58,14 +58,14 @@ public class ExplainHandler extends DefaultSqlHandler {
final RelNode queryRelNode = convertedRelNode.getConvertedNode();
log("Calcite", queryRelNode, logger, null);
- DrillRel drel = convertToDrel(queryRelNode, validatedRowType);
+ DrillRel drel = convertToDrel(queryRelNode);
if (mode == ResultMode.LOGICAL) {
LogicalExplain logicalResult = new LogicalExplain(drel, level, context);
return DirectPlan.createDirectPlan(context, logicalResult);
}
- Prel prel = convertToPrel(drel);
+ Prel prel = convertToPrel(drel, validatedRowType);
logAndSetTextPlan("Drill Physical", prel, logger);
PhysicalOperator pop = convertToPop(prel);
PhysicalPlan plan = convertToPlan(pop);
http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 924486f..7700a1e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -1031,7 +1031,6 @@ public class TestUnionAll extends BaseTestQuery{
// Validate the plan
final String[] expectedPlan = {"UnionExchange.*\n",
- ".*Project.*\n" +
".*UnionAll"};
final String[] excludedPlan = {};