You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@drill.apache.org by bo...@apache.org on 2018/07/19 06:09:29 UTC

[drill] 02/03: DRILL-6475: Unnest: Null fieldId Pointer.

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5a0c75f69166283a17179e80b97712bed57110d9
Author: HanumathRao <ha...@gmail.com>
AuthorDate: Fri Jun 29 08:46:41 2018 -0700

    DRILL-6475: Unnest: Null fieldId Pointer.
    
    closes #1381
---
 .../visitor/AdjustOperatorsSchemaVisitor.java      | 148 +++++++++++++++++++++
 .../physical/visitor/JoinPrelRenameVisitor.java    |  87 ------------
 .../planner/sql/handlers/DefaultSqlHandler.java    |   5 +-
 .../impl/lateraljoin/TestLateralPlans.java         |  24 ++++
 4 files changed, 175 insertions(+), 89 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java
new file mode 100644
index 0000000..c46b725
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.physical.JoinPrel;
+import org.apache.drill.exec.planner.physical.LateralJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.calcite.rel.RelNode;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
+
+/**
+ * Adjusts operators' output row types after physical planning. Join inputs are renamed so the parent sees unique
+ * field names even when both inputs share names; each unnest's correlated field reference and output row type are
+ * rebuilt from its enclosing lateral join. An instance holds per-traversal state (the enclosing lateral join), so
+ * {@link #adjustSchema(Prel)} creates a fresh instance per call; an instance must not be shared across threads.
+ */
+public class AdjustOperatorsSchemaVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{
+
+  /** Innermost lateral join whose right input contains the node being visited, or {@code null} if none. */
+  private Prel registeredPrel = null;
+
+  /** Rewrites the plan rooted at {@code prel}, adjusting the schema of every join, lateral and unnest in it. */
+  public static Prel adjustSchema(Prel prel){
+    // New visitor per call: registeredPrel is mutable traversal state, so a shared static instance would race
+    // if several queries were planned concurrently.
+    return prel.accept(new AdjustOperatorsSchemaVisitor(), null);
+  }
+
+  private void register(Prel prel) {
+    this.registeredPrel = prel;
+  }
+
+  private Prel getRegisteredPrel() {
+    return this.registeredPrel;
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    return preparePrel(prel, getChildren(prel));
+  }
+
+  /** Clears the registered lateral join. */
+  public void unRegister() {
+    this.registeredPrel = null;
+  }
+
+  /**
+   * Visits the inputs of {@code prel} in order, registering {@code prel} while input {@code registerForChild} is
+   * visited. The previous registration is restored afterwards (not cleared), so an unnest reached later under an
+   * outer lateral's right input still finds that lateral.
+   */
+  private List<RelNode> getChildren(Prel prel, int registerForChild) {
+    int ch = 0;
+    List<RelNode> children = Lists.newArrayList();
+    for (Prel child : prel) {
+      Prel enclosing = getRegisteredPrel();
+      if (ch == registerForChild) {
+        register(prel);
+      }
+      children.add(child.accept(this, null));
+      register(enclosing);
+      ch++;
+    }
+    return children;
+  }
+
+  private List<RelNode> getChildren(Prel prel) {
+    return getChildren(prel, -1);
+  }
+
+  private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) {
+    return (Prel) prel.copy(prel.getTraitSet(), renamedNodes);
+  }
+
+  /** Renames the join's inputs (via {@code getJoinInput}) so the join's output field names are unique. */
+  @Override
+  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
+    List<RelNode> children = getChildren(prel);
+    final int leftCount = children.get(0).getRowType().getFieldCount();
+    List<RelNode> reNamedChildren = Lists.newArrayList();
+    reNamedChildren.add(prel.getJoinInput(0, children.get(0)));
+    reNamedChildren.add(prel.getJoinInput(leftCount, children.get(1)));
+    return preparePrel(prel, reNamedChildren);
+  }
+
+  /** Visits input 1 with this lateral registered, then adjusts each input via {@code getLateralInput}. */
+  @Override
+  public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException {
+    List<RelNode> children = getChildren(prel, 1);
+    List<RelNode> reNamedChildren = new ArrayList<>();
+    for (int i = 0; i < children.size(); i++) {
+      reNamedChildren.add(prel.getLateralInput(i, children.get(i)));
+    }
+    return preparePrel(prel, reNamedChildren);
+  }
+
+  /**
+   * Rebuilds the unnest so it reads the registered lateral's correlated column (the first of its required columns)
+   * from the lateral's left input, and names its single output field after that column.
+   */
+  @Override
+  public Prel visitUnnest(UnnestPrel prel, Void value) throws RuntimeException {
+    Preconditions.checkArgument(registeredPrel instanceof LateralJoinPrel,
+        "Unnest is not under the right input of a lateral join; registered prel: %s", registeredPrel);
+    Preconditions.checkArgument(prel.getRowType().getFieldCount() == 1,
+        "Unnest must have exactly one output field: %s", prel.getRowType());
+    RexBuilder builder = prel.getCluster().getRexBuilder();
+    LateralJoinPrel lateralJoinPrel = (LateralJoinPrel) getRegisteredPrel();
+    int correlationIndex = lateralJoinPrel.getRequiredColumns().nextSetBit(0);
+    Preconditions.checkState(correlationIndex >= 0, "Lateral join has no required (correlated) column");
+    String correlationColumnName = lateralJoinPrel.getLeft().getRowType().getFieldNames().get(correlationIndex);
+    RexNode corrRef = builder.makeCorrel(lateralJoinPrel.getLeft().getRowType(), lateralJoinPrel.getCorrelationId());
+    RexNode fieldAccess = builder.makeFieldAccess(corrRef, correlationColumnName, false);
+
+    List<String> fieldNames = new ArrayList<>();
+    List<RelDataType> fieldTypes = new ArrayList<>();
+    for (RelDataTypeField field : prel.getRowType().getFieldList()) {
+      fieldNames.add(correlationColumnName);
+      fieldTypes.add(field.getType());
+    }
+    return new UnnestPrel(prel.getCluster(), prel.getTraitSet(),
+        prel.getCluster().getTypeFactory().createStructType(fieldTypes, fieldNames), fieldAccess);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
deleted file mode 100644
index 3a2529b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.physical.visitor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.exec.planner.physical.JoinPrel;
-import org.apache.drill.exec.planner.physical.LateralJoinPrel;
-import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.calcite.rel.RelNode;
-
-import com.google.common.collect.Lists;
-
-public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{
-
-  private static JoinPrelRenameVisitor INSTANCE = new JoinPrelRenameVisitor();
-
-  public static Prel insertRenameProject(Prel prel){
-    return prel.accept(INSTANCE, null);
-  }
-
-  @Override
-  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
-    return preparePrel(prel, getChildren(prel));
-  }
-
-  private List<RelNode> getChildren(Prel prel) {
-    List<RelNode> children = Lists.newArrayList();
-    for(Prel child : prel){
-      child = child.accept(this, null);
-      children.add(child);
-    }
-    return children;
-  }
-
-  private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) {
-    return (Prel) prel.copy(prel.getTraitSet(), renamedNodes);
-  }
-
-  @Override
-  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
-
-    List<RelNode> children = getChildren(prel);
-
-    final int leftCount = children.get(0).getRowType().getFieldCount();
-
-    List<RelNode> reNamedChildren = Lists.newArrayList();
-
-    RelNode left = prel.getJoinInput(0, children.get(0));
-    RelNode right = prel.getJoinInput(leftCount, children.get(1));
-
-    reNamedChildren.add(left);
-    reNamedChildren.add(right);
-
-    return preparePrel(prel, reNamedChildren);
-  }
-
-  //TODO: consolidate this code with join column renaming.
-  @Override
-  public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException {
-
-    List<RelNode> children = getChildren(prel);
-    List<RelNode> reNamedChildren = new ArrayList<>();
-
-    for (int i = 0; i < children.size(); i++) {
-      reNamedChildren.add(prel.getLateralInput(i, children.get(i)));
-    }
-
-    return preparePrel(prel, reNamedChildren);
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 1e671ff..83e1a8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -88,11 +88,11 @@ import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
+import org.apache.drill.exec.planner.physical.visitor.AdjustOperatorsSchemaVisitor;
 import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
 import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor;
-import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
 import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
 import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
@@ -512,8 +512,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      * 2.)
      * Join might cause naming conflicts from its left and right child.
      * In such case, we have to insert Project to rename the conflicting names.
