You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/22 20:17:55 UTC

[1/2] incubator-impala git commit: IMPALA-5612: join inversion should factor in parallelism

Repository: incubator-impala
Updated Branches:
  refs/heads/master f9b222e92 -> e3075c39a


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index 2c6db60..f36751d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -1389,17 +1389,17 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-28:MERGING-EXCHANGE [UNPARTITIONED]
+29:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_year ASC
 |
 16:SORT
 |  order by: o_year ASC
 |
-27:AGGREGATE [FINALIZE]
+28:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume)
 |  group by: o_year
 |
-26:EXCHANGE [HASH(o_year)]
+27:EXCHANGE [HASH(o_year)]
 |
 15:AGGREGATE [STREAMING]
 |  output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
@@ -1409,7 +1409,7 @@ PLAN-ROOT SINK
 |  hash predicates: s_nationkey = n2.n_nationkey
 |  runtime filters: RF000 <- n2.n_nationkey
 |
-|--25:EXCHANGE [BROADCAST]
+|--26:EXCHANGE [BROADCAST]
 |  |
 |  06:SCAN HDFS [tpch.nation n2]
 |     partitions=1/1 files=1 size=2.15KB
@@ -1418,7 +1418,7 @@ PLAN-ROOT SINK
 |  hash predicates: n1.n_regionkey = r_regionkey
 |  runtime filters: RF001 <- r_regionkey
 |
-|--24:EXCHANGE [BROADCAST]
+|--25:EXCHANGE [BROADCAST]
 |  |
 |  07:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
@@ -1428,74 +1428,76 @@ PLAN-ROOT SINK
 |  hash predicates: c_nationkey = n1.n_nationkey
 |  runtime filters: RF002 <- n1.n_nationkey
 |
-|--23:EXCHANGE [BROADCAST]
+|--24:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpch.nation n1]
 |     partitions=1/1 files=1 size=2.15KB
 |     runtime filters: RF001 -> n1.n_regionkey
 |
-11:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_custkey = o_custkey
-|  runtime filters: RF003 <- o_custkey
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF003 <- c_custkey
 |
-|--22:EXCHANGE [BROADCAST]
-|  |
-|  10:HASH JOIN [INNER JOIN, PARTITIONED]
-|  |  hash predicates: l_suppkey = s_suppkey
-|  |  runtime filters: RF004 <- s_suppkey
-|  |
-|  |--21:EXCHANGE [HASH(s_suppkey)]
-|  |  |
-|  |  01:SCAN HDFS [tpch.supplier]
-|  |     partitions=1/1 files=1 size=1.33MB
-|  |     runtime filters: RF000 -> s_nationkey
-|  |
-|  20:EXCHANGE [HASH(l_suppkey)]
-|  |
-|  09:HASH JOIN [INNER JOIN, PARTITIONED]
-|  |  hash predicates: o_orderkey = l_orderkey
-|  |  runtime filters: RF005 <- l_orderkey
+|--23:EXCHANGE [HASH(c_custkey)]
 |  |
-|  |--19:EXCHANGE [HASH(l_orderkey)]
-|  |  |
-|  |  08:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  |  hash predicates: l_partkey = p_partkey
-|  |  |  runtime filters: RF006 <- p_partkey
-|  |  |
-|  |  |--17:EXCHANGE [BROADCAST]
-|  |  |  |
-|  |  |  00:SCAN HDFS [tpch.part]
-|  |  |     partitions=1/1 files=1 size=22.83MB
-|  |  |     predicates: p_type = 'ECONOMY ANODIZED STEEL'
-|  |  |
-|  |  02:SCAN HDFS [tpch.lineitem]
-|  |     partitions=1/1 files=1 size=718.94MB
-|  |     runtime filters: RF004 -> l_suppkey, RF006 -> l_partkey
+|  04:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF002 -> c_nationkey
+|
+22:EXCHANGE [HASH(o_custkey)]
+|
+10:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF004 <- s_suppkey
+|
+|--21:EXCHANGE [HASH(s_suppkey)]
 |  |
-|  18:EXCHANGE [HASH(o_orderkey)]
+|  01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> s_nationkey
+|
+20:EXCHANGE [HASH(l_suppkey)]
+|
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF005 <- o_orderkey
+|
+|--19:EXCHANGE [HASH(o_orderkey)]
 |  |
 |  03:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
 |     predicates: o_orderdate <= '1996-12-31', o_orderdate >= '1995-01-01'
-|     runtime filters: RF005 -> o_orderkey
+|     runtime filters: RF003 -> o_custkey
 |
-04:SCAN HDFS [tpch.customer]
-   partitions=1/1 files=1 size=23.08MB
-   runtime filters: RF002 -> c_nationkey, RF003 -> c_custkey
+18:EXCHANGE [HASH(l_orderkey)]
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_partkey = p_partkey
+|  runtime filters: RF006 <- p_partkey
+|
+|--17:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: p_type = 'ECONOMY ANODIZED STEEL'
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF004 -> l_suppkey, RF005 -> l_orderkey, RF006 -> l_partkey
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-28:MERGING-EXCHANGE [UNPARTITIONED]
+29:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_year ASC
 |
 16:SORT
 |  order by: o_year ASC
 |
-27:AGGREGATE [FINALIZE]
+28:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume)
 |  group by: o_year
 |
-26:EXCHANGE [HASH(o_year)]
+27:EXCHANGE [HASH(o_year)]
 |
 15:AGGREGATE [STREAMING]
 |  output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
@@ -1509,7 +1511,7 @@ PLAN-ROOT SINK
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: n2.n_nationkey
 |  |
-|  25:EXCHANGE [BROADCAST]
+|  26:EXCHANGE [BROADCAST]
 |  |
 |  06:SCAN HDFS [tpch.nation n2]
 |     partitions=1/1 files=1 size=2.15KB
@@ -1522,7 +1524,7 @@ PLAN-ROOT SINK
 |  |  join-table-id=01 plan-id=02 cohort-id=01
 |  |  build expressions: r_regionkey
 |  |
-|  24:EXCHANGE [BROADCAST]
+|  25:EXCHANGE [BROADCAST]
 |  |
 |  07:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
@@ -1536,76 +1538,78 @@ PLAN-ROOT SINK
 |  |  join-table-id=02 plan-id=03 cohort-id=01
 |  |  build expressions: n1.n_nationkey
 |  |
-|  23:EXCHANGE [BROADCAST]
+|  24:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpch.nation n1]
 |     partitions=1/1 files=1 size=2.15KB
 |     runtime filters: RF001 -> n1.n_regionkey
 |
-11:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_custkey = o_custkey
-|  runtime filters: RF003 <- o_custkey
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF003 <- c_custkey
 |
 |--JOIN BUILD
 |  |  join-table-id=03 plan-id=04 cohort-id=01
-|  |  build expressions: o_custkey
-|  |
-|  22:EXCHANGE [BROADCAST]
-|  |
-|  10:HASH JOIN [INNER JOIN, PARTITIONED]
-|  |  hash predicates: l_suppkey = s_suppkey
-|  |  runtime filters: RF004 <- s_suppkey
+|  |  build expressions: c_custkey
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=04 plan-id=05 cohort-id=02
-|  |  |  build expressions: s_suppkey
-|  |  |
-|  |  21:EXCHANGE [HASH(s_suppkey)]
-|  |  |
-|  |  01:SCAN HDFS [tpch.supplier]
-|  |     partitions=1/1 files=1 size=1.33MB
-|  |     runtime filters: RF000 -> s_nationkey
+|  23:EXCHANGE [HASH(c_custkey)]
 |  |
-|  20:EXCHANGE [HASH(l_suppkey)]
+|  04:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF002 -> c_nationkey
+|
+22:EXCHANGE [HASH(o_custkey)]
+|
+10:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF004 <- s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: s_suppkey
 |  |
-|  09:HASH JOIN [INNER JOIN, PARTITIONED]
-|  |  hash predicates: o_orderkey = l_orderkey
-|  |  runtime filters: RF005 <- l_orderkey
+|  21:EXCHANGE [HASH(s_suppkey)]
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=05 plan-id=06 cohort-id=02
-|  |  |  build expressions: l_orderkey
-|  |  |
-|  |  19:EXCHANGE [HASH(l_orderkey)]
-|  |  |
-|  |  08:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  |  hash predicates: l_partkey = p_partkey
-|  |  |  runtime filters: RF006 <- p_partkey
-|  |  |
-|  |  |--JOIN BUILD
-|  |  |  |  join-table-id=06 plan-id=07 cohort-id=03
-|  |  |  |  build expressions: p_partkey
-|  |  |  |
-|  |  |  17:EXCHANGE [BROADCAST]
-|  |  |  |
-|  |  |  00:SCAN HDFS [tpch.part]
-|  |  |     partitions=1/1 files=1 size=22.83MB
-|  |  |     predicates: p_type = 'ECONOMY ANODIZED STEEL'
-|  |  |
-|  |  02:SCAN HDFS [tpch.lineitem]
-|  |     partitions=1/1 files=1 size=718.94MB
-|  |     runtime filters: RF004 -> l_suppkey, RF006 -> l_partkey
+|  01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> s_nationkey
+|
+20:EXCHANGE [HASH(l_suppkey)]
+|
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF005 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=05 plan-id=06 cohort-id=01
+|  |  build expressions: o_orderkey
 |  |
-|  18:EXCHANGE [HASH(o_orderkey)]
+|  19:EXCHANGE [HASH(o_orderkey)]
 |  |
 |  03:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
 |     predicates: o_orderdate <= '1996-12-31', o_orderdate >= '1995-01-01'
-|     runtime filters: RF005 -> o_orderkey
+|     runtime filters: RF003 -> o_custkey
 |
-04:SCAN HDFS [tpch.customer]
-   partitions=1/1 files=1 size=23.08MB
-   runtime filters: RF002 -> c_nationkey, RF003 -> c_custkey
+18:EXCHANGE [HASH(l_orderkey)]
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_partkey = p_partkey
+|  runtime filters: RF006 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=06 plan-id=07 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  17:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: p_type = 'ECONOMY ANODIZED STEEL'
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF004 -> l_suppkey, RF005 -> l_orderkey, RF006 -> l_partkey
 ====
 # TPCH-Q9
 # Q9 - Product Type Measure Query
