You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2016/08/29 16:45:16 UTC

incubator-impala git commit: IMPALA-2932: Extend DistributedPlanner to account for hash table build cost

Repository: incubator-impala
Updated Branches:
  refs/heads/master d0ff27cc3 -> d72353d0c


IMPALA-2932: Extend DistributedPlanner to account for hash table build cost

When deciding between a broadcast or repartition join, Impala calculates
the cost of each join as the total amount of data that is sent over the
network. This ignores some relevant costs, and can lead to bad plans.

One such relevant cost is the work to create the hash table used in the
join. This patch accounts for it by adding the amount of data inserted
into the hash table (the size of the right side of the join) to the
previous cost.

This generally increases the estimated cost of broadcast joins relative
to repartitioning joins, because a broadcast join must build the hash
table on every node the data is broadcast to. As a result,
repartitioning joins become more likely to be chosen, especially in
large clusters.

This patch has not yet been performance tested.

Change-Id: I03a0f56f69c8deae68d48dfdb9dc95b71aec11f1
Reviewed-on: http://gerrit.cloudera.org:8080/4098
Tested-by: Internal Jenkins
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d72353d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d72353d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d72353d0

Branch: refs/heads/master
Commit: d72353d0c9a563e69b34ba0cccdc6c1c8bf2630b
Parents: d0ff27c
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Aug 23 11:42:39 2016 -0700
Committer: Matthew Jacobs <mj...@cloudera.com>
Committed: Mon Aug 29 16:44:22 2016 +0000

----------------------------------------------------------------------
 .../impala/planner/DistributedPlanner.java      |  21 ++--
 .../queries/PlannerTest/joins.test              |   8 +-
 .../queries/PlannerTest/tpcds-all.test          |  64 +++++-----
 .../queries/PlannerTest/tpch-all.test           | 126 +++++++++++--------
 .../queries/PlannerTest/tpch-nested.test        |   8 +-
 .../queries/PlannerTest/union.test              |  20 +--
 .../queries/PlannerTest/views.test              |  18 +--
 .../queries/QueryTest/spilling.test             |   2 +-
 8 files changed, 148 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
