You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/31 05:32:25 UTC

[6/7] incubator-impala git commit: IMPALA-5850: Cast sender partition exprs under unions.

IMPALA-5850: Cast sender partition exprs under unions.

For a series of partitioned joins within the same fragment we must
cast the sender partition exprs of exchanges to compatible types.
Otherwise, the hashes generated for identical partition values may
differ among senders leading to wrong results.

The bug was that this casting process was only performed for
fragments that are hash-partitioned. However, a union produces a
fragment with RANDOM partition, but the union could still contain
partitioned joins whose senders need to be cast appropriately. The
fix is to add casts regardless of the fragment's data partition.

Testing:
- Core/hdfs run passed
- Added a new regression test

Change-Id: I0aa801bcad8c2324d848349c7967d949224404e0
Reviewed-on: http://gerrit.cloudera.org:8080/7884
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a58394be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a58394be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a58394be

Branch: refs/heads/release-2.10.0
Commit: a58394be7c7998a5dfea53d8a3dbf8beb3370a48
Parents: 2912a0f
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Aug 28 19:01:39 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed Aug 30 14:54:50 2017 -0700

----------------------------------------------------------------------
 .../org/apache/impala/planner/PlanFragment.java | 78 +++++++++++---------
 .../queries/QueryTest/joins.test                | 41 ++++++++++
 2 files changed, 83 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a58394be/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index ab72863..5ac9b2e 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -25,9 +25,9 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
+import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.PlanNode.ExecPhaseResourceProfiles;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPartitionType;
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -160,13 +159,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
   /**
    * Do any final work to set up the ExchangeNodes and DataStreamSinks for this fragment.
-   * If this fragment is hash partitioned, ensures that the corresponding partition
-   * exprs of all hash-partitioning senders are cast to identical types.
+   * If this fragment has partitioned joins, ensures that the corresponding partition
+   * exprs of all hash-partitioning senders are cast to appropriate types.
    * Otherwise, the hashes generated for identical partition values may differ
    * among senders if the partition-expr types are not identical.
    */