@@ -1931,18 +1935,18 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-14:MERGING-EXCHANGE [UNPARTITIONED]
+15:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(l_extendedprice * (1 - l_discount)) DESC
 |  limit: 20
 |
 08:TOP-N [LIMIT=20]
 |  order by: sum(l_extendedprice * (1 - l_discount)) DESC
 |
-13:AGGREGATE [FINALIZE]
+14:AGGREGATE [FINALIZE]
 |  output: sum:merge(l_extendedprice * (1 - l_discount))
 |  group by: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
 |
-12:EXCHANGE [HASH(c_custkey,c_name,c_acctbal,c_phone,n_name,c_address,c_comment)]
+13:EXCHANGE [HASH(c_custkey,c_name,c_acctbal,c_phone,n_name,c_address,c_comment)]
 |
 07:AGGREGATE [STREAMING]
 |  output: sum(l_extendedprice * (1 - l_discount))
@@ -1952,50 +1956,53 @@ PLAN-ROOT SINK
 |  hash predicates: c_nationkey = n_nationkey
 |  runtime filters: RF000 <- n_nationkey
 |
-|--11:EXCHANGE [BROADCAST]
+|--12:EXCHANGE [BROADCAST]
 |  |
 |  03:SCAN HDFS [tpch.nation]
 |     partitions=1/1 files=1 size=2.15KB
 |
-05:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_custkey = o_custkey
-|  runtime filters: RF001 <- o_custkey
+05:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF001 <- c_custkey
 |
-|--10:EXCHANGE [BROADCAST]
-|  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: l_orderkey = o_orderkey
-|  |  runtime filters: RF002 <- o_orderkey
+|--11:EXCHANGE [HASH(c_custkey)]
 |  |
-|  |--09:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpch.orders]
-|  |     partitions=1/1 files=1 size=162.56MB
-|  |     predicates: o_orderdate < '1994-01-01', o_orderdate >= '1993-10-01'
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> c_nationkey
+|
+10:EXCHANGE [HASH(o_custkey)]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF002 <- o_orderkey
+|
+|--09:EXCHANGE [BROADCAST]
 |  |
-|  02:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
-|     predicates: l_returnflag = 'R'
-|     runtime filters: RF002 -> l_orderkey
+|  01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate < '1994-01-01', o_orderdate >= '1993-10-01'
+|     runtime filters: RF001 -> o_custkey
 |
-00:SCAN HDFS [tpch.customer]
-   partitions=1/1 files=1 size=23.08MB
-   runtime filters: RF000 -> c_nationkey, RF001 -> c_custkey
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_returnflag = 'R'
+   runtime filters: RF002 -> l_orderkey
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-14:MERGING-EXCHANGE [UNPARTITIONED]
+15:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(l_extendedprice * (1 - l_discount)) DESC
 |  limit: 20
 |
 08:TOP-N [LIMIT=20]
 |  order by: sum(l_extendedprice * (1 - l_discount)) DESC
 |
-13:AGGREGATE [FINALIZE]
+14:AGGREGATE [FINALIZE]
 |  output: sum:merge(l_extendedprice * (1 - l_discount))
 |  group by: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
 |
-12:EXCHANGE [HASH(c_custkey,c_name,c_acctbal,c_phone,n_name,c_address,c_comment)]
+13:EXCHANGE [HASH(c_custkey,c_name,c_acctbal,c_phone,n_name,c_address,c_comment)]
 |
 07:AGGREGATE [STREAMING]
 |  output: sum(l_extendedprice * (1 - l_discount))
@@ -2009,43 +2016,46 @@ PLAN-ROOT SINK
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: n_nationkey
 |  |
-|  11:EXCHANGE [BROADCAST]
+|  12:EXCHANGE [BROADCAST]
 |  |
 |  03:SCAN HDFS [tpch.nation]
 |     partitions=1/1 files=1 size=2.15KB
 |
-05:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_custkey = o_custkey
-|  runtime filters: RF001 <- o_custkey
+05:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF001 <- c_custkey
 |
 |--JOIN BUILD
 |  |  join-table-id=01 plan-id=02 cohort-id=01
-|  |  build expressions: o_custkey
+|  |  build expressions: c_custkey
 |  |
-|  10:EXCHANGE [BROADCAST]
+|  11:EXCHANGE [HASH(c_custkey)]
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: l_orderkey = o_orderkey
-|  |  runtime filters: RF002 <- o_orderkey
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> c_nationkey
+|
+10:EXCHANGE [HASH(o_custkey)]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF002 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: o_orderkey
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=02 plan-id=03 cohort-id=02
-|  |  |  build expressions: o_orderkey
-|  |  |
-|  |  09:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpch.orders]
-|  |     partitions=1/1 files=1 size=162.56MB
-|  |     predicates: o_orderdate < '1994-01-01', o_orderdate >= '1993-10-01'
+|  09:EXCHANGE [BROADCAST]
 |  |
-|  02:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
-|     predicates: l_returnflag = 'R'
-|     runtime filters: RF002 -> l_orderkey
+|  01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate < '1994-01-01', o_orderdate >= '1993-10-01'
+|     runtime filters: RF001 -> o_custkey
 |
-00:SCAN HDFS [tpch.customer]
-   partitions=1/1 files=1 size=23.08MB
-   runtime filters: RF000 -> c_nationkey, RF001 -> c_custkey
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_returnflag = 'R'
+   runtime filters: RF002 -> l_orderkey
 ====
 # TPCH-Q11
 # Q11 - Important Stock Identification
@@ -2387,20 +2397,20 @@ PLAN-ROOT SINK
 |  group by: l_shipmode
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
-|  hash predicates: o_orderkey = l_orderkey
-|  runtime filters: RF000 <- l_orderkey
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF000 <- o_orderkey
 |
-|--06:EXCHANGE [HASH(l_orderkey)]
+|--06:EXCHANGE [HASH(o_orderkey)]
 |  |
-|  01:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
-|     predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_receiptdate < '1995-01-01', l_receiptdate >= '1994-01-01', l_shipdate < l_commitdate
+|  00:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
 |
-05:EXCHANGE [HASH(o_orderkey)]
+05:EXCHANGE [HASH(l_orderkey)]
 |
-00:SCAN HDFS [tpch.orders]
-   partitions=1/1 files=1 size=162.56MB
-   runtime filters: RF000 -> o_orderkey
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_receiptdate < '1995-01-01', l_receiptdate >= '1994-01-01', l_shipdate < l_commitdate
+   runtime filters: RF000 -> l_orderkey
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
@@ -2421,24 +2431,24 @@ PLAN-ROOT SINK
 |  group by: l_shipmode
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
-|  hash predicates: o_orderkey = l_orderkey
-|  runtime filters: RF000 <- l_orderkey
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF000 <- o_orderkey
 |
 |--JOIN BUILD
 |  |  join-table-id=00 plan-id=01 cohort-id=01
-|  |  build expressions: l_orderkey
+|  |  build expressions: o_orderkey
 |  |
-|  06:EXCHANGE [HASH(l_orderkey)]
+|  06:EXCHANGE [HASH(o_orderkey)]
 |  |
-|  01:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
-|     predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_receiptdate < '1995-01-01', l_receiptdate >= '1994-01-01', l_shipdate < l_commitdate
+|  00:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
 |
-05:EXCHANGE [HASH(o_orderkey)]
+05:EXCHANGE [HASH(l_orderkey)]
 |
-00:SCAN HDFS [tpch.orders]
-   partitions=1/1 files=1 size=162.56MB
-   runtime filters: RF000 -> o_orderkey
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_receiptdate < '1995-01-01', l_receiptdate >= '1994-01-01', l_shipdate < l_commitdate
+   runtime filters: RF000 -> l_orderkey
 ====
 # TPCH-Q13
 # Q13 - Customer Distribution Query
@@ -2760,29 +2770,29 @@ PLAN-ROOT SINK
 |     partitions=1/1 files=1 size=718.94MB
 |     predicates: l_shipdate < '1996-04-01', l_shipdate >= '1996-01-01'
 |
-06:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: s_suppkey = l_suppkey
-|  runtime filters: RF000 <- l_suppkey
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF000 <- s_suppkey
 |
-|--11:EXCHANGE [BROADCAST]
-|  |
-|  10:AGGREGATE [FINALIZE]
-|  |  output: sum:merge(l_extendedprice * (1 - l_discount))
-|  |  group by: l_suppkey
+|--11:EXCHANGE [HASH(s_suppkey)]
 |  |
-|  09:EXCHANGE [HASH(l_suppkey)]
-|  |
-|  02:AGGREGATE [STREAMING]
-|  |  output: sum(l_extendedprice * (1 - l_discount))
-|  |  group by: l_suppkey
-|  |
-|  01:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
-|     predicates: l_shipdate < '1996-04-01', l_shipdate >= '1996-01-01'
+|  00:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
 |
-00:SCAN HDFS [tpch.supplier]
-   partitions=1/1 files=1 size=1.33MB
-   runtime filters: RF000 -> s_suppkey
+10:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  group by: l_suppkey
+|
+09:EXCHANGE [HASH(l_suppkey)]
+|
+02:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: l_suppkey
+|
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate < '1996-04-01', l_shipdate >= '1996-01-01'
+   runtime filters: RF000 -> tpch.lineitem.l_suppkey
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
@@ -2823,33 +2833,33 @@ PLAN-ROOT SINK
 |     partitions=1/1 files=1 size=718.94MB
 |     predicates: l_shipdate < '1996-04-01', l_shipdate >= '1996-01-01'
 |
-06:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: s_suppkey = l_suppkey
-|  runtime filters: RF000 <- l_suppkey
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF000 <- s_suppkey
 |
 |--JOIN BUILD
 |  |  join-table-id=01 plan-id=02 cohort-id=01
-|  |  build expressions: l_suppkey
-|  |
-|  11:EXCHANGE [BROADCAST]
-|  |
-|  10:AGGREGATE [FINALIZE]
-|  |  output: sum:merge(l_extendedprice * (1 - l_discount))
-|  |  group by: l_suppkey
+|  |  build expressions: s_suppkey
 |  |
-|  09:EXCHANGE [HASH(l_suppkey)]
+|  11:EXCHANGE [HASH(s_suppkey)]
 |  |