index d546245..2c3124b 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
@@ -417,17 +417,20 @@ public class DistributedPlanner {
       PlanFragment leftChildFragment, long perNodeMemLimit,
       ArrayList<PlanFragment> fragments)
       throws ImpalaException {
+    // For both join types, the total cost is calculated as the amount of data
+    // sent over the network, plus the amount of data inserted into the hash table.
     // broadcast: send the rightChildFragment's output to each node executing
-    // the leftChildFragment; the cost across all nodes is proportional to the
-    // total amount of data sent
+    // the leftChildFragment, and build a hash table with it on each node.
     Analyzer analyzer = ctx_.getRootAnalyzer();
     PlanNode rhsTree = rightChildFragment.getPlanRoot();
     long rhsDataSize = 0;
     long broadcastCost = Long.MAX_VALUE;
-    if (rhsTree.getCardinality() != -1 && leftChildFragment.getNumNodes() != -1) {
+    if (rhsTree.getCardinality() != -1) {
       rhsDataSize = Math.round(
           rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree));
-      broadcastCost = rhsDataSize * leftChildFragment.getNumNodes();
+      if (leftChildFragment.getNumNodes() != -1) {
+        broadcastCost = 2 * rhsDataSize * leftChildFragment.getNumNodes();
+      }
     }
     LOG.debug("broadcast: cost=" + Long.toString(broadcastCost));
     LOG.debug("card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
@@ -435,7 +438,7 @@ public class DistributedPlanner {
         + Integer.toString(leftChildFragment.getNumNodes()));
 
     // repartition: both left- and rightChildFragment are partitioned on the
-    // join exprs
+    // join exprs, and a hash table is built with the rightChildFragment's output.
     PlanNode lhsTree = leftChildFragment.getPlanRoot();
     long partitionCost = Long.MAX_VALUE;
     List<Expr> lhsJoinExprs = Lists.newArrayList();
@@ -453,13 +456,11 @@ public class DistributedPlanner {
       rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs,
           rightChildFragment.getDataPartition().getPartitionExprs());
 
-      double lhsCost = (lhsHasCompatPartition) ? 0.0 :
+      double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 :
         Math.round(
             lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree));
-      double rhsCost = (rhsHasCompatPartition) ? 0.0 :
-        Math.round(
-            rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree));
-      partitionCost = Math.round(lhsCost + rhsCost);
+      double rhsNetworkCost = (rhsHasCompatPartition) ? 0.0 : rhsDataSize;
+      partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost + rhsDataSize);
     }
     LOG.debug("partition: cost=" + Long.toString(partitionCost));
     LOG.debug("lhs card=" + Long.toString(lhsTree.getCardinality()) + " row_size="

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index ec3c7a2..b5b36a0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -189,13 +189,13 @@ and (b.double_col * c.tinyint_col > 1000 or c.tinyint_col < 1000)
 02:SCAN HDFS [functional.alltypesaggnonulls c]
    partitions=2/10 files=2 size=148.10KB
 ---- DISTRIBUTEDPLAN
-08:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |
-04:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: c.id = a.id, c.string_col = b.string_col
 |  other predicates: a.tinyint_col = 15, b.string_col = '15', a.day >= 6, b.month > 2, a.tinyint_col + b.tinyint_col < 15, a.float_col - c.double_col < 0, (b.double_col * c.tinyint_col > 1000 OR c.tinyint_col < 1000)
 |
-|--07:EXCHANGE [BROADCAST]
+|--08:EXCHANGE [HASH(a.id,b.string_col)]
 |  |
 |  03:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
 |  |  hash predicates: a.id = b.id, a.int_col = b.int_col
@@ -212,6 +212,8 @@ and (b.double_col * c.tinyint_col > 1000 or c.tinyint_col < 1000)
 |     partitions=5/11 files=5 size=372.38KB
 |     predicates: a.tinyint_col = 15
 |
+07:EXCHANGE [HASH(c.id,c.string_col)]
+|
 02:SCAN HDFS [functional.alltypesaggnonulls c]
    partitions=2/10 files=2 size=148.10KB
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
index 334f7b2..492f7a4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
@@ -4715,19 +4715,19 @@ select * from (
    partitions=120/120 files=120 size=21.31MB
    runtime filters: RF002 -> s.ss_item_sk, RF003 -> s.ss_sold_date_sk, RF005 -> s.ss_customer_sk
 ---- DISTRIBUTEDPLAN
-30:MERGING-EXCHANGE [UNPARTITIONED]
+31:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(*) ASC
 |  limit: 100
 |
 16:TOP-N [LIMIT=100]
 |  order by: count(*) ASC
 |
-29:AGGREGATE [FINALIZE]
+30:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: a.ca_state
 |  having: count(*) >= 10
 |
-28:EXCHANGE [HASH(a.ca_state)]
+29:EXCHANGE [HASH(a.ca_state)]
 |
 15:AGGREGATE [STREAMING]
 |  output: count(*)
@@ -4738,13 +4738,13 @@ select * from (
 |  other join predicates: i.i_current_price > 1.2 * avg(j.i_current_price)
 |  runtime filters: RF000 <- j.i_category
 |
-|--27:EXCHANGE [BROADCAST]
+|--28:EXCHANGE [BROADCAST]
 |  |
-|  26:AGGREGATE [FINALIZE]
+|  27:AGGREGATE [FINALIZE]
 |  |  output: avg:merge(j.i_current_price)
 |  |  group by: j.i_category
 |  |
-|  25:EXCHANGE [HASH(j.i_category)]
+|  26:EXCHANGE [HASH(j.i_category)]
 |  |
 |  08:AGGREGATE [STREAMING]
 |  |  output: avg(j.i_current_price)
@@ -4757,16 +4757,16 @@ select * from (
 |  hash predicates: d.d_month_seq = (d_month_seq)
 |  runtime filters: RF001 <- (d_month_seq)
 |
-|--24:EXCHANGE [BROADCAST]
+|--25:EXCHANGE [BROADCAST]
 |  |
-|  23:EXCHANGE [UNPARTITIONED]
+|  24:EXCHANGE [UNPARTITIONED]
 |  |  limit: 1
 |  |
-|  22:AGGREGATE [FINALIZE]
+|  23:AGGREGATE [FINALIZE]
 |  |  group by: (d_month_seq)
 |  |  limit: 1
 |  |
-|  21:EXCHANGE [HASH((d_month_seq))]
+|  22:EXCHANGE [HASH((d_month_seq))]
 |  |
 |  06:AGGREGATE [STREAMING]
 |  |  group by: (d_month_seq)
@@ -4779,7 +4779,7 @@ select * from (
 |  hash predicates: s.ss_item_sk = i.i_item_sk
 |  runtime filters: RF002 <- i.i_item_sk
 |
-|--20:EXCHANGE [BROADCAST]
+|--21:EXCHANGE [BROADCAST]
 |  |
 |  04:SCAN HDFS [tpcds.item i]
 |     partitions=1/1 files=1 size=4.82MB
@@ -4789,7 +4789,7 @@ select * from (
 |  hash predicates: s.ss_sold_date_sk = d.d_date_sk
 |  runtime filters: RF003 <- d.d_date_sk
 |
-|--19:EXCHANGE [BROADCAST]
+|--20:EXCHANGE [BROADCAST]
 |  |
 |  03:SCAN HDFS [tpcds.date_dim d]
 |     partitions=1/1 files=1 size=9.84MB
@@ -4799,38 +4799,40 @@ select * from (
 |  hash predicates: c.c_current_addr_sk = a.ca_address_sk
 |  runtime filters: RF004 <- a.ca_address_sk
 |
-|--18:EXCHANGE [BROADCAST]
+|--19:EXCHANGE [BROADCAST]
 |  |
 |  00:SCAN HDFS [tpcds.customer_address a]
 |     partitions=1/1 files=1 size=5.25MB
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
+09:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: s.ss_customer_sk = c.c_customer_sk
 |  runtime filters: RF005 <- c.c_customer_sk
 |
-|--17:EXCHANGE [BROADCAST]
+|--18:EXCHANGE [HASH(c.c_customer_sk)]
 |  |
 |  01:SCAN HDFS [tpcds.customer c]
 |     partitions=1/1 files=1 size=12.60MB
 |     runtime filters: RF004 -> c.c_current_addr_sk
 |
+17:EXCHANGE [HASH(s.ss_customer_sk)]
+|
 02:SCAN HDFS [tpcds.store_sales s]
    partitions=120/120 files=120 size=21.31MB
    runtime filters: RF002 -> s.ss_item_sk, RF003 -> s.ss_sold_date_sk, RF005 -> s.ss_customer_sk
 ---- PARALLELPLANS
-30:MERGING-EXCHANGE [UNPARTITIONED]
+31:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(*) ASC
 |  limit: 100
 |
 16:TOP-N [LIMIT=100]
 |  order by: count(*) ASC
 |
-29:AGGREGATE [FINALIZE]
+30:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: a.ca_state
 |  having: count(*) >= 10
 |
-28:EXCHANGE [HASH(a.ca_state)]
+29:EXCHANGE [HASH(a.ca_state)]
 |
 15:AGGREGATE [STREAMING]
 |  output: count(*)
@@ -4845,13 +4847,13 @@ select * from (
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: j.i_category
 |  |
-|  27:EXCHANGE [BROADCAST]
+|  28:EXCHANGE [BROADCAST]
 |  |
-|  26:AGGREGATE [FINALIZE]
+|  27:AGGREGATE [FINALIZE]
 |  |  output: avg:merge(j.i_current_price)
 |  |  group by: j.i_category
 |  |
-|  25:EXCHANGE [HASH(j.i_category)]
+|  26:EXCHANGE [HASH(j.i_category)]
 |  |
 |  08:AGGREGATE [STREAMING]
 |  |  output: avg(j.i_current_price)
@@ -4868,16 +4870,16 @@ select * from (
 |  |  join-table-id=01 plan-id=02 cohort-id=01
 |  |  build expressions: (d_month_seq)
 |  |
-|  24:EXCHANGE [BROADCAST]
+|  25:EXCHANGE [BROADCAST]
 |  |
-|  23:EXCHANGE [UNPARTITIONED]
+|  24:EXCHANGE [UNPARTITIONED]
 |  |  limit: 1
 |  |
-|  22:AGGREGATE [FINALIZE]
+|  23:AGGREGATE [FINALIZE]
 |  |  group by: (d_month_seq)
 |  |  limit: 1
 |  |
-|  21:EXCHANGE [HASH((d_month_seq))]
+|  22:EXCHANGE [HASH((d_month_seq))]
 |  |
 |  06:AGGREGATE [STREAMING]
 |  |  group by: (d_month_seq)
@@ -4894,7 +4896,7 @@ select * from (
 |  |  join-table-id=02 plan-id=03 cohort-id=01
 |  |  build expressions: i.i_item_sk
 |  |
-|  20:EXCHANGE [BROADCAST]
+|  21:EXCHANGE [BROADCAST]
 |  |
 |  04:SCAN HDFS [tpcds.item i]
 |     partitions=1/1 files=1 size=4.82MB
@@ -4908,7 +4910,7 @@ select * from (
 |  |  join-table-id=03 plan-id=04 cohort-id=01
 |  |  build expressions: d.d_date_sk
 |  |
-|  19:EXCHANGE [BROADCAST]
+|  20:EXCHANGE [BROADCAST]
 |  |
 |  03:SCAN HDFS [tpcds.date_dim d]
 |     partitions=1/1 files=1 size=9.84MB
@@ -4922,12 +4924,12 @@ select * from (
 |  |  join-table-id=04 plan-id=05 cohort-id=01
 |  |  build expressions: a.ca_address_sk
 |  |
-|  18:EXCHANGE [BROADCAST]
+|  19:EXCHANGE [BROADCAST]
 |  |
 |  00:SCAN HDFS [tpcds.customer_address a]
 |     partitions=1/1 files=1 size=5.25MB
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
+09:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: s.ss_customer_sk = c.c_customer_sk
 |  runtime filters: RF005 <- c.c_customer_sk
 |
@@ -4935,12 +4937,14 @@ select * from (
 |  |  join-table-id=05 plan-id=06 cohort-id=01
 |  |  build expressions: c.c_customer_sk
 |  |
-|  17:EXCHANGE [BROADCAST]
+|  18:EXCHANGE [HASH(c.c_customer_sk)]
 |  |
 |  01:SCAN HDFS [tpcds.customer c]
 |     partitions=1/1 files=1 size=12.60MB
 |     runtime filters: RF004 -> c.c_current_addr_sk
 |
+17:EXCHANGE [HASH(s.ss_customer_sk)]
+|
 02:SCAN HDFS [tpcds.store_sales s]
    partitions=120/120 files=120 size=21.31MB
    runtime filters: RF002 -> s.ss_item_sk, RF003 -> s.ss_sold_date_sk, RF005 -> s.ss_customer_sk

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index 4e7851f..7864153 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -1343,17 +1343,17 @@ order by
    partitions=1/1 files=1 size=23.08MB
    runtime filters: RF002 -> c_nationkey, RF003 -> c_custkey
 ---- DISTRIBUTEDPLAN
-26:MERGING-EXCHANGE [UNPARTITIONED]
+28:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_year ASC
 |
 16:SORT
 |  order by: o_year ASC
 |
-25:AGGREGATE [FINALIZE]
+27:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume)
 |  group by: o_year
 |
-24:EXCHANGE [HASH(o_year)]
+26:EXCHANGE [HASH(o_year)]
 |
 15:AGGREGATE [STREAMING]
 |  output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
@@ -1363,7 +1363,7 @@ order by
 |  hash predicates: s_nationkey = n2.n_nationkey
 |  runtime filters: RF000 <- n2.n_nationkey
 |
-|--23:EXCHANGE [BROADCAST]
+|--25:EXCHANGE [BROADCAST]
 |  |
 |  06:SCAN HDFS [tpch.nation n2]
 |     partitions=1/1 files=1 size=2.15KB
@@ -1372,7 +1372,7 @@ order by
 |  hash predicates: n1.n_regionkey = r_regionkey
 |  runtime filters: RF001 <- r_regionkey
 |
-|--22:EXCHANGE [BROADCAST]
+|--24:EXCHANGE [BROADCAST]
 |  |
 |  07:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
@@ -1382,7 +1382,7 @@ order by
 |  hash predicates: c_nationkey = n1.n_nationkey
 |  runtime filters: RF002 <- n1.n_nationkey
 |
-|--21:EXCHANGE [BROADCAST]
+|--23:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpch.nation n1]
 |     partitions=1/1 files=1 size=2.15KB
@@ -1392,23 +1392,25 @@ order by
 |  hash predicates: c_custkey = o_custkey
 |  runtime filters: RF003 <- o_custkey
 |
-|--20:EXCHANGE [BROADCAST]
+|--22:EXCHANGE [BROADCAST]
 |  |
-|  10:HASH JOIN [INNER JOIN, BROADCAST]
+|  10:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: l_suppkey = s_suppkey
 |  |  runtime filters: RF004 <- s_suppkey
 |  |
-|  |--19:EXCHANGE [BROADCAST]
+|  |--21:EXCHANGE [HASH(s_suppkey)]
 |  |  |
 |  |  01:SCAN HDFS [tpch.supplier]
 |  |     partitions=1/1 files=1 size=1.33MB
 |  |     runtime filters: RF000 -> s_nationkey
 |  |
-|  09:HASH JOIN [INNER JOIN, BROADCAST]
+|  20:EXCHANGE [HASH(l_suppkey)]
+|  |
+|  09:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: o_orderkey = l_orderkey
 |  |  runtime filters: RF005 <- l_orderkey
 |  |
-|  |--18:EXCHANGE [BROADCAST]
+|  |--19:EXCHANGE [HASH(l_orderkey)]
 |  |  |
 |  |  08:HASH JOIN [INNER JOIN, BROADCAST]
 |  |  |  hash predicates: l_partkey = p_partkey
@@ -1424,6 +1426,8 @@ order by
 |  |     partitions=1/1 files=1 size=718.94MB
 |  |     runtime filters: RF004 -> l_suppkey, RF006 -> l_partkey
 |  |
+|  18:EXCHANGE [HASH(o_orderkey)]
+|  |
 |  03:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
 |     predicates: o_orderdate >= '1995-01-01', o_orderdate <= '1996-12-31'
@@ -1433,17 +1437,17 @@ order by
    partitions=1/1 files=1 size=23.08MB
    runtime filters: RF002 -> c_nationkey, RF003 -> c_custkey
 ---- PARALLELPLANS
-26:MERGING-EXCHANGE [UNPARTITIONED]
+28:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_year ASC
 |
 16:SORT
 |  order by: o_year ASC
 |
-25:AGGREGATE [FINALIZE]
+27:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume)
 |  group by: o_year
 |
-24:EXCHANGE [HASH(o_year)]
+26:EXCHANGE [HASH(o_year)]
 |
 15:AGGREGATE [STREAMING]
 |  output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
@@ -1457,7 +1461,7 @@ order by
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: n2.n_nationkey
 |  |
-|  23:EXCHANGE [BROADCAST]
+|  25:EXCHANGE [BROADCAST]
 |  |
 |  06:SCAN HDFS [tpch.nation n2]
 |     partitions=1/1 files=1 size=2.15KB
@@ -1470,7 +1474,7 @@ order by
 |  |  join-table-id=01 plan-id=02 cohort-id=01
 |  |  build expressions: r_regionkey
 |  |
-|  22:EXCHANGE [BROADCAST]
+|  24:EXCHANGE [BROADCAST]
 |  |
 |  07:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
@@ -1484,7 +1488,7 @@ order by
 |  |  join-table-id=02 plan-id=03 cohort-id=01
 |  |  build expressions: n1.n_nationkey
 |  |
-|  21:EXCHANGE [BROADCAST]
+|  23:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpch.nation n1]
 |     partitions=1/1 files=1 size=2.15KB
@@ -1498,9 +1502,9 @@ order by
 |  |  join-table-id=03 plan-id=04 cohort-id=01
 |  |  build expressions: o_custkey
 |  |
-|  20:EXCHANGE [BROADCAST]
+|  22:EXCHANGE [BROADCAST]
 |  |
-|  10:HASH JOIN [INNER JOIN, BROADCAST]
+|  10:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: l_suppkey = s_suppkey
 |  |  runtime filters: RF004 <- s_suppkey
 |  |
@@ -1508,13 +1512,15 @@ order by
 |  |  |  join-table-id=04 plan-id=05 cohort-id=02
 |  |  |  build expressions: s_suppkey
 |  |  |
-|  |  19:EXCHANGE [BROADCAST]
+|  |  21:EXCHANGE [HASH(s_suppkey)]
 |  |  |
 |  |  01:SCAN HDFS [tpch.supplier]
 |  |     partitions=1/1 files=1 size=1.33MB
 |  |     runtime filters: RF000 -> s_nationkey
 |  |
-|  09:HASH JOIN [INNER JOIN, BROADCAST]
+|  20:EXCHANGE [HASH(l_suppkey)]
+|  |
+|  09:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: o_orderkey = l_orderkey
 |  |  runtime filters: RF005 <- l_orderkey
 |  |
@@ -1522,7 +1528,7 @@ order by
 |  |  |  join-table-id=05 plan-id=06 cohort-id=02
 |  |  |  build expressions: l_orderkey
 |  |  |
-|  |  18:EXCHANGE [BROADCAST]
+|  |  19:EXCHANGE [HASH(l_orderkey)]
 |  |  |
 |  |  08:HASH JOIN [INNER JOIN, BROADCAST]
 |  |  |  hash predicates: l_partkey = p_partkey
@@ -1542,6 +1548,8 @@ order by
 |  |     partitions=1/1 files=1 size=718.94MB
 |  |     runtime filters: RF004 -> l_suppkey, RF006 -> l_partkey
 |  |
+|  18:EXCHANGE [HASH(o_orderkey)]
+|  |
 |  03:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
 |     predicates: o_orderdate >= '1995-01-01', o_orderdate <= '1996-12-31'
@@ -2292,53 +2300,55 @@ order by
    partitions=1/1 files=1 size=162.56MB
    runtime filters: RF000 -> o_orderkey
 ---- DISTRIBUTEDPLAN
-08:MERGING-EXCHANGE [UNPARTITIONED]
+09:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: l_shipmode ASC
 |
 04:SORT
 |  order by: l_shipmode ASC
 |
-07:AGGREGATE [FINALIZE]
+08:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum:merge(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
 |  group by: l_shipmode
 |
-06:EXCHANGE [HASH(l_shipmode)]
+07:EXCHANGE [HASH(l_shipmode)]
 |
 03:AGGREGATE [STREAMING]
 |  output: sum(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
 |  group by: l_shipmode
 |
-02:HASH JOIN [INNER JOIN, BROADCAST]
+02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_orderkey = l_orderkey
 |  runtime filters: RF000 <- l_orderkey
 |
-|--05:EXCHANGE [BROADCAST]
+|--06:EXCHANGE [HASH(l_orderkey)]
 |  |
 |  01:SCAN HDFS [tpch.lineitem]
 |     partitions=1/1 files=1 size=718.94MB
 |     predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate, l_receiptdate >= '1994-01-01', l_receiptdate < '1995-01-01'
 |
+05:EXCHANGE [HASH(o_orderkey)]
+|
 00:SCAN HDFS [tpch.orders]
    partitions=1/1 files=1 size=162.56MB
    runtime filters: RF000 -> o_orderkey
 ---- PARALLELPLANS
-08:MERGING-EXCHANGE [UNPARTITIONED]
+09:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: l_shipmode ASC
 |
 04:SORT
 |  order by: l_shipmode ASC
 |
-07:AGGREGATE [FINALIZE]
+08:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum:merge(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
 |  group by: l_shipmode
 |
-06:EXCHANGE [HASH(l_shipmode)]
+07:EXCHANGE [HASH(l_shipmode)]
 |
 03:AGGREGATE [STREAMING]
 |  output: sum(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
 |  group by: l_shipmode
 |
-02:HASH JOIN [INNER JOIN, BROADCAST]
+02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_orderkey = l_orderkey
 |  runtime filters: RF000 <- l_orderkey
 |
@@ -2346,12 +2356,14 @@ order by
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: l_orderkey
 |  |
-|  05:EXCHANGE [BROADCAST]
+|  06:EXCHANGE [HASH(l_orderkey)]
 |  |
 |  01:SCAN HDFS [tpch.lineitem]
 |     partitions=1/1 files=1 size=718.94MB
 |     predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate, l_receiptdate >= '1994-01-01', l_receiptdate < '1995-01-01'
 |
+05:EXCHANGE [HASH(o_orderkey)]
+|
 00:SCAN HDFS [tpch.orders]
    partitions=1/1 files=1 size=162.56MB
    runtime filters: RF000 -> o_orderkey
@@ -2509,37 +2521,39 @@ where
    predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01'
    runtime filters: RF000 -> l_partkey
 ---- DISTRIBUTEDPLAN
-06:AGGREGATE [FINALIZE]
+07:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount))
 |
-05:EXCHANGE [UNPARTITIONED]
+06:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE
 |  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
 |
-02:HASH JOIN [INNER JOIN, BROADCAST]
+02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_partkey = p_partkey
 |  runtime filters: RF000 <- p_partkey
 |
-|--04:EXCHANGE [BROADCAST]
+|--05:EXCHANGE [HASH(p_partkey)]
 |  |
 |  01:SCAN HDFS [tpch.part]
 |     partitions=1/1 files=1 size=22.83MB
 |
+04:EXCHANGE [HASH(l_partkey)]
+|
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01'
    runtime filters: RF000 -> l_partkey
 ---- PARALLELPLANS
-06:AGGREGATE [FINALIZE]
+07:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount))
 |
-05:EXCHANGE [UNPARTITIONED]
+06:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE
 |  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
 |
-02:HASH JOIN [INNER JOIN, BROADCAST]
+02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_partkey = p_partkey
 |  runtime filters: RF000 <- p_partkey
 |
@@ -2547,11 +2561,13 @@ where
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: p_partkey
 |  |
-|  04:EXCHANGE [BROADCAST]
+|  05:EXCHANGE [HASH(p_partkey)]
 |  |
 |  01:SCAN HDFS [tpch.part]
 |     partitions=1/1 files=1 size=22.83MB
 |
+04:EXCHANGE [HASH(l_partkey)]
+|
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01'
@@ -3698,13 +3714,11 @@ limit 100
 |  |  other join predicates: l2.l_suppkey != l1.l_suppkey
 |  |  runtime filters: RF000 <- l1.l_orderkey
 |  |
-|  |--17:EXCHANGE [HASH(l1.l_orderkey)]
-|  |  |
-|  |  08:HASH JOIN [INNER JOIN, BROADCAST]
+|  |--08:HASH JOIN [INNER JOIN, BROADCAST]
 |  |  |  hash predicates: s_nationkey = n_nationkey
 |  |  |  runtime filters: RF001 <- n_nationkey
 |  |  |
-|  |  |--15:EXCHANGE [BROADCAST]
+|  |  |--16:EXCHANGE [BROADCAST]
 |  |  |  |
 |  |  |  03:SCAN HDFS [tpch.nation]
 |  |  |     partitions=1/1 files=1 size=2.15KB
@@ -3714,28 +3728,30 @@ limit 100
 |  |  |  hash predicates: l1.l_suppkey = s_suppkey
 |  |  |  runtime filters: RF002 <- s_suppkey
 |  |  |
-|  |  |--14:EXCHANGE [BROADCAST]
+|  |  |--15:EXCHANGE [BROADCAST]
 |  |  |  |
 |  |  |  00:SCAN HDFS [tpch.supplier]
 |  |  |     partitions=1/1 files=1 size=1.33MB
 |  |  |     runtime filters: RF001 -> s_nationkey
 |  |  |
-|  |  06:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  |  hash predicates: l1.l_orderkey = o_orderkey
 |  |  |  runtime filters: RF003 <- o_orderkey
 |  |  |
-|  |  |--13:EXCHANGE [BROADCAST]
+|  |  |--14:EXCHANGE [HASH(o_orderkey)]
 |  |  |  |
 |  |  |  02:SCAN HDFS [tpch.orders]
 |  |  |     partitions=1/1 files=1 size=162.56MB
 |  |  |     predicates: o_orderstatus = 'F'
 |  |  |
+|  |  13:EXCHANGE [HASH(l1.l_orderkey)]
+|  |  |
 |  |  01:SCAN HDFS [tpch.lineitem l1]
 |  |     partitions=1/1 files=1 size=718.94MB
 |  |     predicates: l1.l_receiptdate > l1.l_commitdate
 |  |     runtime filters: RF002 -> l1.l_suppkey, RF003 -> l1.l_orderkey
 |  |
-|  16:EXCHANGE [HASH(l2.l_orderkey)]
+|  17:EXCHANGE [HASH(l2.l_orderkey)]
 |  |
 |  04:SCAN HDFS [tpch.lineitem l2]
 |     partitions=1/1 files=1 size=718.94MB
@@ -3781,8 +3797,6 @@ limit 100
 |  |  |  join-table-id=01 plan-id=02 cohort-id=02
 |  |  |  build expressions: l1.l_orderkey
 |  |  |
-|  |  17:EXCHANGE [HASH(l1.l_orderkey)]
-|  |  |
 |  |  08:HASH JOIN [INNER JOIN, BROADCAST]
 |  |  |  hash predicates: s_nationkey = n_nationkey
 |  |  |  runtime filters: RF001 <- n_nationkey
@@ -3791,7 +3805,7 @@ limit 100
 |  |  |  |  join-table-id=02 plan-id=03 cohort-id=03
 |  |  |  |  build expressions: n_nationkey
 |  |  |  |
-|  |  |  15:EXCHANGE [BROADCAST]
+|  |  |  16:EXCHANGE [BROADCAST]
 |  |  |  |
 |  |  |  03:SCAN HDFS [tpch.nation]
 |  |  |     partitions=1/1 files=1 size=2.15KB
@@ -3805,13 +3819,13 @@ limit 100
 |  |  |  |  join-table-id=03 plan-id=04 cohort-id=03
 |  |  |  |  build expressions: s_suppkey
 |  |  |  |
-|  |  |  14:EXCHANGE [BROADCAST]
+|  |  |  15:EXCHANGE [BROADCAST]
 |  |  |  |
 |  |  |  00:SCAN HDFS [tpch.supplier]
 |  |  |     partitions=1/1 files=1 size=1.33MB
 |  |  |     runtime filters: RF001 -> s_nationkey
 |  |  |
-|  |  06:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  |  hash predicates: l1.l_orderkey = o_orderkey
 |  |  |  runtime filters: RF003 <- o_orderkey
 |  |  |
@@ -3819,18 +3833,20 @@ limit 100
 |  |  |  |  join-table-id=04 plan-id=05 cohort-id=03
 |  |  |  |  build expressions: o_orderkey
 |  |  |  |
-|  |  |  13:EXCHANGE [BROADCAST]
+|  |  |  14:EXCHANGE [HASH(o_orderkey)]
 |  |  |  |
 |  |  |  02:SCAN HDFS [tpch.orders]
 |  |  |     partitions=1/1 files=1 size=162.56MB
 |  |  |     predicates: o_orderstatus = 'F'
 |  |  |
+|  |  13:EXCHANGE [HASH(l1.l_orderkey)]
+|  |  |
 |  |  01:SCAN HDFS [tpch.lineitem l1]
 |  |     partitions=1/1 files=1 size=718.94MB
 |  |     predicates: l1.l_receiptdate > l1.l_commitdate
 |  |     runtime filters: RF002 -> l1.l_suppkey, RF003 -> l1.l_orderkey
 |  |
-|  16:EXCHANGE [HASH(l2.l_orderkey)]
+|  17:EXCHANGE [HASH(l2.l_orderkey)]
 |  |
 |  04:SCAN HDFS [tpch.lineitem l2]
 |     partitions=1/1 files=1 size=718.94MB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
index 1f0e315..4354814 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
@@ -1887,14 +1887,12 @@ where
 06:AGGREGATE
 |  output: sum(l_extendedprice)
 |
-05:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+05:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
 |  hash predicates: p_partkey = l_partkey
 |  other join predicates: l_quantity < 0.2 * avg(l_quantity)
 |  runtime filters: RF000 <- l_partkey
 |
-|--10:EXCHANGE [BROADCAST]
-|  |
-|  09:AGGREGATE [FINALIZE]
+|--09:AGGREGATE [FINALIZE]
 |  |  output: avg:merge(l_quantity)
 |  |  group by: l_partkey
 |  |
@@ -1907,6 +1905,8 @@ where
 |  02:SCAN HDFS [tpch_nested_parquet.customer.c_orders.o_lineitems l]
 |     partitions=1/1 files=4 size=577.87MB
 |
+10:EXCHANGE [HASH(p_partkey)]
+|
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: l_partkey = p_partkey
 |  runtime filters: RF001 <- p_partkey

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/union.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/union.test b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
index 1dfbdcc..7e9549a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/union.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
@@ -2469,7 +2469,7 @@ select 1000, 2000
 01:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
 ---- DISTRIBUTEDPLAN
-10:EXCHANGE [UNPARTITIONED]
+11:EXCHANGE [UNPARTITIONED]
 |
 00:UNION
 |  constant-operands=1
@@ -2477,15 +2477,17 @@ select 1000, 2000
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |
-|--06:HASH JOIN [INNER JOIN, BROADCAST]
+|--06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: a.id = b.id
 |  |  runtime filters: RF000 <- b.id
 |  |
-|  |--09:EXCHANGE [BROADCAST]
+|  |--10:EXCHANGE [HASH(b.id)]
 |  |  |
 |  |  05:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
 |  |
+|  09:EXCHANGE [HASH(a.id)]
+|  |
 |  04:SCAN HDFS [functional.alltypestiny a]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> a.id
@@ -2602,7 +2604,7 @@ select 1000, 2000
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
-19:EXCHANGE [UNPARTITIONED]
+20:EXCHANGE [UNPARTITIONED]
 |
 00:UNION
 |  constant-operands=1
@@ -2610,7 +2612,7 @@ select 1000, 2000
 |--05:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |
-|--18:EXCHANGE [RANDOM]
+|--19:EXCHANGE [RANDOM]
 |  |
 |  13:MERGING-EXCHANGE [UNPARTITIONED]
 |  |  order by: id ASC
@@ -2622,7 +2624,7 @@ select 1000, 2000
 |  03:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |
-|--17:EXCHANGE [RANDOM]
+|--18:EXCHANGE [RANDOM]
 |  |
 |  12:AGGREGATE [FINALIZE]
 |  |  output: count:merge(id), sum:merge(bigint_col)
@@ -2635,15 +2637,17 @@ select 1000, 2000
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |
-|--10:HASH JOIN [INNER JOIN, BROADCAST]
+|--10:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: a.id = b.id
 |  |  runtime filters: RF000 <- b.id
 |  |
-|  |--16:EXCHANGE [BROADCAST]
+|  |--17:EXCHANGE [HASH(b.id)]
 |  |  |
 |  |  09:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
 |  |
+|  16:EXCHANGE [HASH(a.id)]
+|  |
 |  08:SCAN HDFS [functional.alltypestiny a]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> a.id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/views.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/views.test b/testdata/workloads/functional-planner/queries/PlannerTest/views.test
index 9836d90..50bee61 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/views.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/views.test
@@ -155,27 +155,27 @@ functional.complex_view t3 where t1.id = t2.x and t2.x = t3.abc
    predicates: functional.alltypes.id > 1
    runtime filters: RF000 -> functional.alltypes.id, RF001 -> functional.alltypes.id
 ---- DISTRIBUTEDPLAN
-15:EXCHANGE [UNPARTITIONED]
+16:EXCHANGE [UNPARTITIONED]
 |
 08:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: int_col = count(a.bigint_col)
 |  runtime filters: RF000 <- count(a.bigint_col)
 |
-|--14:EXCHANGE [BROADCAST]
+|--15:EXCHANGE [BROADCAST]
 |  |
-|  13:MERGING-EXCHANGE [UNPARTITIONED]
+|  14:MERGING-EXCHANGE [UNPARTITIONED]
 |  |  order by: b.string_col ASC
 |  |  limit: 100
 |  |
 |  06:TOP-N [LIMIT=100]
 |  |  order by: b.string_col ASC
 |  |
-|  12:AGGREGATE [FINALIZE]
+|  13:AGGREGATE [FINALIZE]
 |  |  output: count:merge(a.bigint_col)
 |  |  group by: b.string_col
 |  |  having: count(a.bigint_col) > 1
 |  |
-|  11:EXCHANGE [HASH(b.string_col)]
+|  12:EXCHANGE [HASH(b.string_col)]
 |  |
 |  05:AGGREGATE [STREAMING]
 |  |  output: count(a.bigint_col)
@@ -185,7 +185,7 @@ functional.complex_view t3 where t1.id = t2.x and t2.x = t3.abc
 |  |  hash predicates: a.id = b.id
 |  |  runtime filters: RF002 <- b.id
 |  |
-|  |--10:EXCHANGE [BROADCAST]
+|  |--11:EXCHANGE [BROADCAST]
 |  |  |
 |  |  03:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
@@ -195,17 +195,19 @@ functional.complex_view t3 where t1.id = t2.x and t2.x = t3.abc
 |     predicates: a.bigint_col < 50
 |     runtime filters: RF002 -> a.id
 |
-07:HASH JOIN [INNER JOIN, BROADCAST]
+07:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: functional.alltypes.id = int_col
 |  runtime filters: RF001 <- int_col
 |
-|--09:EXCHANGE [BROADCAST]
+|--10:EXCHANGE [HASH(int_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: functional.alltypes.int_col > 1
 |     runtime filters: RF000 -> int_col
 |
+09:EXCHANGE [HASH(functional.alltypes.id)]
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.id > 1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index e97afc5..91b425e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -203,7 +203,7 @@ select
   count(*) as numwait
 from
   supplier,
-  lineitem l1,
+  lineitem l1 join [BROADCAST]
   orders,
   nation
 where