-  public void finalizeExchanges(Analyzer analyzer)
-      throws InternalException, NotImplementedException {
+  public void finalizeExchanges(Analyzer analyzer) throws InternalException {
     if (destNode_ != null) {
       Preconditions.checkState(sink_ == null);
       // we're streaming to an exchange node
@@ -175,38 +173,46 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       sink_ = streamSink;
     }
 
-    if (!dataPartition_.isHashPartitioned()) return;
-
-    // This fragment is hash partitioned. Gather all exchange nodes and ensure
-    // that all hash-partitioning senders hash on exprs-values of the same type.
-    List<ExchangeNode> exchNodes = Lists.newArrayList();
-    planRoot_.collect(Predicates.instanceOf(ExchangeNode.class), exchNodes);
+    // Must be called regardless of this fragment's data partition. This fragment might
+    // be RANDOM partitioned due to a union. The union could still have partitioned joins
+    // in its child subtrees for which casts on the exchange senders are needed.
+    castPartitionedJoinExchanges(planRoot_, analyzer);
+  }
 
-    // Contains partition-expr lists of all hash-partitioning sender fragments.
-    List<List<Expr>> senderPartitionExprs = Lists.newArrayList();
-    for (ExchangeNode exchNode: exchNodes) {
-      Preconditions.checkState(!exchNode.getChildren().isEmpty());
-      PlanFragment senderFragment = exchNode.getChild(0).getFragment();
-      Preconditions.checkNotNull(senderFragment);
-      if (!senderFragment.getOutputPartition().isHashPartitioned()) continue;
-      List<Expr> partExprs = senderFragment.getOutputPartition().getPartitionExprs();
-      // All hash-partitioning senders must have compatible partition exprs, otherwise
-      // this fragment's data partition must not be hash partitioned.
-      Preconditions.checkState(
-          partExprs.size() == dataPartition_.getPartitionExprs().size());
-      senderPartitionExprs.add(partExprs);
-    }
+  /**
+   * Recursively traverses the plan tree rooted at 'node' and casts the partition exprs
+   * of all senders feeding into a series of partitioned joins to compatible types.
+   */
+  private void castPartitionedJoinExchanges(PlanNode node, Analyzer analyzer) {
+    if (node instanceof HashJoinNode
+        && ((JoinNode) node).getDistributionMode() == DistributionMode.PARTITIONED) {
+      // Contains all exchange nodes in this fragment below the current join node.
+      List<ExchangeNode> exchNodes = Lists.newArrayList();
+      node.collect(ExchangeNode.class, exchNodes);
+
+      // Contains partition-expr lists of all hash-partitioning sender fragments.
+      List<List<Expr>> senderPartitionExprs = Lists.newArrayList();
+      for (ExchangeNode exchNode: exchNodes) {
+        Preconditions.checkState(!exchNode.getChildren().isEmpty());
+        PlanFragment senderFragment = exchNode.getChild(0).getFragment();
+        Preconditions.checkNotNull(senderFragment);
+        if (!senderFragment.getOutputPartition().isHashPartitioned()) continue;
+        List<Expr> partExprs = senderFragment.getOutputPartition().getPartitionExprs();
+        senderPartitionExprs.add(partExprs);
+      }
 
-    // Cast all corresponding hash partition exprs of all hash-partitioning senders
-    // to their compatible types. Also cast the data partition's exprs for consistency,
-    // although not strictly necessary. They should already be type identical to the
-    // exprs of one of the senders and they are not directly used for hashing in the BE.
-    senderPartitionExprs.add(dataPartition_.getPartitionExprs());
-    try {
-      analyzer.castToUnionCompatibleTypes(senderPartitionExprs);
-    } catch (AnalysisException e) {
-      // Should never happen. Analysis should have ensured type compatibility already.
-      throw new IllegalStateException(e);
+      // Cast partition exprs of all hash-partitioning senders to their compatible types.
+      try {
+        analyzer.castToUnionCompatibleTypes(senderPartitionExprs);
+      } catch (AnalysisException e) {
+        // Should never happen. Analysis should have ensured type compatibility already.
+        throw new IllegalStateException(e);
+      }
+    } else {
+      // Recursively traverse plan nodes in this fragment.
+      for (PlanNode child: node.getChildren()) {
+        if (child.getFragment() == this) castPartitionedJoinExchanges(child, analyzer);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a58394be/testdata/workloads/functional-query/queries/QueryTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/joins.test b/testdata/workloads/functional-query/queries/QueryTest/joins.test
index db915df..ab2a531 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/joins.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/joins.test
@@ -730,3 +730,44 @@ t1.timestamp_col = cast(t2.d4 as TIMESTAMP);
 ---- TYPES
 BIGINT
 ====
+---- QUERY
+# IMPALA-5850: Tests that the sender partition exprs of a series of partitioned hash
+# joins are cast to compatible types even when the containing fragment is RANDOM
+# partitioned due to a union. This test borrows ideas from existing tests for IMPALA-1123.
+# The test has a union with three branches where each branch contains independent and
+# incompatible hash-partitioning exchanges. Two of the union branches contain
+# partitioned joins which require the sender partition exprs to be cast to produce
+# correct results.
+# Breakdown of result count:
+# The first union branch is expected to produce 2 rows
+# The second union branch is expected to produce 11200 rows (see IMPALA-1123 test)
+# The third union branch is expected to produce 10 rows
+select count(*) from (
+  select distinct tinyint_col, smallint_col, int_col
+  from functional.alltypestiny
+  union all
+  select /* +straight_join */ b.id, c.tinyint_col, null
+  from functional.alltypessmall a
+  inner join /* +shuffle */
+    (select /* +straight_join */ t2.id, t2.smallint_col
+     from functional.alltypessmall t1
+     inner join /* +shuffle */ functional.alltypessmall t2
+     on t1.tinyint_col = t2.smallint_col) b
+  on a.int_col = b.smallint_col
+  inner join /* +shuffle */
+    (select distinct tinyint_col
+     from functional.alltypessmall) c
+  on a.int_col = c.tinyint_col
+  union all
+  select /* +straight_join */ tinyint_col, bigint_col, null from
+    (select distinct tinyint_col, bigint_col div 10 as bigint_col
+     from functional.alltypessmall) a
+  inner join /* +shuffle */
+    (select distinct int_col, smallint_col
+     from functional.alltypessmall) b
+  on a.tinyint_col = b.int_col and a.bigint_col = b.smallint_col) v
+---- RESULTS
+11212
+---- TYPES
+BIGINT
+====