-|  02:AGGREGATE [STREAMING]
-|  |  output: sum(l_extendedprice * (1 - l_discount))
-|  |  group by: l_suppkey
-|  |
-|  01:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
-|     predicates: l_shipdate < '1996-04-01', l_shipdate >= '1996-01-01'
+|  00:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
 |
-00:SCAN HDFS [tpch.supplier]
-   partitions=1/1 files=1 size=1.33MB
-   runtime filters: RF000 -> s_suppkey
+10:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  group by: l_suppkey
+|
+09:EXCHANGE [HASH(l_suppkey)]
+|
+02:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: l_suppkey
+|
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate < '1996-04-01', l_shipdate >= '1996-01-01'
+   runtime filters: RF000 -> tpch.lineitem.l_suppkey
 ====
 # TPCH-Q16
 # Q16 - Parts/Supplier Relation Query


[2/2] incubator-impala git commit: IMPALA-5612: join inversion should factor in parallelism

Posted by ta...@apache.org.
IMPALA-5612: join inversion should factor in parallelism

The join inversion optimisation did not factor in the degree of
parallelism that the join executed with after inversion. In some cases
this led to bad decisions, e.g. executing a join on a single node
instead of 20 nodes.

This patch adds a more sophisticated cost model that factors degree
of parallelism into the join inversion decision.

The behaviour is unchanged if inversion does not change the degree of
parallelism.

Perf:
Ran cluster TPC-H and TPC-DS benchmarks. Average changes were small:
< 3%. Saw a mix of improvements and regressions. We were satisfied
that the regressions were cases when the planner "got lucky" previously.
E.g. on TPC-H Q2 a join was flipped to put lineitem on the left as a
result of inaccurate cardinality estimates.

Mostafa also ran a TPC-DS benchmark where the dimension tables were
loaded with num_nodes=1 to minimise the number of files. We saw some
huge speedups there on the unmodified queries, e.g. TPCDS-Q10 went from
291s to 32.25s. The worst percentage regression was Q50, which went
from 1.61s to 2.4s and the worst absolute regression was Q72, which
went from 694s to 874s (25%).

Change-Id: Icacea4565ce25ef15aaab014684c9440dd501d4e
Reviewed-on: http://gerrit.cloudera.org:8080/7351
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e3075c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e3075c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e3075c39

Branch: refs/heads/master
Commit: e3075c39ac488ebbb59a2483c78ebb106d13d5b1
Parents: f9b222e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sat Jul 1 16:02:35 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Aug 22 19:42:05 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/impala/planner/Planner.java | 107 ++-
 .../queries/PlannerTest/inline-view.test        |  60 +-
 .../queries/PlannerTest/join-order.test         |  32 +-
 .../queries/PlannerTest/joins.test              | 117 +--
 .../queries/PlannerTest/kudu-update.test        |  14 +-
 .../queries/PlannerTest/order.test              |  30 +-
 .../queries/PlannerTest/tpcds-all.test          | 906 ++++++++++---------
 .../queries/PlannerTest/tpch-all.test           | 444 ++++-----
 8 files changed, 908 insertions(+), 802 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 930c354..1491873 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -30,6 +30,7 @@ import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SortInfo;