+     * Unnest operator might need to adjust the correlated field after the physical planning.
      */
-    phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
+    phyRelNode = AdjustOperatorsSchemaVisitor.adjustSchema(phyRelNode);
 
     /*
      * 2.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 77d245f..222b036 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -358,6 +358,7 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testNoExchangeWithStreamAggWithGrpBy() throws Exception {
     String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
             " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
             .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
             .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
@@ -532,4 +533,27 @@ public class TestLateralPlans extends BaseTestQuery {
           plan, not(containsString("Sort")));
     }
   }
+
+  /** Two chained LATERAL/UNNEST levels must return the same rows, in order, as the equivalent nested FLATTEN query. */
+  @Test
+  public void testMultiUnnestQuery() throws Exception {
+    final String lateralQuery = "SELECT t5.l_quantity FROM dfs.`lateraljoin/multipleFiles` t, "
+        + "LATERAL (SELECT t2.ordrs.o_lineitems FROM UNNEST(t.c_orders) t2(ordrs)) t3(lineitems), "
+        + "LATERAL (SELECT t4.lineitems.l_quantity FROM UNNEST(t3.lineitems) t4(lineitems)) t5(l_quantity) order by 1";
+    // Baseline: the same two-level expansion written with FLATTEN instead of LATERAL/UNNEST.
+    final String flattenQuery = "select dt.lineitems.l_quantity as l_quantity from (select flatten(dt.orders.o_lineitems) as lineitems "
+        + "from (select flatten(c_orders) as orders from dfs.`lateraljoin/multipleFiles` t) dt)dt order by 1";
+
+    final ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+        .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture clusterFixture = fixtureBuilder.build();
+         ClientFixture clientFixture = clusterFixture.clientFixture()) {
+      clientFixture.testBuilder().ordered()
+          .sqlBaselineQuery(flattenQuery)
+          .sqlQuery(lateralQuery)
+          .go();
+    }
+  }
 }