+import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
@@ -394,14 +395,14 @@ public class Planner {
 
 
   /**
-   * Traverses the plan tree rooted at 'root' and inverts outer and semi joins
-   * in the following situations:
+   * Traverses the plan tree rooted at 'root' and inverts joins in the following
+   * situations:
    * 1. If the left-hand side is a SingularRowSrcNode then we invert the join because
    *    then the build side is guaranteed to have only a single row.
    * 2. There is no backend support for distributed non-equi right outer/semi joins,
    *    so we invert them (any distributed left semi/outer join is ok).
-   * 3. Invert semi/outer joins if the right-hand size is estimated to have a higher
-   *    cardinality*avgSerializedSize. Do not invert if relevant stats are missing.
+   * 3. If we estimate that the inverted join is cheaper (see isInvertedJoinCheaper()).
+   *    Do not invert if relevant stats are missing.
    * The first two inversion rules are independent of the presence/absence of stats.
    * Left Null Aware Anti Joins are never inverted due to lack of backend support.
    * Joins that originate from query blocks with a straight join hint are not inverted.
@@ -436,18 +437,8 @@ public class Planner {
         // The current join is a distributed non-equi right outer or semi join
         // which has no backend support. Invert the join to make it executable.
         joinNode.invertJoin();
-      } else {
-        // Invert the join if doing so reduces the size of the materialized rhs
-        // (may also reduce network costs depending on the join strategy).
-        // Only consider this optimization if both the lhs/rhs cardinalities are known.
-        long lhsCard = joinNode.getChild(0).getCardinality();
-        long rhsCard = joinNode.getChild(1).getCardinality();
-        float lhsAvgRowSize = joinNode.getChild(0).getAvgRowSize();
-        float rhsAvgRowSize = joinNode.getChild(1).getAvgRowSize();
-        if (lhsCard != -1 && rhsCard != -1 &&
-            lhsCard * lhsAvgRowSize < rhsCard * rhsAvgRowSize) {
-          joinNode.invertJoin();
-        }
+      } else if (isInvertedJoinCheaper(joinNode, isLocalPlan)) {
+        joinNode.invertJoin();
       }
     }
 
@@ -457,6 +448,90 @@ public class Planner {
   }
 
   /**
+   * Return true if we estimate that 'joinNode' will be cheaper to execute after
+   * inversion. Returns false if any join input is missing relevant stats.
+   *
+   * For nested loop joins, we simply assume that the cost is determined by the size of
+   * the build side.
+   *
+   * For hash joins, the cost model is more nuanced and depends on:
+   * - est. number of rows in the build and probe: lhsCard and rhsCard
+   * - est. size of the rows in the build and probe: lhsAvgRowSize and rhsAvgRowSize
+   * - est. parallelism with which the lhs and rhs trees execute: lhsNumNodes
+   *   and rhsNumNodes. The parallelism of the join is determined by the lhs.
+   *
+   * The assumptions are:
+   * - the join strategy is PARTITIONED and rows are distributed evenly. We don't know
+   *   what join strategy will be chosen until later in planning so this assumption
+   *   simplifies the analysis. Generally if one input is small enough that broadcast
+   *   join is viable then this formula will prefer to put that input on the right side
+   *   anyway.
+   * - processing a build row is twice as expensive as processing a probe row of the
+   *   same size.
+   * - the cost of processing each byte of a row has a fixed component (C) (e.g.
+   *   hashing and comparing the row) and a variable component (e.g. looking up the
+   *   hash table).
+   * - The variable component grows proportionally to the log of the build side, to
+   *   approximate the effect of accesses to the hash table hitting slower levels
+   *   of the memory hierarchy.
+   *
+   * The estimated per-host cost of a hash join before and after inversion, measured in
+   * an arbitrary unit of time, is then:
+   *
+   *    (log_b(rhsBytes) + C) * (lhsBytes + 2 * rhsBytes) / lhsNumNodes
+   *    vs.
+   *    (log_b(lhsBytes) + C) * (rhsBytes + 2 * lhsBytes) / rhsNumNodes
+   *
+   * where lhsBytes = lhsCard * lhsAvgRowSize and rhsBytes = rhsCard * rhsAvgRowSize
+   *
+   * We choose b = 10 and C = 5 empirically because it seems to give reasonable
+   * results for a range of inputs. The model is not particularly sensitive to the
+   * parameters.
+   *
+   * If the parallelism of both sides is the same then this reduces to comparing
+   * the size of input on both sides. Otherwise, if inverting a hash join reduces
+   * parallelism significantly, then a significant difference between lhs and rhs
+   * bytes is needed to justify inversion.
+   */
+  private boolean isInvertedJoinCheaper(JoinNode joinNode, boolean isLocalPlan) {
+    long lhsCard = joinNode.getChild(0).getCardinality();
+    long rhsCard = joinNode.getChild(1).getCardinality();
+    // Need cardinality estimates to make a decision.
+    if (lhsCard == -1 || rhsCard == -1) return false;
+    double lhsBytes = lhsCard * joinNode.getChild(0).getAvgRowSize();
+    double rhsBytes = rhsCard * joinNode.getChild(1).getAvgRowSize();
+    if (joinNode instanceof NestedLoopJoinNode) {
+      // For NLJ, simply try to minimize the size of the build side, since it needs to
+      // be broadcast to all participating nodes.
+      return lhsBytes < rhsBytes;
+    }
+    Preconditions.checkState(joinNode instanceof HashJoinNode);
+    int lhsNumNodes = isLocalPlan ? 1 : joinNode.getChild(0).getNumNodes();
+    int rhsNumNodes = isLocalPlan ? 1 : joinNode.getChild(1).getNumNodes();
+    // Need parallelism to determine whether inverting a hash join is profitable.
+    if (lhsNumNodes <= 0 || rhsNumNodes <= 0) return false;
+
+    final long CONSTANT_COST_PER_BYTE = 5;
+    // Add 1 to the log argument to avoid taking log of 0.
+    double totalCost =
+        (Math.log10(rhsBytes + 1) + CONSTANT_COST_PER_BYTE) * (lhsBytes + 2 * rhsBytes);
+    double invertedTotalCost =
+        (Math.log10(lhsBytes + 1) + CONSTANT_COST_PER_BYTE) * (rhsBytes + 2 * lhsBytes);
+    double perNodeCost = totalCost / lhsNumNodes;
+    double invertedPerNodeCost = invertedTotalCost / rhsNumNodes;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("isInvertedJoinCheaper() " + TupleId.printIds(joinNode.getTupleIds()));
+      LOG.trace("lhsCard " + lhsCard + " lhsBytes " + lhsBytes +
+          " lhsNumNodes " + lhsNumNodes);
+      LOG.trace("rhsCard " + rhsCard + " rhsBytes " + rhsBytes +
+          " rhsNumNodes " + rhsNumNodes);
+      LOG.trace("cost " + perNodeCost + " invCost " + invertedPerNodeCost);
+      LOG.trace("INVERT? " + (invertedPerNodeCost < perNodeCost));
+    }
+    return invertedPerNodeCost < perNodeCost;
+  }
+
+  /**
    * Converts hash joins to nested-loop joins if the right-side is a SingularRowSrcNode.
    * Does not convert Null Aware Anti Joins because we only support that join op with
    * a hash join.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
index fdaed6d..7d10dc6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
@@ -367,31 +367,33 @@ NODE 2:
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-08:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: c.id = a.tinyint_col
 |  other predicates: a.int_col + b.float_col + CAST(c.string_col AS FLOAT) < 1000
 |  runtime filters: RF000 <- a.tinyint_col
 |
-|--07:EXCHANGE [HASH(a.tinyint_col)]
+|--08:EXCHANGE [HASH(a.tinyint_col)]
 |  |
-|  03:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: a.smallint_col = b.id
-|  |  runtime filters: RF001 <- b.id
+|  03:HASH JOIN [INNER JOIN, PARTITIONED]
+|  |  hash predicates: b.id = a.smallint_col
+|  |  runtime filters: RF001 <- a.smallint_col
 |  |
-|  |--05:EXCHANGE [BROADCAST]
+|  |--06:EXCHANGE [HASH(a.smallint_col)]
 |  |  |
-|  |  02:SCAN HDFS [functional.alltypessmall b]
-|  |     partitions=4/4 files=4 size=6.32KB
-|  |     predicates: b.float_col > 4.5
+|  |  01:SCAN HDFS [functional.alltypesagg a]
+|  |     partitions=1/11 files=1 size=73.39KB
+|  |     predicates: a.int_col > 899
 |  |
-|  01:SCAN HDFS [functional.alltypesagg a]
-|     partitions=1/11 files=1 size=73.39KB
-|     predicates: a.int_col > 899
-|     runtime filters: RF001 -> a.smallint_col
+|  05:EXCHANGE [HASH(b.id)]
+|  |
+|  02:SCAN HDFS [functional.alltypessmall b]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: b.float_col > 4.5
+|     runtime filters: RF001 -> b.id
 |
-06:EXCHANGE [HASH(c.id)]
+07:EXCHANGE [HASH(c.id)]
 |
 00:SCAN HDFS [functional.alltypessmall c]
    partitions=4/4 files=4 size=6.32KB
@@ -839,31 +841,33 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-08:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: c.id = a.tinyint_col
 |  other predicates: a.int_col + b.float_col + CAST(c.string_col AS FLOAT) < 1000
 |  runtime filters: RF000 <- a.tinyint_col
 |
-|--07:EXCHANGE [HASH(a.tinyint_col)]
+|--08:EXCHANGE [HASH(a.tinyint_col)]
 |  |
-|  02:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: a.smallint_col = b.id
-|  |  runtime filters: RF001 <- b.id
+|  02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  |  hash predicates: b.id = a.smallint_col
+|  |  runtime filters: RF001 <- a.smallint_col
 |  |
-|  |--05:EXCHANGE [BROADCAST]
+|  |--06:EXCHANGE [HASH(a.smallint_col)]
 |  |  |
-|  |  01:SCAN HDFS [functional.alltypessmall b]
-|  |     partitions=4/4 files=4 size=6.32KB
-|  |     predicates: b.float_col > 4.5
+|  |  00:SCAN HDFS [functional.alltypesagg a]
+|  |     partitions=1/11 files=1 size=73.39KB
+|  |     predicates: a.int_col > 899
 |  |
-|  00:SCAN HDFS [functional.alltypesagg a]
-|     partitions=1/11 files=1 size=73.39KB
-|     predicates: a.int_col > 899
-|     runtime filters: RF001 -> a.smallint_col
+|  05:EXCHANGE [HASH(b.id)]
+|  |
+|  01:SCAN HDFS [functional.alltypessmall b]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: b.float_col > 4.5
+|     runtime filters: RF001 -> b.id
 |
-06:EXCHANGE [HASH(c.id)]
+07:EXCHANGE [HASH(c.id)]
 |
 03:SCAN HDFS [functional.alltypessmall c]
    partitions=4/4 files=4 size=6.32KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
index c4b45ba..a525f91 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
@@ -589,17 +589,17 @@ PLAN-ROOT SINK
 |  group by: o_orderpriority
 |
 02:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
-|  hash predicates: o_orderkey = l_orderkey
+|  hash predicates: l_orderkey = o_orderkey
 |
-|--06:EXCHANGE [HASH(l_orderkey)]
+|--06:EXCHANGE [HASH(o_orderkey)]
 |  |
-|  01:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
+|  00:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
 |
-05:EXCHANGE [HASH(o_orderkey)]
+05:EXCHANGE [HASH(l_orderkey)]
 |
-00:SCAN HDFS [tpch.orders]
-   partitions=1/1 files=1 size=162.56MB
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
 ====
 # the largest input is prevented from becoming the leftmost input by the right outer join
 select o_orderpriority, count(*) as order_count
@@ -648,20 +648,18 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  group by: o_orderpriority
 |
-02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
-|  hash predicates: o_orderkey = l_orderkey
-|  runtime filters: RF000 <- l_orderkey
+02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
 |
-|--06:EXCHANGE [HASH(l_orderkey)]
+|--06:EXCHANGE [HASH(o_orderkey)]
 |  |
-|  01:SCAN HDFS [tpch.lineitem]
-|     partitions=1/1 files=1 size=718.94MB
+|  00:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
 |
-05:EXCHANGE [HASH(o_orderkey)]
+05:EXCHANGE [HASH(l_orderkey)]
 |
-00:SCAN HDFS [tpch.orders]
-   partitions=1/1 files=1 size=162.56MB
-   runtime filters: RF000 -> o_orderkey
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
 ====
 # order does not become the leftmost input because of the outer join;
 # the join with nation is done first because it reduces the intermediate output

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 42493ff..6c42545 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -211,31 +211,34 @@ PLAN-ROOT SINK
 |
 09:EXCHANGE [UNPARTITIONED]
 |
-04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
-|  hash predicates: c.id = a.id, c.string_col = b.string_col
+04:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  hash predicates: a.id = c.id, b.string_col = c.string_col
 |  other predicates: a.tinyint_col = 15, b.string_col = '15', a.day >= 6, b.month > 2, a.float_col - c.double_col < 0, a.tinyint_col + b.tinyint_col < 15, (b.double_col * c.tinyint_col > 1000 OR c.tinyint_col < 1000)
+|  runtime filters: RF000 <- c.id, RF001 <- c.string_col
 |
-|--08:EXCHANGE [HASH(a.id,b.string_col)]
-|  |
-|  03:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
-|  |  hash predicates: a.id = b.id, a.int_col = b.int_col
-|  |
-|  |--06:EXCHANGE [HASH(b.id,b.int_col)]
-|  |  |
-|  |  01:SCAN HDFS [functional.alltypessmall b]
-|  |     partitions=2/4 files=2 size=3.17KB
-|  |     predicates: b.string_col = '15'
+|--08:EXCHANGE [HASH(c.id,c.string_col)]
 |  |
-|  05:EXCHANGE [HASH(a.id,a.int_col)]
+|  02:SCAN HDFS [functional.alltypesaggnonulls c]
+|     partitions=2/10 files=2 size=148.10KB
+|
+07:EXCHANGE [HASH(a.id,b.string_col)]
+|
+03:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
+|  hash predicates: a.id = b.id, a.int_col = b.int_col
+|
+|--06:EXCHANGE [HASH(b.id,b.int_col)]
 |  |
-|  00:SCAN HDFS [functional.alltypesagg a]
-|     partitions=5/11 files=5 size=372.38KB
-|     predicates: a.tinyint_col = 15
+|  01:SCAN HDFS [functional.alltypessmall b]
+|     partitions=2/4 files=2 size=3.17KB
+|     predicates: b.string_col = '15'
+|     runtime filters: RF001 -> b.string_col
 |
-07:EXCHANGE [HASH(c.id,c.string_col)]
+05:EXCHANGE [HASH(a.id,a.int_col)]
 |
-02:SCAN HDFS [functional.alltypesaggnonulls c]
-   partitions=2/10 files=2 size=148.10KB
+00:SCAN HDFS [functional.alltypesagg a]
+   partitions=5/11 files=5 size=372.38KB
+   predicates: a.tinyint_col = 15
+   runtime filters: RF000 -> a.id
 ====
 # equi join with constants in the on clause are not supported
 select a.id, b.id from
@@ -1334,30 +1337,31 @@ where b.id < 5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-06:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-04:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: id = b.id
-|  runtime filters: RF000 <- b.id
+04:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: b.id = id
+|  runtime filters: RF000 <- id
 |
-|--05:EXCHANGE [BROADCAST]
+|--06:EXCHANGE [HASH(id)]
 |  |
-|  03:SCAN HDFS [functional.alltypestiny b]
-|     partitions=4/4 files=4 size=460B
-|     predicates: b.id < 5
-|
-00:UNION
-|  constant-operands=1
-|
-|--02:SCAN HDFS [functional.alltypessmall]
-|     partitions=1/4 files=1 size=1.58KB
+|  00:UNION
+|  |  constant-operands=1
+|  |
+|  |--02:SCAN HDFS [functional.alltypessmall]
+|  |     partitions=1/4 files=1 size=1.58KB
+|  |     predicates: functional.alltypessmall.id < 5
+|  |
+|  01:SCAN HDFS [functional.alltypessmall]
+|     partitions=1/4 files=1 size=1.57KB
 |     predicates: functional.alltypessmall.id < 5
-|     runtime filters: RF000 -> functional.alltypessmall.id
 |
-01:SCAN HDFS [functional.alltypessmall]
-   partitions=1/4 files=1 size=1.57KB
-   predicates: functional.alltypessmall.id < 5
-   runtime filters: RF000 -> functional.alltypessmall.id
+05:EXCHANGE [HASH(b.id)]
+|
+03:SCAN HDFS [functional.alltypestiny b]
+   partitions=4/4 files=4 size=460B
+   predicates: b.id < 5
+   runtime filters: RF000 -> b.id
 ====
 # Test joins with union inputs. One input is a union.
 select a.id, b.id, a.string_col, b.string_col
@@ -1378,30 +1382,27 @@ PLAN-ROOT SINK
 |
 07:EXCHANGE [UNPARTITIONED]
 |
-04:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
-|  hash predicates: id = b.id
-|  runtime filters: RF000 <- b.id
+04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  hash predicates: b.id = id
 |
-|--06:EXCHANGE [HASH(b.id)]
+|--06:EXCHANGE [HASH(id)]
 |  |
-|  00:SCAN HDFS [functional.alltypestiny b]
-|     partitions=4/4 files=4 size=460B
-|     predicates: b.id < 5
-|
-05:EXCHANGE [HASH(id)]
-|
-01:UNION
-|  constant-operands=1
-|
-|--03:SCAN HDFS [functional.alltypessmall]
-|     partitions=1/4 files=1 size=1.58KB
+|  01:UNION
+|  |  constant-operands=1
+|  |
+|  |--03:SCAN HDFS [functional.alltypessmall]
+|  |     partitions=1/4 files=1 size=1.58KB
+|  |     predicates: functional.alltypessmall.id < 5
+|  |
+|  02:SCAN HDFS [functional.alltypessmall]
+|     partitions=1/4 files=1 size=1.57KB
 |     predicates: functional.alltypessmall.id < 5
-|     runtime filters: RF000 -> functional.alltypessmall.id
 |
-02:SCAN HDFS [functional.alltypessmall]
-   partitions=1/4 files=1 size=1.57KB
-   predicates: functional.alltypessmall.id < 5
-   runtime filters: RF000 -> functional.alltypessmall.id
+05:EXCHANGE [HASH(b.id)]
+|
+00:SCAN HDFS [functional.alltypestiny b]
+   partitions=4/4 files=4 size=460B
+   predicates: b.id < 5
 ====
 # Test joins with union inputs. Both inputs are a union.
 select a.id, b.id, a.string_col, b.string_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
index 4681b91..b779ee3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
@@ -83,15 +83,17 @@ UPDATE KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
 |
-02:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ids = a.id
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.id = ids
 |
-|--03:EXCHANGE [UNPARTITIONED]
+|--04:EXCHANGE [HASH(ids)]
 |  |
-|  00:SCAN KUDU [functional_kudu.testtbl a]
+|  01:UNION
+|     constant-operands=1
 |
-01:UNION
-   constant-operands=1
+03:EXCHANGE [HASH(a.id)]
+|
+00:SCAN KUDU [functional_kudu.testtbl a]
 ====
 update a
 set a.name = 'values'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/order.test b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
index 6c39366..4f51a2a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
@@ -377,7 +377,7 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-09:MERGING-EXCHANGE [UNPARTITIONED]
+10:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: string_col DESC, smallint_col ASC
 |
 05:SORT
@@ -388,24 +388,26 @@ PLAN-ROOT SINK
 |  other predicates: a.int_col + b.float_col + CAST(c.string_col AS FLOAT) < 1000
 |  runtime filters: RF000 <- a.tinyint_col
 |
-|--08:EXCHANGE [HASH(a.tinyint_col)]
+|--09:EXCHANGE [HASH(a.tinyint_col)]
 |  |
-|  03:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: a.smallint_col = b.id
-|  |  runtime filters: RF001 <- b.id
+|  03:HASH JOIN [INNER JOIN, PARTITIONED]
+|  |  hash predicates: b.id = a.smallint_col
+|  |  runtime filters: RF001 <- a.smallint_col
 |  |
-|  |--06:EXCHANGE [BROADCAST]
+|  |--07:EXCHANGE [HASH(a.smallint_col)]
 |  |  |
-|  |  01:SCAN HDFS [functional.alltypessmall b]
-|  |     partitions=4/4 files=4 size=6.32KB
-|  |     predicates: b.float_col > 4.5
+|  |  00:SCAN HDFS [functional.alltypesagg a]
+|  |     partitions=1/11 files=1 size=73.39KB
+|  |     predicates: a.int_col > 899
 |  |
-|  00:SCAN HDFS [functional.alltypesagg a]
-|     partitions=1/11 files=1 size=73.39KB
-|     predicates: a.int_col > 899
-|     runtime filters: RF001 -> a.smallint_col
+|  06:EXCHANGE [HASH(b.id)]
+|  |
+|  01:SCAN HDFS [functional.alltypessmall b]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: b.float_col > 4.5
+|     runtime filters: RF001 -> b.id
 |
-07:EXCHANGE [HASH(c.id)]
+08:EXCHANGE [HASH(c.id)]
 |
 02:SCAN HDFS [functional.alltypessmall c]
    partitions=4/4 files=4 size=6.32KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3075c39/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
index 85baa6c..a18f4f5 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
@@ -744,18 +744,18 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-20:MERGING-EXCHANGE [UNPARTITIONED]
+22:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(ss_ext_sales_price) DESC, i_brand ASC, i_brand_id ASC, i_manufact_id ASC, i_manufact ASC
 |  limit: 100
 |
 12:TOP-N [LIMIT=100]
 |  order by: sum(ss_ext_sales_price) DESC, i_brand ASC, i_brand_id ASC, i_manufact_id ASC, i_manufact ASC
 |
-19:AGGREGATE [FINALIZE]
+21:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_ext_sales_price)
 |  group by: i_brand, i_brand_id, i_manufact_id, i_manufact
 |
-18:EXCHANGE [HASH(i_brand,i_brand_id,i_manufact_id,i_manufact)]
+20:EXCHANGE [HASH(i_brand,i_brand_id,i_manufact_id,i_manufact)]
 |
 11:AGGREGATE [STREAMING]
 |  output: sum(ss_ext_sales_price)
@@ -766,69 +766,72 @@ PLAN-ROOT SINK
 |  other predicates: substr(ca_zip, 1, 5) != substr(s_zip, 1, 5)
 |  runtime filters: RF000 <- s_store_sk
 |
-|--17:EXCHANGE [BROADCAST]
+|--19:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpcds.store]
 |     partitions=1/1 files=1 size=3.08KB
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ca_address_sk = c_current_addr_sk
-|  runtime filters: RF001 <- c_current_addr_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: c_current_addr_sk = ca_address_sk
+|  runtime filters: RF001 <- ca_address_sk
 |
-|--16:EXCHANGE [BROADCAST]
+|--18:EXCHANGE [HASH(ca_address_sk)]
 |  |
-|  08:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: c_customer_sk = ss_customer_sk
-|  |  runtime filters: RF002 <- ss_customer_sk
-|  |
-|  |--15:EXCHANGE [BROADCAST]
-|  |  |
-|  |  07:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  |  hash predicates: ss_sold_date_sk = d_date_sk
-|  |  |  runtime filters: RF003 <- d_date_sk
-|  |  |
-|  |  |--14:EXCHANGE [BROADCAST]
-|  |  |  |
-|  |  |  00:SCAN HDFS [tpcds.date_dim]
-|  |  |     partitions=1/1 files=1 size=9.84MB
-|  |  |     predicates: d_year = 1999, d_moy = 11, tpcds.date_dim.d_date_sk <= 2451513, tpcds.date_dim.d_date_sk >= 2451484
-|  |  |
-|  |  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  |  hash predicates: ss_item_sk = i_item_sk
-|  |  |  runtime filters: RF004 <- i_item_sk
-|  |  |
-|  |  |--13:EXCHANGE [BROADCAST]
-|  |  |  |
-|  |  |  02:SCAN HDFS [tpcds.item]
-|  |  |     partitions=1/1 files=1 size=4.82MB
-|  |  |     predicates: i_manager_id = 7
-|  |  |
-|  |  01:SCAN HDFS [tpcds.store_sales]
-|  |     partitions=30/1824 files=30 size=9.93MB
-|  |     runtime filters: RF000 -> ss_store_sk, RF003 -> ss_sold_date_sk, RF004 -> ss_item_sk
+|  04:SCAN HDFS [tpcds.customer_address]
+|     partitions=1/1 files=1 size=5.25MB
+|
+17:EXCHANGE [HASH(c_current_addr_sk)]
+|
+08:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF002 <- c_customer_sk
+|
+|--16:EXCHANGE [HASH(c_customer_sk)]
 |  |
 |  03:SCAN HDFS [tpcds.customer]
 |     partitions=1/1 files=1 size=12.60MB
-|     runtime filters: RF002 -> c_customer_sk
+|     runtime filters: RF001 -> c_current_addr_sk
 |
-04:SCAN HDFS [tpcds.customer_address]
-   partitions=1/1 files=1 size=5.25MB
-   runtime filters: RF001 -> ca_address_sk
+15:EXCHANGE [HASH(ss_customer_sk)]
+|
+07:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ss_sold_date_sk = d_date_sk
+|  runtime filters: RF003 <- d_date_sk
+|
+|--14:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: d_year = 1999, d_moy = 11, tpcds.date_dim.d_date_sk <= 2451513, tpcds.date_dim.d_date_sk >= 2451484
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF004 <- i_item_sk
+|
+|--13:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|     predicates: i_manager_id = 7
+|
+01:SCAN HDFS [tpcds.store_sales]
+   partitions=30/1824 files=30 size=9.93MB
+   runtime filters: RF000 -> ss_store_sk, RF002 -> ss_customer_sk, RF003 -> ss_sold_date_sk, RF004 -> ss_item_sk
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-20:MERGING-EXCHANGE [UNPARTITIONED]
+22:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(ss_ext_sales_price) DESC, i_brand ASC, i_brand_id ASC, i_manufact_id ASC, i_manufact ASC
 |  limit: 100
 |
 12:TOP-N [LIMIT=100]
 |  order by: sum(ss_ext_sales_price) DESC, i_brand ASC, i_brand_id ASC, i_manufact_id ASC, i_manufact ASC
 |
-19:AGGREGATE [FINALIZE]
+21:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_ext_sales_price)
 |  group by: i_brand, i_brand_id, i_manufact_id, i_manufact
 |
-18:EXCHANGE [HASH(i_brand,i_brand_id,i_manufact_id,i_manufact)]
+20:EXCHANGE [HASH(i_brand,i_brand_id,i_manufact_id,i_manufact)]
 |
 11:AGGREGATE [STREAMING]
 |  output: sum(ss_ext_sales_price)
@@ -843,70 +846,73 @@ PLAN-ROOT SINK
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: s_store_sk
 |  |
-|  17:EXCHANGE [BROADCAST]
+|  19:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpcds.store]
 |     partitions=1/1 files=1 size=3.08KB
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ca_address_sk = c_current_addr_sk
-|  runtime filters: RF001 <- c_current_addr_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: c_current_addr_sk = ca_address_sk
+|  runtime filters: RF001 <- ca_address_sk
 |
 |--JOIN BUILD
 |  |  join-table-id=01 plan-id=02 cohort-id=01
-|  |  build expressions: c_current_addr_sk
+|  |  build expressions: ca_address_sk
 |  |
-|  16:EXCHANGE [BROADCAST]
+|  18:EXCHANGE [HASH(ca_address_sk)]
 |  |
-|  08:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: c_customer_sk = ss_customer_sk
-|  |  runtime filters: RF002 <- ss_customer_sk
+|  04:SCAN HDFS [tpcds.customer_address]
+|     partitions=1/1 files=1 size=5.25MB
+|
+17:EXCHANGE [HASH(c_current_addr_sk)]
+|
+08:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF002 <- c_customer_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: c_customer_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=02 plan-id=03 cohort-id=02
-|  |  |  build expressions: ss_customer_sk
-|  |  |
-|  |  15:EXCHANGE [BROADCAST]
-|  |  |
-|  |  07:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  |  hash predicates: ss_sold_date_sk = d_date_sk
-|  |  |  runtime filters: RF003 <- d_date_sk
-|  |  |
-|  |  |--JOIN BUILD
-|  |  |  |  join-table-id=03 plan-id=04 cohort-id=03
-|  |  |  |  build expressions: d_date_sk
-|  |  |  |
-|  |  |  14:EXCHANGE [BROADCAST]
-|  |  |  |
-|  |  |  00:SCAN HDFS [tpcds.date_dim]
-|  |  |     partitions=1/1 files=1 size=9.84MB
-|  |  |     predicates: d_year = 1999, d_moy = 11, tpcds.date_dim.d_date_sk <= 2451513, tpcds.date_dim.d_date_sk >= 2451484
-|  |  |
-|  |  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  |  hash predicates: ss_item_sk = i_item_sk
-|  |  |  runtime filters: RF004 <- i_item_sk
-|  |  |
-|  |  |--JOIN BUILD
-|  |  |  |  join-table-id=04 plan-id=05 cohort-id=03
-|  |  |  |  build expressions: i_item_sk
-|  |  |  |
-|  |  |  13:EXCHANGE [BROADCAST]
-|  |  |  |
-|  |  |  02:SCAN HDFS [tpcds.item]
-|  |  |     partitions=1/1 files=1 size=4.82MB
-|  |  |     predicates: i_manager_id = 7
-|  |  |
-|  |  01:SCAN HDFS [tpcds.store_sales]
-|  |     partitions=30/1824 files=30 size=9.93MB
-|  |     runtime filters: RF000 -> ss_store_sk, RF003 -> ss_sold_date_sk, RF004 -> ss_item_sk
+|  16:EXCHANGE [HASH(c_customer_sk)]
 |  |
 |  03:SCAN HDFS [tpcds.customer]
 |     partitions=1/1 files=1 size=12.60MB
-|     runtime filters: RF002 -> c_customer_sk
+|     runtime filters: RF001 -> c_current_addr_sk
 |
-04:SCAN HDFS [tpcds.customer_address]
-   partitions=1/1 files=1 size=5.25MB
-   runtime filters: RF001 -> ca_address_sk
+15:EXCHANGE [HASH(ss_customer_sk)]
+|
+07:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ss_sold_date_sk = d_date_sk
+|  runtime filters: RF003 <- d_date_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |
+|  14:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: d_year = 1999, d_moy = 11, tpcds.date_dim.d_date_sk <= 2451513, tpcds.date_dim.d_date_sk >= 2451484
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF004 <- i_item_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: i_item_sk
+|  |
+|  13:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|     predicates: i_manager_id = 7
+|
+01:SCAN HDFS [tpcds.store_sales]
+   partitions=30/1824 files=30 size=9.93MB
+   runtime filters: RF000 -> ss_store_sk, RF002 -> ss_customer_sk, RF003 -> ss_sold_date_sk, RF004 -> ss_item_sk
 ====
 # TPCDS-Q27
 select
@@ -1217,147 +1223,149 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-17:MERGING-EXCHANGE [UNPARTITIONED]
+18:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: c_last_name ASC, c_first_name ASC, c_salutation ASC, c_preferred_cust_flag DESC
 |  limit: 100000
 |
 10:TOP-N [LIMIT=100000]
 |  order by: c_last_name ASC, c_first_name ASC, c_salutation ASC, c_preferred_cust_flag DESC
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_customer_sk = ss_customer_sk
-|  runtime filters: RF000 <- ss_customer_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF000 <- c_customer_sk
 |
-|--16:EXCHANGE [BROADCAST]
+|--17:EXCHANGE [HASH(c_customer_sk)]
 |  |
-|  15:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |  having: count(*) <= 20, count(*) >= 15
-|  |
-|  14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
-|  |
-|  07:AGGREGATE [STREAMING]
-|  |  output: count(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |
-|  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_store_sk = store.s_store_sk
-|  |  runtime filters: RF001 <- store.s_store_sk
-|  |
-|  |--13:EXCHANGE [BROADCAST]
-|  |  |
-|  |  02:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County', 'Fairfield County', 'Raleigh County', 'Ziebach County', 'Williamson County')
-|  |
-|  05:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
-|  |  runtime filters: RF002 <- date_dim.d_date_sk
-|  |
-|  |--12:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpcds.date_dim]
-|  |     partitions=1/1 files=1 size=9.84MB
-|  |     predicates: date_dim.d_year IN (1998, 1999, 2000), (date_dim.d_dom >= 1 AND date_dim.d_dom <= 3 OR date_dim.d_dom >= 25 AND date_dim.d_dom <= 28)
+|  08:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|
+16:EXCHANGE [HASH(ss_customer_sk)]
+|
+15:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|  having: count(*) <= 20, count(*) >= 15
+|
+14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
+|
+07:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_store_sk = store.s_store_sk
+|  runtime filters: RF001 <- store.s_store_sk
+|
+|--13:EXCHANGE [BROADCAST]
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
-|  |  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|  02:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County', 'Fairfield County', 'Raleigh County', 'Ziebach County', 'Williamson County')
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
+|  runtime filters: RF002 <- date_dim.d_date_sk
+|
+|--12:EXCHANGE [BROADCAST]
 |  |
-|  |--11:EXCHANGE [BROADCAST]
-|  |  |
-|  |  03:SCAN HDFS [tpcds.household_demographics]
-|  |     partitions=1/1 files=1 size=148.10KB
-|  |     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), (CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END) > 1.2
+|  01:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: date_dim.d_year IN (1998, 1999, 2000), (date_dim.d_dom >= 1 AND date_dim.d_dom <= 3 OR date_dim.d_dom >= 25 AND date_dim.d_dom <= 28)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
+|  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|
+|--11:EXCHANGE [BROADCAST]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
+|  03:SCAN HDFS [tpcds.household_demographics]
+|     partitions=1/1 files=1 size=148.10KB
+|     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), (CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END) > 1.2
 |
-08:SCAN HDFS [tpcds.customer]
-   partitions=1/1 files=1 size=12.60MB
-   runtime filters: RF000 -> c_customer_sk
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=326.32MB
+   runtime filters: RF000 -> tpcds.store_sales.ss_customer_sk, RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-17:MERGING-EXCHANGE [UNPARTITIONED]
+18:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: c_last_name ASC, c_first_name ASC, c_salutation ASC, c_preferred_cust_flag DESC
 |  limit: 100000
 |
 10:TOP-N [LIMIT=100000]
 |  order by: c_last_name ASC, c_first_name ASC, c_salutation ASC, c_preferred_cust_flag DESC
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_customer_sk = ss_customer_sk
-|  runtime filters: RF000 <- ss_customer_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF000 <- c_customer_sk
 |
 |--JOIN BUILD
 |  |  join-table-id=00 plan-id=01 cohort-id=01
-|  |  build expressions: ss_customer_sk
-|  |
-|  16:EXCHANGE [BROADCAST]
-|  |
-|  15:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |  having: count(*) <= 20, count(*) >= 15
+|  |  build expressions: c_customer_sk
 |  |
-|  14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
+|  17:EXCHANGE [HASH(c_customer_sk)]
 |  |
-|  07:AGGREGATE [STREAMING]
-|  |  output: count(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |
-|  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_store_sk = store.s_store_sk
-|  |  runtime filters: RF001 <- store.s_store_sk
+|  08:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|
+16:EXCHANGE [HASH(ss_customer_sk)]
+|
+15:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|  having: count(*) <= 20, count(*) >= 15
+|
+14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
+|
+07:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_store_sk = store.s_store_sk
+|  runtime filters: RF001 <- store.s_store_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: store.s_store_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=01 plan-id=02 cohort-id=02
-|  |  |  build expressions: store.s_store_sk
-|  |  |
-|  |  13:EXCHANGE [BROADCAST]
-|  |  |
-|  |  02:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County', 'Fairfield County', 'Raleigh County', 'Ziebach County', 'Williamson County')
+|  13:EXCHANGE [BROADCAST]
 |  |
-|  05:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
-|  |  runtime filters: RF002 <- date_dim.d_date_sk
+|  02:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County', 'Fairfield County', 'Raleigh County', 'Ziebach County', 'Williamson County')
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
+|  runtime filters: RF002 <- date_dim.d_date_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: date_dim.d_date_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=02 plan-id=03 cohort-id=02
-|  |  |  build expressions: date_dim.d_date_sk
-|  |  |
-|  |  12:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpcds.date_dim]
-|  |     partitions=1/1 files=1 size=9.84MB
-|  |     predicates: date_dim.d_year IN (1998, 1999, 2000), (date_dim.d_dom >= 1 AND date_dim.d_dom <= 3 OR date_dim.d_dom >= 25 AND date_dim.d_dom <= 28)
+|  12:EXCHANGE [BROADCAST]
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
-|  |  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|  01:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: date_dim.d_year IN (1998, 1999, 2000), (date_dim.d_dom >= 1 AND date_dim.d_dom <= 3 OR date_dim.d_dom >= 25 AND date_dim.d_dom <= 28)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
+|  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: household_demographics.hd_demo_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=03 plan-id=04 cohort-id=02
-|  |  |  build expressions: household_demographics.hd_demo_sk
-|  |  |
-|  |  11:EXCHANGE [BROADCAST]
-|  |  |
-|  |  03:SCAN HDFS [tpcds.household_demographics]
-|  |     partitions=1/1 files=1 size=148.10KB
-|  |     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), (CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END) > 1.2
+|  11:EXCHANGE [BROADCAST]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
+|  03:SCAN HDFS [tpcds.household_demographics]
+|     partitions=1/1 files=1 size=148.10KB
+|     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), (CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END) > 1.2
 |
-08:SCAN HDFS [tpcds.customer]
-   partitions=1/1 files=1 size=12.60MB
-   runtime filters: RF000 -> c_customer_sk
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=326.32MB
+   runtime filters: RF000 -> tpcds.store_sales.ss_customer_sk, RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
 ====
 # TPCDS-Q42
 select
@@ -3774,145 +3782,147 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-17:MERGING-EXCHANGE [UNPARTITIONED]
+18:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: cnt DESC
 |  limit: 1000
 |
 10:TOP-N [LIMIT=1000]
 |  order by: cnt DESC
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_customer_sk = ss_customer_sk
-|  runtime filters: RF000 <- ss_customer_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF000 <- c_customer_sk
 |
-|--16:EXCHANGE [BROADCAST]
-|  |
-|  15:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |  having: count(*) <= 5, count(*) >= 1
-|  |
-|  14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
-|  |
-|  07:AGGREGATE [STREAMING]
-|  |  output: count(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |
-|  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_store_sk = store.s_store_sk
-|  |  runtime filters: RF001 <- store.s_store_sk
-|  |
-|  |--13:EXCHANGE [BROADCAST]
-|  |  |
-|  |  02:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+|--17:EXCHANGE [HASH(c_customer_sk)]
 |  |
-|  05:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
-|  |  runtime filters: RF002 <- date_dim.d_date_sk
-|  |
-|  |--12:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpcds.date_dim]
-|  |     partitions=1/1 files=1 size=9.84MB
+|  08:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|
+16:EXCHANGE [HASH(ss_customer_sk)]
+|
+15:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|  having: count(*) <= 5, count(*) >= 1
+|
+14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
+|
+07:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_store_sk = store.s_store_sk
+|  runtime filters: RF001 <- store.s_store_sk
+|
+|--13:EXCHANGE [BROADCAST]
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
-|  |  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|  02:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
+|  runtime filters: RF002 <- date_dim.d_date_sk
+|
+|--12:EXCHANGE [BROADCAST]
 |  |
-|  |--11:EXCHANGE [BROADCAST]
-|  |  |
-|  |  03:SCAN HDFS [tpcds.household_demographics]
-|  |     partitions=1/1 files=1 size=148.10KB
-|  |     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END > 1
+|  01:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
+|  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|
+|--11:EXCHANGE [BROADCAST]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
+|  03:SCAN HDFS [tpcds.household_demographics]
+|     partitions=1/1 files=1 size=148.10KB
+|     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END > 1
 |
-08:SCAN HDFS [tpcds.customer]
-   partitions=1/1 files=1 size=12.60MB
-   runtime filters: RF000 -> c_customer_sk
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=326.32MB
+   runtime filters: RF000 -> tpcds.store_sales.ss_customer_sk, RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-17:MERGING-EXCHANGE [UNPARTITIONED]
+18:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: cnt DESC
 |  limit: 1000
 |
 10:TOP-N [LIMIT=1000]
 |  order by: cnt DESC
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_customer_sk = ss_customer_sk
-|  runtime filters: RF000 <- ss_customer_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF000 <- c_customer_sk
 |
 |--JOIN BUILD
 |  |  join-table-id=00 plan-id=01 cohort-id=01
-|  |  build expressions: ss_customer_sk
-|  |
-|  16:EXCHANGE [BROADCAST]
-|  |
-|  15:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
-|  |  having: count(*) <= 5, count(*) >= 1
-|  |
-|  14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
-|  |
-|  07:AGGREGATE [STREAMING]
-|  |  output: count(*)
-|  |  group by: ss_ticket_number, ss_customer_sk
+|  |  build expressions: c_customer_sk
 |  |
-|  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_store_sk = store.s_store_sk
-|  |  runtime filters: RF001 <- store.s_store_sk
+|  17:EXCHANGE [HASH(c_customer_sk)]
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=01 plan-id=02 cohort-id=02
-|  |  |  build expressions: store.s_store_sk
-|  |  |
-|  |  13:EXCHANGE [BROADCAST]
-|  |  |
-|  |  02:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+|  08:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|
+16:EXCHANGE [HASH(ss_customer_sk)]
+|
+15:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|  having: count(*) <= 5, count(*) >= 1
+|
+14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk)]
+|
+07:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: ss_ticket_number, ss_customer_sk
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_store_sk = store.s_store_sk
+|  runtime filters: RF001 <- store.s_store_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: store.s_store_sk
 |  |
-|  05:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
-|  |  runtime filters: RF002 <- date_dim.d_date_sk
+|  13:EXCHANGE [BROADCAST]
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=02 plan-id=03 cohort-id=02
-|  |  |  build expressions: date_dim.d_date_sk
-|  |  |
-|  |  12:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpcds.date_dim]
-|  |     partitions=1/1 files=1 size=9.84MB
+|  02:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     predicates: store.s_county IN ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
+|  runtime filters: RF002 <- date_dim.d_date_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: date_dim.d_date_sk
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
-|  |  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|  12:EXCHANGE [BROADCAST]
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=03 plan-id=04 cohort-id=02
-|  |  |  build expressions: household_demographics.hd_demo_sk
-|  |  |
-|  |  11:EXCHANGE [BROADCAST]
-|  |  |
-|  |  03:SCAN HDFS [tpcds.household_demographics]
-|  |     partitions=1/1 files=1 size=148.10KB
-|  |     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END > 1
+|  01:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
+|  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: household_demographics.hd_demo_sk
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
+|  11:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpcds.household_demographics]
+|     partitions=1/1 files=1 size=148.10KB
+|     predicates: household_demographics.hd_vehicle_count > 0, household_demographics.hd_buy_potential IN ('>10000', 'unknown'), CASE WHEN household_demographics.hd_vehicle_count > 0 THEN household_demographics.hd_dep_count / household_demographics.hd_vehicle_count ELSE NULL END > 1
 |
-08:SCAN HDFS [tpcds.customer]
-   partitions=1/1 files=1 size=12.60MB
-   runtime filters: RF000 -> c_customer_sk
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=326.32MB
+   runtime filters: RF000 -> tpcds.store_sales.ss_customer_sk, RF001 -> store_sales.ss_store_sk, RF002 -> store_sales.ss_sold_date_sk, RF003 -> store_sales.ss_hdemo_sk
 ====
 # TPCDS-Q79
 select
@@ -4005,145 +4015,147 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-17:MERGING-EXCHANGE [UNPARTITIONED]
+18:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: c_last_name ASC, c_first_name ASC, substr(s_city, 1, 30) ASC, profit ASC
 |  limit: 100
 |
 10:TOP-N [LIMIT=100]
 |  order by: c_last_name ASC, c_first_name ASC, substr(s_city, 1, 30) ASC, profit ASC
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_customer_sk = ss_customer_sk
-|  runtime filters: RF000 <- ss_customer_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF000 <- c_customer_sk
 |
-|--16:EXCHANGE [BROADCAST]
-|  |
-|  15:AGGREGATE [FINALIZE]
-|  |  output: sum:merge(ss_coupon_amt), sum:merge(ss_net_profit)
-|  |  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
-|  |
-|  14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk,ss_addr_sk,store.s_city)]
-|  |
-|  07:AGGREGATE [STREAMING]
-|  |  output: sum(ss_coupon_amt), sum(ss_net_profit)
-|  |  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
-|  |
-|  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
-|  |  runtime filters: RF001 <- date_dim.d_date_sk
-|  |
-|  |--13:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpcds.date_dim]
-|  |     partitions=1/1 files=1 size=9.84MB
-|  |     predicates: d_date <= '1999-03-31', d_date >= '1999-01-01'
-|  |
-|  05:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_store_sk = store.s_store_sk
-|  |  runtime filters: RF002 <- store.s_store_sk
+|--17:EXCHANGE [HASH(c_customer_sk)]
 |  |
-|  |--12:EXCHANGE [BROADCAST]
-|  |  |
-|  |  02:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |     predicates: store.s_number_employees <= 295, store.s_number_employees >= 200
+|  08:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|
+16:EXCHANGE [HASH(ss_customer_sk)]
+|
+15:AGGREGATE [FINALIZE]
+|  output: sum:merge(ss_coupon_amt), sum:merge(ss_net_profit)
+|  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
+|
+14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk,ss_addr_sk,store.s_city)]
+|
+07:AGGREGATE [STREAMING]
+|  output: sum(ss_coupon_amt), sum(ss_net_profit)
+|  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
+|  runtime filters: RF001 <- date_dim.d_date_sk
+|
+|--13:EXCHANGE [BROADCAST]
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
-|  |  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|  01:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: d_date <= '1999-03-31', d_date >= '1999-01-01'
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_store_sk = store.s_store_sk
+|  runtime filters: RF002 <- store.s_store_sk
+|
+|--12:EXCHANGE [BROADCAST]
 |  |
-|  |--11:EXCHANGE [BROADCAST]
-|  |  |
-|  |  03:SCAN HDFS [tpcds.household_demographics]
-|  |     partitions=1/1 files=1 size=148.10KB
-|  |     predicates: (household_demographics.hd_dep_count = 8 OR household_demographics.hd_vehicle_count > 0)
+|  02:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     predicates: store.s_number_employees <= 295, store.s_number_employees >= 200
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
+|  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|
+|--11:EXCHANGE [BROADCAST]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF001 -> store_sales.ss_sold_date_sk, RF002 -> store_sales.ss_store_sk, RF003 -> store_sales.ss_hdemo_sk
+|  03:SCAN HDFS [tpcds.household_demographics]
+|     partitions=1/1 files=1 size=148.10KB
+|     predicates: (household_demographics.hd_dep_count = 8 OR household_demographics.hd_vehicle_count > 0)
 |
-08:SCAN HDFS [tpcds.customer]
-   partitions=1/1 files=1 size=12.60MB
-   runtime filters: RF000 -> c_customer_sk
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=326.32MB
+   runtime filters: RF000 -> tpcds.store_sales.ss_customer_sk, RF001 -> store_sales.ss_sold_date_sk, RF002 -> store_sales.ss_store_sk, RF003 -> store_sales.ss_hdemo_sk
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-17:MERGING-EXCHANGE [UNPARTITIONED]
+18:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: c_last_name ASC, c_first_name ASC, substr(s_city, 1, 30) ASC, profit ASC
 |  limit: 100
 |
 10:TOP-N [LIMIT=100]
 |  order by: c_last_name ASC, c_first_name ASC, substr(s_city, 1, 30) ASC, profit ASC
 |
-09:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: c_customer_sk = ss_customer_sk
-|  runtime filters: RF000 <- ss_customer_sk
+09:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_customer_sk = c_customer_sk
+|  runtime filters: RF000 <- c_customer_sk
 |
 |--JOIN BUILD
 |  |  join-table-id=00 plan-id=01 cohort-id=01
-|  |  build expressions: ss_customer_sk
-|  |
-|  16:EXCHANGE [BROADCAST]
-|  |
-|  15:AGGREGATE [FINALIZE]
-|  |  output: sum:merge(ss_coupon_amt), sum:merge(ss_net_profit)
-|  |  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
-|  |
-|  14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk,ss_addr_sk,store.s_city)]
+|  |  build expressions: c_customer_sk
 |  |
-|  07:AGGREGATE [STREAMING]
-|  |  output: sum(ss_coupon_amt), sum(ss_net_profit)
-|  |  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
+|  17:EXCHANGE [HASH(c_customer_sk)]
 |  |
-|  06:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
-|  |  runtime filters: RF001 <- date_dim.d_date_sk
+|  08:SCAN HDFS [tpcds.customer]
+|     partitions=1/1 files=1 size=12.60MB
+|
+16:EXCHANGE [HASH(ss_customer_sk)]
+|
+15:AGGREGATE [FINALIZE]
+|  output: sum:merge(ss_coupon_amt), sum:merge(ss_net_profit)
+|  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
+|
+14:EXCHANGE [HASH(ss_ticket_number,ss_customer_sk,ss_addr_sk,store.s_city)]
+|
+07:AGGREGATE [STREAMING]
+|  output: sum(ss_coupon_amt), sum(ss_net_profit)
+|  group by: ss_ticket_number, ss_customer_sk, ss_addr_sk, store.s_city
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_sold_date_sk = date_dim.d_date_sk
+|  runtime filters: RF001 <- date_dim.d_date_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: date_dim.d_date_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=01 plan-id=02 cohort-id=02
-|  |  |  build expressions: date_dim.d_date_sk
-|  |  |
-|  |  13:EXCHANGE [BROADCAST]
-|  |  |
-|  |  01:SCAN HDFS [tpcds.date_dim]
-|  |     partitions=1/1 files=1 size=9.84MB
-|  |     predicates: d_date <= '1999-03-31', d_date >= '1999-01-01'
+|  13:EXCHANGE [BROADCAST]
 |  |
-|  05:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_store_sk = store.s_store_sk
-|  |  runtime filters: RF002 <- store.s_store_sk
+|  01:SCAN HDFS [tpcds.date_dim]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: d_date <= '1999-03-31', d_date >= '1999-01-01'
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_store_sk = store.s_store_sk
+|  runtime filters: RF002 <- store.s_store_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: store.s_store_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=02 plan-id=03 cohort-id=02
-|  |  |  build expressions: store.s_store_sk
-|  |  |
-|  |  12:EXCHANGE [BROADCAST]
-|  |  |
-|  |  02:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |     predicates: store.s_number_employees <= 295, store.s_number_employees >= 200
+|  12:EXCHANGE [BROADCAST]
 |  |
-|  04:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
-|  |  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|  02:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     predicates: store.s_number_employees <= 295, store.s_number_employees >= 200
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
+|  runtime filters: RF003 <- household_demographics.hd_demo_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: household_demographics.hd_demo_sk
 |  |
-|  |--JOIN BUILD
-|  |  |  join-table-id=03 plan-id=04 cohort-id=02
-|  |  |  build expressions: household_demographics.hd_demo_sk
-|  |  |
-|  |  11:EXCHANGE [BROADCAST]
-|  |  |
-|  |  03:SCAN HDFS [tpcds.household_demographics]
-|  |     partitions=1/1 files=1 size=148.10KB
-|  |     predicates: (household_demographics.hd_dep_count = 8 OR household_demographics.hd_vehicle_count > 0)
+|  11:EXCHANGE [BROADCAST]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF001 -> store_sales.ss_sold_date_sk, RF002 -> store_sales.ss_store_sk, RF003 -> store_sales.ss_hdemo_sk
+|  03:SCAN HDFS [tpcds.household_demographics]
+|     partitions=1/1 files=1 size=148.10KB
+|     predicates: (household_demographics.hd_dep_count = 8 OR household_demographics.hd_vehicle_count > 0)
 |
-08:SCAN HDFS [tpcds.customer]
-   partitions=1/1 files=1 size=12.60MB
-   runtime filters: RF000 -> c_customer_sk
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=1824/1824 files=1824 size=326.32MB
+   runtime filters: RF000 -> tpcds.store_sales.ss_customer_sk, RF001 -> store_sales.ss_sold_date_sk, RF002 -> store_sales.ss_store_sk, RF003 -> store_sales.ss_hdemo_sk
 ====
 # TPCDS-Q89
 select * from (select  *
@@ -4574,7 +4586,7 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-13:MERGING-EXCHANGE [UNPARTITIONED]
+14:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: i_category ASC, i_class ASC, i_item_id ASC, i_item_desc ASC, sum(ss_ext_sales_price) * 100 / sum(sum(ss_ext_sales_price)) ASC
 |  limit: 1000
 |
@@ -4588,11 +4600,11 @@ PLAN-ROOT SINK
 06:SORT
 |  order by: i_class ASC NULLS FIRST
 |
-12:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_ext_sales_price)
 |  group by: i_item_id, i_item_desc, i_category, i_class, i_current_price
 |
-11:EXCHANGE [HASH(i_class)]
+12:EXCHANGE [HASH(i_class)]
 |
 05:AGGREGATE [STREAMING]
 |  output: sum(ss_ext_sales_price)
@@ -4602,30 +4614,31 @@ PLAN-ROOT SINK
 |  hash predicates: ss_sold_date_sk = d_date_sk
 |  runtime filters: RF000 <- d_date_sk
 |
-|--10:EXCHANGE [BROADCAST]
+|--11:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [tpcds.date_dim]
 |     partitions=1/1 files=1 size=9.84MB
 |     predicates: tpcds.date_dim.d_date_sk <= 2451941, tpcds.date_dim.d_date_sk >= 2451911, d_date <= '2001-01-31', d_date >= '2001-01-01'
 |
-03:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: i_item_sk = ss_item_sk
-|  runtime filters: RF001 <- ss_item_sk
+03:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF001 <- i_item_sk
 |
-|--09:EXCHANGE [BROADCAST]
+|--10:EXCHANGE [HASH(i_item_sk)]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=31/1824 files=31 size=3.43MB
-|     runtime filters: RF000 -> ss_sold_date_sk
+|  01:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|     predicates: i_category IN ('Jewelry', 'Sports', 'Books')
 |
-01:SCAN HDFS [tpcds.item]
-   partitions=1/1 files=1 size=4.82MB
-   predicates: i_category IN ('Jewelry', 'Sports', 'Books')
-   runtime filters: RF001 -> i_item_sk
+09:EXCHANGE [HASH(ss_item_sk)]
+|
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=31/1824 files=31 size=3.43MB
+   runtime filters: RF000 -> ss_sold_date_sk, RF001 -> ss_item_sk
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-13:MERGING-EXCHANGE [UNPARTITIONED]
+14:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: i_category ASC, i_class ASC, i_item_id ASC, i_item_desc ASC, sum(ss_ext_sales_price) * 100 / sum(sum(ss_ext_sales_price)) ASC
 |  limit: 1000
 |
@@ -4639,11 +4652,11 @@ PLAN-ROOT SINK
 06:SORT
 |  order by: i_class ASC NULLS FIRST
 |
-12:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_ext_sales_price)
 |  group by: i_item_id, i_item_desc, i_category, i_class, i_current_price
 |
-11:EXCHANGE [HASH(i_class)]
+12:EXCHANGE [HASH(i_class)]
 |
 05:AGGREGATE [STREAMING]
 |  output: sum(ss_ext_sales_price)
@@ -4657,30 +4670,31 @@ PLAN-ROOT SINK
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: d_date_sk
 |  |
-|  10:EXCHANGE [BROADCAST]
+|  11:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [tpcds.date_dim]
 |     partitions=1/1 files=1 size=9.84MB
 |     predicates: tpcds.date_dim.d_date_sk <= 2451941, tpcds.date_dim.d_date_sk >= 2451911, d_date <= '2001-01-31', d_date >= '2001-01-01'
 |
-03:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: i_item_sk = ss_item_sk
-|  runtime filters: RF001 <- ss_item_sk
+03:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF001 <- i_item_sk
 |
 |--JOIN BUILD
 |  |  join-table-id=01 plan-id=02 cohort-id=01
-|  |  build expressions: ss_item_sk
+|  |  build expressions: i_item_sk
 |  |
-|  09:EXCHANGE [BROADCAST]
+|  10:EXCHANGE [HASH(i_item_sk)]
 |  |
-|  00:SCAN HDFS [tpcds.store_sales]
-|     partitions=31/1824 files=31 size=3.43MB
-|     runtime filters: RF000 -> ss_sold_date_sk
+|  01:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|     predicates: i_category IN ('Jewelry', 'Sports', 'Books')
 |
-01:SCAN HDFS [tpcds.item]
-   partitions=1/1 files=1 size=4.82MB
-   predicates: i_category IN ('Jewelry', 'Sports', 'Books')
-   runtime filters: RF001 -> i_item_sk
+09:EXCHANGE [HASH(ss_item_sk)]
+|
+00:SCAN HDFS [tpcds.store_sales]
+   partitions=31/1824 files=31 size=3.43MB
+   runtime filters: RF000 -> ss_sold_date_sk, RF001 -> ss_item_sk
 ====
 # TPCD-Q6
 select * from (