You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:09:54 UTC

[19/50] [abbrv] incubator-impala git commit: MT: Planner for multi-threaded execution

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index 8fe3e3e..f9d0ff0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -52,6 +52,26 @@ order by
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate <= '1998-09-02'
+---- PARALLELPLANS
+05:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: l_returnflag ASC, l_linestatus ASC
+|
+02:SORT
+|  order by: l_returnflag ASC, l_linestatus ASC
+|
+04:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_quantity), sum:merge(l_extendedprice), sum:merge(l_extendedprice * (1 - l_discount)), sum:merge(l_extendedprice * (1 - l_discount) * (1 + l_tax)), avg:merge(l_quantity), avg:merge(l_extendedprice), avg:merge(l_discount), count:merge(*)
+|  group by: l_returnflag, l_linestatus
+|
+03:EXCHANGE [HASH(l_returnflag,l_linestatus)]
+|
+01:AGGREGATE [STREAMING]
+|  output: sum(l_quantity), sum(l_extendedprice), sum(l_extendedprice * (1 - l_discount)), sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)), avg(l_quantity), avg(l_extendedprice), avg(l_discount), count(*)
+|  group by: l_returnflag, l_linestatus
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate <= '1998-09-02'
 ====
 # TPCH-Q2
 # Q2 - Minimum Cost Supplier Query
@@ -277,6 +297,141 @@ limit 100
 05:SCAN HDFS [tpch.partsupp]
    partitions=1/1 files=1 size=112.71MB
    runtime filters: RF001 -> tpch.partsupp.ps_partkey, RF004 -> ps_suppkey
+---- PARALLELPLANS
+30:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: s_acctbal DESC, n_name ASC, s_name ASC, p_partkey ASC
+|  limit: 100
+|
+18:TOP-N [LIMIT=100]
+|  order by: s_acctbal DESC, n_name ASC, s_name ASC, p_partkey ASC
+|
+17:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: min(ps_supplycost) = ps_supplycost, ps_partkey = p_partkey
+|  runtime filters: RF001 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: ps_supplycost, p_partkey
+|  |
+|  29:EXCHANGE [HASH(ps_supplycost,p_partkey)]
+|  |
+|  16:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: n_regionkey = r_regionkey
+|  |  runtime filters: RF005 <- r_regionkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: r_regionkey
+|  |  |
+|  |  27:EXCHANGE [BROADCAST]
+|  |  |
+|  |  04:SCAN HDFS [tpch.region]
+|  |     partitions=1/1 files=1 size=384B
+|  |     predicates: r_name = 'EUROPE'
+|  |
+|  15:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: s_nationkey = n_nationkey
+|  |  runtime filters: RF006 <- n_nationkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=02 plan-id=03 cohort-id=02
+|  |  |  build expressions: n_nationkey
+|  |  |
+|  |  26:EXCHANGE [BROADCAST]
+|  |  |
+|  |  03:SCAN HDFS [tpch.nation]
+|  |     partitions=1/1 files=1 size=2.15KB
+|  |     runtime filters: RF005 -> n_regionkey
+|  |
+|  14:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: ps_suppkey = s_suppkey
+|  |  runtime filters: RF007 <- s_suppkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=03 plan-id=04 cohort-id=02
+|  |  |  build expressions: s_suppkey
+|  |  |
+|  |  25:EXCHANGE [BROADCAST]
+|  |  |
+|  |  01:SCAN HDFS [tpch.supplier]
+|  |     partitions=1/1 files=1 size=1.33MB
+|  |     runtime filters: RF006 -> s_nationkey
+|  |
+|  13:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: ps_partkey = p_partkey
+|  |  runtime filters: RF008 <- p_partkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=04 plan-id=05 cohort-id=02
+|  |  |  build expressions: p_partkey
+|  |  |
+|  |  24:EXCHANGE [BROADCAST]
+|  |  |
+|  |  00:SCAN HDFS [tpch.part]
+|  |     partitions=1/1 files=1 size=22.83MB
+|  |     predicates: p_size = 15, p_type LIKE '%BRASS'
+|  |
+|  02:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|     runtime filters: RF007 -> ps_suppkey, RF008 -> ps_partkey
+|
+28:EXCHANGE [HASH(min(ps_supplycost),ps_partkey)]
+|
+23:AGGREGATE [FINALIZE]
+|  output: min:merge(ps_supplycost)
+|  group by: ps_partkey
+|
+22:EXCHANGE [HASH(ps_partkey)]
+|
+12:AGGREGATE [STREAMING]
+|  output: min(ps_supplycost)
+|  group by: ps_partkey
+|
+11:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: n_regionkey = r_regionkey
+|  runtime filters: RF002 <- r_regionkey
+|
+|--JOIN BUILD
+|  |  join-table-id=05 plan-id=06 cohort-id=01
+|  |  build expressions: r_regionkey
+|  |
+|  21:EXCHANGE [BROADCAST]
+|  |
+|  08:SCAN HDFS [tpch.region]
+|     partitions=1/1 files=1 size=384B
+|     predicates: r_name = 'EUROPE'
+|
+10:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_nationkey = n_nationkey
+|  runtime filters: RF003 <- n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=06 plan-id=07 cohort-id=01
+|  |  build expressions: n_nationkey
+|  |
+|  20:EXCHANGE [BROADCAST]
+|  |
+|  07:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     runtime filters: RF002 -> n_regionkey
+|
+09:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ps_suppkey = s_suppkey
+|  runtime filters: RF004 <- s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=07 plan-id=08 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  19:EXCHANGE [BROADCAST]
+|  |
+|  06:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF003 -> s_nationkey
+|
+05:SCAN HDFS [tpch.partsupp]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF001 -> tpch.partsupp.ps_partkey, RF004 -> ps_suppkey
 ====
 # TPCH-Q3
 # Q3 - Shipping Priority Query
@@ -375,6 +530,57 @@ limit 10
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate > '1995-03-15'
    runtime filters: RF001 -> l_orderkey
+---- PARALLELPLANS
+11:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC, o_orderdate ASC
+|  limit: 10
+|
+06:TOP-N [LIMIT=10]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC, o_orderdate ASC
+|
+10:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  group by: l_orderkey, o_orderdate, o_shippriority
+|
+09:EXCHANGE [HASH(l_orderkey,o_orderdate,o_shippriority)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: l_orderkey, o_orderdate, o_shippriority
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF000 <- c_custkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  08:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: c_mktsegment = 'BUILDING'
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF001 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  07:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate < '1995-03-15'
+|     runtime filters: RF000 -> o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate > '1995-03-15'
+   runtime filters: RF001 -> l_orderkey
 ====
 # TPCH-Q4
 # Q4 - Order Priority Checking Query
@@ -452,6 +658,43 @@ order by
    partitions=1/1 files=1 size=718.94MB
    predicates: l_commitdate < l_receiptdate
    runtime filters: RF000 -> l_orderkey
+---- PARALLELPLANS
+09:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: o_orderpriority ASC
+|
+04:SORT
+|  order by: o_orderpriority ASC
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: o_orderpriority
+|
+07:EXCHANGE [HASH(o_orderpriority)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: o_orderpriority
+|
+02:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF000 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  06:EXCHANGE [HASH(o_orderkey)]
+|  |
+|  00:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate >= '1993-07-01', o_orderdate < '1993-10-01'
+|
+05:EXCHANGE [HASH(l_orderkey)]
+|
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_commitdate < l_receiptdate
+   runtime filters: RF000 -> l_orderkey
 ====
 # TPCH-Q5
 # Q5 - Local Supplier Volume Query
@@ -600,6 +843,96 @@ order by
 02:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF002 -> l_suppkey, RF005 -> l_orderkey
+---- PARALLELPLANS
+20:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC
+|
+12:SORT
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC
+|
+19:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  group by: n_name
+|
+18:EXCHANGE [HASH(n_name)]
+|
+11:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: n_name
+|
+10:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: n_regionkey = r_regionkey
+|  runtime filters: RF000 <- r_regionkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: r_regionkey
+|  |
+|  17:EXCHANGE [BROADCAST]
+|  |
+|  05:SCAN HDFS [tpch.region]
+|     partitions=1/1 files=1 size=384B
+|     predicates: r_name = 'ASIA'
+|
+09:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_nationkey = n_nationkey
+|  runtime filters: RF001 <- n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: n_nationkey
+|  |
+|  16:EXCHANGE [BROADCAST]
+|  |
+|  04:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     runtime filters: RF000 -> n_regionkey
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_suppkey = s_suppkey, c_nationkey = s_nationkey
+|  runtime filters: RF002 <- s_suppkey, RF003 <- s_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: s_suppkey, s_nationkey
+|  |
+|  15:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|
+07:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF004 <- c_custkey
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  14:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF001 -> tpch.customer.c_nationkey, RF003 -> c_nationkey
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF005 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  13:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate >= '1994-01-01', o_orderdate < '1995-01-01'
+|     runtime filters: RF004 -> o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF002 -> l_suppkey, RF005 -> l_orderkey
 ====
 # TPCH-Q6
 # Q6 - Forecasting Revenue Change Query
@@ -631,6 +964,18 @@ where
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    predicates: l_discount >= 0.05, l_discount <= 0.07, l_quantity < 24, l_shipdate >= '1994-01-01', l_shipdate < '1995-01-01'
+---- PARALLELPLANS
+03:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * l_discount)
+|
+02:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: sum(l_extendedprice * l_discount)
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_discount >= 0.05, l_discount <= 0.07, l_quantity < 24, l_shipdate >= '1994-01-01', l_shipdate < '1995-01-01'
 ====
 # TPCH-Q7
 # Q7 - Volume Shipping Query
@@ -795,6 +1140,98 @@ order by
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate >= '1995-01-01', l_shipdate <= '1996-12-31'
    runtime filters: RF003 -> l_suppkey, RF004 -> l_orderkey
+---- PARALLELPLANS
+21:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: supp_nation ASC, cust_nation ASC, l_year ASC
+|
+12:SORT
+|  order by: supp_nation ASC, cust_nation ASC, l_year ASC
+|
+20:AGGREGATE [FINALIZE]
+|  output: sum:merge(volume)
+|  group by: supp_nation, cust_nation, l_year
+|
+19:EXCHANGE [HASH(supp_nation,cust_nation,l_year)]
+|
+11:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: n1.n_name, n2.n_name, year(l_shipdate)
+|
+10:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n2.n_nationkey
+|  other predicates: ((n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY') OR (n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE'))
+|  runtime filters: RF000 <- n2.n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: n2.n_nationkey
+|  |
+|  18:EXCHANGE [BROADCAST]
+|  |
+|  05:SCAN HDFS [tpch.nation n2]
+|     partitions=1/1 files=1 size=2.15KB
+|
+09:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_nationkey = n1.n_nationkey
+|  runtime filters: RF001 <- n1.n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: n1.n_nationkey
+|  |
+|  17:EXCHANGE [BROADCAST]
+|  |
+|  04:SCAN HDFS [tpch.nation n1]
+|     partitions=1/1 files=1 size=2.15KB
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF002 <- c_custkey
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  16:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> c_nationkey
+|
+07:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF003 <- s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  15:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF001 -> s_nationkey
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF004 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  14:EXCHANGE [HASH(o_orderkey)]
+|  |
+|  02:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF002 -> o_custkey
+|
+13:EXCHANGE [HASH(l_orderkey)]
+|
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate >= '1995-01-01', l_shipdate <= '1996-12-31'
+   runtime filters: RF003 -> l_suppkey, RF004 -> l_orderkey
 ====
 # TPCH-Q8
 # Q8 - National Market Share Query
@@ -847,62 +1284,156 @@ order by
 |  hash predicates: s_nationkey = n2.n_nationkey
 |  runtime filters: RF000 <- n2.n_nationkey
 |
-|--06:SCAN HDFS [tpch.nation n2]
+|--06:SCAN HDFS [tpch.nation n2]
+|     partitions=1/1 files=1 size=2.15KB
+|
+13:HASH JOIN [INNER JOIN]
+|  hash predicates: n1.n_regionkey = r_regionkey
+|  runtime filters: RF001 <- r_regionkey
+|
+|--07:SCAN HDFS [tpch.region]
+|     partitions=1/1 files=1 size=384B
+|     predicates: r_name = 'AMERICA'
+|
+12:HASH JOIN [INNER JOIN]
+|  hash predicates: c_nationkey = n1.n_nationkey
+|  runtime filters: RF002 <- n1.n_nationkey
+|
+|--05:SCAN HDFS [tpch.nation n1]
+|     partitions=1/1 files=1 size=2.15KB
+|     runtime filters: RF001 -> n1.n_regionkey
+|
+11:HASH JOIN [INNER JOIN]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF003 <- c_custkey
+|
+|--04:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF002 -> c_nationkey
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF004 <- s_suppkey
+|
+|--01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> s_nationkey
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF005 <- o_orderkey
+|
+|--03:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate >= '1995-01-01', o_orderdate <= '1996-12-31'
+|     runtime filters: RF003 -> o_custkey
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: l_partkey = p_partkey
+|  runtime filters: RF006 <- p_partkey
+|
+|--00:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: p_type = 'ECONOMY ANODIZED STEEL'
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF004 -> l_suppkey, RF005 -> l_orderkey, RF006 -> l_partkey
+---- DISTRIBUTEDPLAN
+28:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: o_year ASC
+|
+16:SORT
+|  order by: o_year ASC
+|
+27:AGGREGATE [FINALIZE]
+|  output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume)
+|  group by: o_year
+|
+26:EXCHANGE [HASH(o_year)]
+|
+15:AGGREGATE [STREAMING]
+|  output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
+|  group by: year(o_orderdate)
+|
+14:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_nationkey = n2.n_nationkey
+|  runtime filters: RF000 <- n2.n_nationkey
+|
+|--25:EXCHANGE [BROADCAST]
+|  |
+|  06:SCAN HDFS [tpch.nation n2]
 |     partitions=1/1 files=1 size=2.15KB
 |
-13:HASH JOIN [INNER JOIN]
+13:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: n1.n_regionkey = r_regionkey
 |  runtime filters: RF001 <- r_regionkey
 |
-|--07:SCAN HDFS [tpch.region]
+|--24:EXCHANGE [BROADCAST]
+|  |
+|  07:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
 |     predicates: r_name = 'AMERICA'
 |
-12:HASH JOIN [INNER JOIN]
+12:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: c_nationkey = n1.n_nationkey
 |  runtime filters: RF002 <- n1.n_nationkey
 |
-|--05:SCAN HDFS [tpch.nation n1]
+|--23:EXCHANGE [BROADCAST]
+|  |
+|  05:SCAN HDFS [tpch.nation n1]
 |     partitions=1/1 files=1 size=2.15KB
 |     runtime filters: RF001 -> n1.n_regionkey
 |
-11:HASH JOIN [INNER JOIN]
+11:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF003 <- c_custkey
 |
-|--04:SCAN HDFS [tpch.customer]
+|--22:EXCHANGE [HASH(c_custkey)]
+|  |
+|  04:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB
 |     runtime filters: RF002 -> c_nationkey
 |
-10:HASH JOIN [INNER JOIN]
+21:EXCHANGE [HASH(o_custkey)]
+|
+10:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: l_suppkey = s_suppkey
 |  runtime filters: RF004 <- s_suppkey
 |
-|--01:SCAN HDFS [tpch.supplier]
+|--20:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.supplier]
 |     partitions=1/1 files=1 size=1.33MB
 |     runtime filters: RF000 -> s_nationkey
 |
-09:HASH JOIN [INNER JOIN]
+09:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey
 |  runtime filters: RF005 <- o_orderkey
 |
-|--03:SCAN HDFS [tpch.orders]
+|--19:EXCHANGE [HASH(o_orderkey)]
+|  |
+|  03:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
 |     predicates: o_orderdate >= '1995-01-01', o_orderdate <= '1996-12-31'
 |     runtime filters: RF003 -> o_custkey
 |
-08:HASH JOIN [INNER JOIN]
+18:EXCHANGE [HASH(l_orderkey)]
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: l_partkey = p_partkey
 |  runtime filters: RF006 <- p_partkey
 |
-|--00:SCAN HDFS [tpch.part]
+|--17:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.part]
 |     partitions=1/1 files=1 size=22.83MB
 |     predicates: p_type = 'ECONOMY ANODIZED STEEL'
 |
 02:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF004 -> l_suppkey, RF005 -> l_orderkey, RF006 -> l_partkey
----- DISTRIBUTEDPLAN
+---- PARALLELPLANS
 28:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_year ASC
 |
@@ -923,7 +1454,11 @@ order by
 |  hash predicates: s_nationkey = n2.n_nationkey
 |  runtime filters: RF000 <- n2.n_nationkey
 |
-|--25:EXCHANGE [BROADCAST]
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: n2.n_nationkey
+|  |
+|  25:EXCHANGE [BROADCAST]
 |  |
 |  06:SCAN HDFS [tpch.nation n2]
 |     partitions=1/1 files=1 size=2.15KB
@@ -932,7 +1467,11 @@ order by
 |  hash predicates: n1.n_regionkey = r_regionkey
 |  runtime filters: RF001 <- r_regionkey
 |
-|--24:EXCHANGE [BROADCAST]
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: r_regionkey
+|  |
+|  24:EXCHANGE [BROADCAST]
 |  |
 |  07:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
@@ -942,7 +1481,11 @@ order by
 |  hash predicates: c_nationkey = n1.n_nationkey
 |  runtime filters: RF002 <- n1.n_nationkey
 |
-|--23:EXCHANGE [BROADCAST]
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: n1.n_nationkey
+|  |
+|  23:EXCHANGE [BROADCAST]
 |  |
 |  05:SCAN HDFS [tpch.nation n1]
 |     partitions=1/1 files=1 size=2.15KB
@@ -952,7 +1495,11 @@ order by
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF003 <- c_custkey
 |
-|--22:EXCHANGE [HASH(c_custkey)]
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  22:EXCHANGE [HASH(c_custkey)]
 |  |
 |  04:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB
@@ -964,7 +1511,11 @@ order by
 |  hash predicates: l_suppkey = s_suppkey
 |  runtime filters: RF004 <- s_suppkey
 |
-|--20:EXCHANGE [BROADCAST]
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  20:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [tpch.supplier]
 |     partitions=1/1 files=1 size=1.33MB
@@ -974,7 +1525,11 @@ order by
 |  hash predicates: l_orderkey = o_orderkey
 |  runtime filters: RF005 <- o_orderkey
 |
-|--19:EXCHANGE [HASH(o_orderkey)]
+|--JOIN BUILD
+|  |  join-table-id=05 plan-id=06 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  19:EXCHANGE [HASH(o_orderkey)]
 |  |
 |  03:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
@@ -987,7 +1542,11 @@ order by
 |  hash predicates: l_partkey = p_partkey
 |  runtime filters: RF006 <- p_partkey
 |
-|--17:EXCHANGE [BROADCAST]
+|--JOIN BUILD
+|  |  join-table-id=06 plan-id=07 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  17:EXCHANGE [BROADCAST]
 |  |
 |  00:SCAN HDFS [tpch.part]
 |     partitions=1/1 files=1 size=22.83MB
@@ -1147,6 +1706,95 @@ order by
 02:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF001 -> l_suppkey, RF002 -> l_partkey, RF003 -> l_suppkey, RF004 -> l_orderkey, RF005 -> l_partkey
+---- PARALLELPLANS
+21:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: nation ASC, o_year DESC
+|
+12:SORT
+|  order by: nation ASC, o_year DESC
+|
+20:AGGREGATE [FINALIZE]
+|  output: sum:merge(amount)
+|  group by: nation, o_year
+|
+19:EXCHANGE [HASH(nation,o_year)]
+|
+11:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity)
+|  group by: n_name, year(o_orderdate)
+|
+10:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: n_nationkey
+|  |
+|  18:EXCHANGE [BROADCAST]
+|  |
+|  05:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+09:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_suppkey = ps_suppkey, l_partkey = ps_partkey
+|  runtime filters: RF001 <- ps_suppkey, RF002 <- ps_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: ps_suppkey, ps_partkey
+|  |
+|  17:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF003 <- s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  16:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> s_nationkey
+|
+07:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF004 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  15:EXCHANGE [HASH(o_orderkey)]
+|  |
+|  04:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|
+14:EXCHANGE [HASH(l_orderkey)]
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_partkey = p_partkey
+|  runtime filters: RF005 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  13:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: p_name LIKE '%green%'
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF001 -> l_suppkey, RF002 -> l_partkey, RF003 -> l_suppkey, RF004 -> l_orderkey, RF005 -> l_partkey
 ====
 # TPCH-Q10
 # Q10 - Returned Item Reporting Query
@@ -1273,6 +1921,72 @@ limit 20
    partitions=1/1 files=1 size=718.94MB
    predicates: l_returnflag = 'R'
    runtime filters: RF002 -> l_orderkey
+---- PARALLELPLANS
+15:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC
+|  limit: 20
+|
+08:TOP-N [LIMIT=20]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC
+|
+14:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  group by: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
+|
+13:EXCHANGE [HASH(c_custkey,c_name,c_acctbal,c_phone,n_name,c_address,c_comment)]
+|
+07:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: n_nationkey
+|  |
+|  12:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+05:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF001 <- c_custkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  11:EXCHANGE [HASH(c_custkey)]
+|  |
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> c_nationkey
+|
+10:EXCHANGE [HASH(o_custkey)]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF002 <- o_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  09:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: o_orderdate >= '1993-10-01', o_orderdate < '1994-01-01'
+|     runtime filters: RF001 -> o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_returnflag = 'R'
+   runtime filters: RF002 -> l_orderkey
 ====
 # TPCH-Q11
 # Q11 - Important Stock Identification
@@ -1430,7 +2144,104 @@ order by
 |  hash predicates: ps_suppkey = s_suppkey
 |  runtime filters: RF001 <- s_suppkey
 |
-|--14:EXCHANGE [BROADCAST]
+|--14:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> s_nationkey
+|
+00:SCAN HDFS [tpch.partsupp]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF001 -> ps_suppkey
+---- PARALLELPLANS
+23:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: value DESC
+|
+13:SORT
+|  order by: value DESC
+|
+12:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
+|  join table id: 00
+|  predicates: sum(ps_supplycost * ps_availqty) > sum(ps_supplycost * ps_availqty) * 0.0001
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |
+|  22:EXCHANGE [BROADCAST]
+|  |
+|  21:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(ps_supplycost * ps_availqty)
+|  |
+|  20:EXCHANGE [UNPARTITIONED]
+|  |
+|  11:AGGREGATE
+|  |  output: sum(ps_supplycost * ps_availqty)
+|  |
+|  10:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: s_nationkey = n_nationkey
+|  |  runtime filters: RF002 <- n_nationkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: n_nationkey
+|  |  |
+|  |  19:EXCHANGE [BROADCAST]
+|  |  |
+|  |  08:SCAN HDFS [tpch.nation]
+|  |     partitions=1/1 files=1 size=2.15KB
+|  |     predicates: n_name = 'GERMANY'
+|  |
+|  09:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: ps_suppkey = s_suppkey
+|  |  runtime filters: RF003 <- s_suppkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=02 plan-id=03 cohort-id=02
+|  |  |  build expressions: s_suppkey
+|  |  |
+|  |  18:EXCHANGE [BROADCAST]
+|  |  |
+|  |  07:SCAN HDFS [tpch.supplier]
+|  |     partitions=1/1 files=1 size=1.33MB
+|  |     runtime filters: RF002 -> s_nationkey
+|  |
+|  06:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|     runtime filters: RF003 -> ps_suppkey
+|
+17:AGGREGATE [FINALIZE]
+|  output: sum:merge(ps_supplycost * ps_availqty)
+|  group by: ps_partkey
+|
+16:EXCHANGE [HASH(ps_partkey)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(ps_supplycost * ps_availqty)
+|  group by: ps_partkey
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|
+|--JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: n_nationkey
+|  |
+|  15:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     predicates: n_name = 'GERMANY'
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ps_suppkey = s_suppkey
+|  runtime filters: RF001 <- s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  14:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [tpch.supplier]
 |     partitions=1/1 files=1 size=1.33MB
@@ -1519,6 +2330,40 @@ order by
 00:SCAN HDFS [tpch.orders]
    partitions=1/1 files=1 size=162.56MB
    runtime filters: RF000 -> o_orderkey
+---- PARALLELPLANS
+08:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: l_shipmode ASC
+|
+04:SORT
+|  order by: l_shipmode ASC
+|
+07:AGGREGATE [FINALIZE]
+|  output: sum:merge(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum:merge(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
+|  group by: l_shipmode
+|
+06:EXCHANGE [HASH(l_shipmode)]
+|
+03:AGGREGATE [STREAMING]
+|  output: sum(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
+|  group by: l_shipmode
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_orderkey = l_orderkey
+|  runtime filters: RF000 <- l_orderkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: l_orderkey
+|  |
+|  05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate, l_receiptdate >= '1994-01-01', l_receiptdate < '1995-01-01'
+|
+00:SCAN HDFS [tpch.orders]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> o_orderkey
 ====
 # TPCH-Q13
 # Q13 - Customer Distribution Query
@@ -1601,6 +2446,46 @@ order by
    partitions=1/1 files=1 size=162.56MB
    predicates: NOT o_comment LIKE '%special%requests%'
    runtime filters: RF000 -> o_custkey
+---- PARALLELPLANS
+10:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: count(*) DESC, c_count DESC
+|
+05:SORT
+|  order by: count(*) DESC, c_count DESC
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: c_count
+|
+08:EXCHANGE [HASH(c_count)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: count(o_orderkey)
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(o_orderkey)
+|  group by: c_custkey
+|
+02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF000 <- c_custkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  07:EXCHANGE [HASH(c_custkey)]
+|  |
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|
+06:EXCHANGE [HASH(o_custkey)]
+|
+01:SCAN HDFS [tpch.orders]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: NOT o_comment LIKE '%special%requests%'
+   runtime filters: RF000 -> o_custkey
 ====
 # TPCH-Q14
 # Q14 - Promotion Effect
@@ -1654,6 +2539,32 @@ where
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01'
    runtime filters: RF000 -> l_partkey
+---- PARALLELPLANS
+06:AGGREGATE [FINALIZE]
+|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount))
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_partkey = p_partkey
+|  runtime filters: RF000 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01'
+   runtime filters: RF000 -> l_partkey
 ====
 # TPCH-Q15
 # Q15 - Top Supplier Query
@@ -1777,6 +2688,71 @@ order by
 00:SCAN HDFS [tpch.supplier]
    partitions=1/1 files=1 size=1.33MB
    runtime filters: RF000 -> s_suppkey
+---- PARALLELPLANS
+17:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: s_suppkey ASC
+|
+08:SORT
+|  order by: s_suppkey ASC
+|
+07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: sum(l_extendedprice * (1 - l_discount)) = max(total_revenue)
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: max(total_revenue)
+|  |
+|  16:EXCHANGE [BROADCAST]
+|  |
+|  15:AGGREGATE [FINALIZE]
+|  |  output: max:merge(total_revenue)
+|  |
+|  14:EXCHANGE [UNPARTITIONED]
+|  |
+|  05:AGGREGATE
+|  |  output: max(sum(l_extendedprice * (1 - l_discount)))
+|  |
+|  13:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  |  group by: l_suppkey
+|  |
+|  12:EXCHANGE [HASH(l_suppkey)]
+|  |
+|  04:AGGREGATE [STREAMING]
+|  |  output: sum(l_extendedprice * (1 - l_discount))
+|  |  group by: l_suppkey
+|  |
+|  03:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: l_shipdate >= '1996-01-01', l_shipdate < '1996-04-01'
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: s_suppkey = l_suppkey
+|  runtime filters: RF000 <- l_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: l_suppkey
+|  |
+|  11:EXCHANGE [BROADCAST]
+|  |
+|  10:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(l_extendedprice * (1 - l_discount))
+|  |  group by: l_suppkey
+|  |
+|  09:EXCHANGE [HASH(l_suppkey)]
+|  |
+|  02:AGGREGATE [STREAMING]
+|  |  output: sum(l_extendedprice * (1 - l_discount))
+|  |  group by: l_suppkey
+|  |
+|  01:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: l_shipdate >= '1996-01-01', l_shipdate < '1996-04-01'
+|
+00:SCAN HDFS [tpch.supplier]
+   partitions=1/1 files=1 size=1.33MB
+   runtime filters: RF000 -> s_suppkey
 ====
 # TPCH-Q16
 # Q16 - Parts/Supplier Relation Query
@@ -1880,6 +2856,55 @@ order by
 00:SCAN HDFS [tpch.partsupp]
    partitions=1/1 files=1 size=112.71MB
    runtime filters: RF000 -> ps_partkey
+---- PARALLELPLANS
+12:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
+|
+07:SORT
+|  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
+|
+06:AGGREGATE [FINALIZE]
+|  output: count(ps_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+11:AGGREGATE
+|  group by: p_brand, p_type, p_size, ps_suppkey
+|
+10:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+05:AGGREGATE [STREAMING]
+|  group by: p_brand, p_type, p_size, ps_suppkey
+|
+04:HASH JOIN [NULL AWARE LEFT ANTI JOIN, BROADCAST]
+|  hash predicates: ps_suppkey = s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     predicates: s_comment LIKE '%Customer%Complaints%'
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: ps_partkey = p_partkey
+|  runtime filters: RF000 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: p_size IN (49, 14, 23, 45, 19, 3, 36, 9), p_brand != 'Brand#45', NOT p_type LIKE 'MEDIUM POLISHED%'
+|
+00:SCAN HDFS [tpch.partsupp]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF000 -> ps_partkey
 ====
 # TPCH-Q17
 # Q17 - Small-Quantity-Order Revenue Query
@@ -1971,6 +2996,57 @@ where
 02:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> tpch.lineitem.l_partkey
+---- PARALLELPLANS
+12:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice)
+|
+11:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE
+|  output: sum(l_extendedprice)
+|
+05:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: l_partkey = p_partkey
+|  other join predicates: l_quantity < 0.2 * avg(l_quantity)
+|  runtime filters: RF000 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  10:EXCHANGE [HASH(p_partkey)]
+|  |
+|  04:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: l_partkey = p_partkey
+|  |  runtime filters: RF001 <- p_partkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: p_partkey
+|  |  |
+|  |  09:EXCHANGE [BROADCAST]
+|  |  |
+|  |  01:SCAN HDFS [tpch.part]
+|  |     partitions=1/1 files=1 size=22.83MB
+|  |     predicates: p_container = 'MED BOX', p_brand = 'Brand#23'
+|  |
+|  00:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     runtime filters: RF001 -> l_partkey
+|
+08:AGGREGATE [FINALIZE]
+|  output: avg:merge(l_quantity)
+|  group by: l_partkey
+|
+07:EXCHANGE [HASH(l_partkey)]
+|
+03:AGGREGATE [STREAMING]
+|  output: avg(l_quantity)
+|  group by: l_partkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> tpch.lineitem.l_partkey
 ====
 # TPCH-Q18
 # Q18 - Large Value Customer Query
@@ -2012,41 +3088,101 @@ limit 100
 09:TOP-N [LIMIT=100]
 |  order by: o_totalprice DESC, o_orderdate ASC
 |
-08:AGGREGATE [FINALIZE]
+08:AGGREGATE [FINALIZE]
+|  output: sum(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: o_orderkey = l_orderkey
+|  runtime filters: RF000 <- l_orderkey
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum(l_quantity)
+|  |  group by: l_orderkey
+|  |  having: sum(l_quantity) > 300
+|  |
+|  03:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF001 <- c_custkey
+|
+|--00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF002 <- o_orderkey
+|
+|--01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF001 -> o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> tpch.lineitem.l_orderkey, RF002 -> l_orderkey
+---- DISTRIBUTEDPLAN
+17:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|  limit: 100
+|
+09:TOP-N [LIMIT=100]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|
+16:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|
+15:EXCHANGE [HASH(c_name,c_custkey,o_orderkey,o_orderdate,o_totalprice)]
+|
+08:AGGREGATE [STREAMING]
 |  output: sum(l_quantity)
 |  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
 |
-07:HASH JOIN [LEFT SEMI JOIN]
+07:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
 |  hash predicates: o_orderkey = l_orderkey
 |  runtime filters: RF000 <- l_orderkey
 |
-|--04:AGGREGATE [FINALIZE]
-|  |  output: sum(l_quantity)
+|--14:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(l_quantity)
 |  |  group by: l_orderkey
 |  |  having: sum(l_quantity) > 300
 |  |
+|  13:EXCHANGE [HASH(l_orderkey)]
+|  |
+|  04:AGGREGATE [STREAMING]
+|  |  output: sum(l_quantity)
+|  |  group by: l_orderkey
+|  |
 |  03:SCAN HDFS [tpch.lineitem]
 |     partitions=1/1 files=1 size=718.94MB
 |
-06:HASH JOIN [INNER JOIN]
+06:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF001 <- c_custkey
 |
-|--00:SCAN HDFS [tpch.customer]
+|--12:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB
 |
-05:HASH JOIN [INNER JOIN]
+05:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey
 |  runtime filters: RF002 <- o_orderkey
 |
-|--01:SCAN HDFS [tpch.orders]
+|--11:EXCHANGE [HASH(o_orderkey)]
+|  |
+|  01:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
 |     runtime filters: RF001 -> o_custkey
 |
+10:EXCHANGE [HASH(l_orderkey)]
+|
 02:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> tpch.lineitem.l_orderkey, RF002 -> l_orderkey
----- DISTRIBUTEDPLAN
+---- PARALLELPLANS
 17:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_totalprice DESC, o_orderdate ASC
 |  limit: 100
@@ -2068,7 +3204,11 @@ limit 100
 |  hash predicates: o_orderkey = l_orderkey
 |  runtime filters: RF000 <- l_orderkey
 |
-|--14:AGGREGATE [FINALIZE]
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: l_orderkey
+|  |
+|  14:AGGREGATE [FINALIZE]
 |  |  output: sum:merge(l_quantity)
 |  |  group by: l_orderkey
 |  |  having: sum(l_quantity) > 300
@@ -2086,7 +3226,11 @@ limit 100
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF001 <- c_custkey
 |
-|--12:EXCHANGE [BROADCAST]
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  12:EXCHANGE [BROADCAST]
 |  |
 |  00:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB
@@ -2095,7 +3239,11 @@ limit 100
 |  hash predicates: l_orderkey = o_orderkey
 |  runtime filters: RF002 <- o_orderkey
 |
-|--11:EXCHANGE [HASH(o_orderkey)]
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |
+|  11:EXCHANGE [HASH(o_orderkey)]
 |  |
 |  01:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
@@ -2181,6 +3329,32 @@ where
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000 -> l_partkey
+---- PARALLELPLANS
+06:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice * (1 - l_discount))
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE
+|  output: sum(l_extendedprice * (1 - l_discount))
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_partkey = p_partkey
+|  other predicates: ((p_brand = 'Brand#12' AND p_container IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') AND l_quantity >= 1 AND l_quantity <= 11 AND p_size BETWEEN 1 AND 5 AND l_shipmode IN ('AIR', 'AIR REG') AND l_shipinstruct = 'DELIVER IN PERSON') OR (p_brand = 'Brand#23' AND p_container IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') AND l_quantity >= 10 AND l_quantity <= 20 AND p_size BETWEEN 1 AND 10 AND l_shipmode IN ('AIR', 'AIR REG') AND l_shipinstruct = 'DELIVER IN PERSON') OR (p_brand = 'Brand#34' AND p_container IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') AND l_quantity >= 20 AND l_quantity <= 30 AND p_size BETWEEN 1 AND 15 AND l_shipmode IN ('AIR', 'AIR REG') AND l_shipinstruct = 'DELIVER IN PERSON'))
+|  runtime filters: RF000 <- p_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: p_partkey
+|  |
+|  04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> l_partkey
 ====
 # TPCH-Q20
 # Q20 - Potential Part Promotion Query
@@ -2330,6 +3504,86 @@ order by
    partitions=1/1 files=1 size=718.94MB
    predicates: l_shipdate >= '1994-01-01', l_shipdate < '1995-01-01'
    runtime filters: RF000 -> tpch.lineitem.l_suppkey, RF001 -> tpch.lineitem.l_suppkey, RF002 -> tpch.lineitem.l_partkey
+---- PARALLELPLANS
+18:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: s_name ASC
+|
+10:SORT
+|  order by: s_name ASC
+|
+09:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: ps_suppkey = s_suppkey
+|  runtime filters: RF000 <- s_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: s_suppkey
+|  |
+|  17:EXCHANGE [HASH(s_suppkey)]
+|  |
+|  08:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: s_nationkey = n_nationkey
+|  |  runtime filters: RF004 <- n_nationkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: n_nationkey
+|  |  |
+|  |  15:EXCHANGE [BROADCAST]
+|  |  |
+|  |  01:SCAN HDFS [tpch.nation]
+|  |     partitions=1/1 files=1 size=2.15KB
+|  |     predicates: n_name = 'CANADA'
+|  |
+|  00:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF004 -> s_nationkey
+|
+16:EXCHANGE [HASH(ps_suppkey)]
+|
+07:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: l_suppkey = ps_suppkey, l_partkey = ps_partkey
+|  other join predicates: ps_availqty > 0.5 * sum(l_quantity)
+|  runtime filters: RF001 <- ps_suppkey, RF002 <- ps_partkey
+|
+|--JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: ps_suppkey, ps_partkey
+|  |
+|  14:EXCHANGE [HASH(ps_partkey,ps_suppkey)]
+|  |
+|  06:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  |  hash predicates: ps_partkey = p_partkey
+|  |  runtime filters: RF003 <- p_partkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=03 plan-id=04 cohort-id=03
+|  |  |  build expressions: p_partkey
+|  |  |
+|  |  13:EXCHANGE [BROADCAST]
+|  |  |
+|  |  03:SCAN HDFS [tpch.part]
+|  |     partitions=1/1 files=1 size=22.83MB
+|  |     predicates: p_name LIKE 'forest%'
+|  |
+|  02:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|     runtime filters: RF003 -> ps_partkey
+|
+12:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_quantity)
+|  group by: l_partkey, l_suppkey
+|
+11:EXCHANGE [HASH(l_partkey,l_suppkey)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(l_quantity)
+|  group by: l_partkey, l_suppkey
+|
+04:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate >= '1994-01-01', l_shipdate < '1995-01-01'
+   runtime filters: RF000 -> tpch.lineitem.l_suppkey, RF001 -> tpch.lineitem.l_suppkey, RF002 -> tpch.lineitem.l_partkey
 ====
 # TPCH-Q21
 # Q21 - Suppliers Who Kept Orders Waiting Query
@@ -2501,6 +3755,101 @@ limit 100
 05:SCAN HDFS [tpch.lineitem l3]
    partitions=1/1 files=1 size=718.94MB
    predicates: l3.l_receiptdate > l3.l_commitdate
+---- PARALLELPLANS
+21:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: count(*) DESC, s_name ASC
+|  limit: 100
+|
+12:TOP-N [LIMIT=100]
+|  order by: count(*) DESC, s_name ASC
+|
+20:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: s_name
+|
+19:EXCHANGE [HASH(s_name)]
+|
+11:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: s_name
+|
+10:HASH JOIN [RIGHT ANTI JOIN, PARTITIONED]
+|  hash predicates: l3.l_orderkey = l1.l_orderkey
+|  other join predicates: l3.l_suppkey != l1.l_suppkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: l1.l_orderkey
+|  |
+|  09:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  |  hash predicates: l2.l_orderkey = l1.l_orderkey
+|  |  other join predicates: l2.l_suppkey != l1.l_suppkey
+|  |  runtime filters: RF000 <- l1.l_orderkey
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: l1.l_orderkey
+|  |  |
+|  |  17:EXCHANGE [HASH(l1.l_orderkey)]
+|  |  |
+|  |  08:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  |  hash predicates: s_nationkey = n_nationkey
+|  |  |  runtime filters: RF001 <- n_nationkey
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=02 plan-id=03 cohort-id=03
+|  |  |  |  build expressions: n_nationkey
+|  |  |  |
+|  |  |  15:EXCHANGE [BROADCAST]
+|  |  |  |
+|  |  |  03:SCAN HDFS [tpch.nation]
+|  |  |     partitions=1/1 files=1 size=2.15KB
+|  |  |     predicates: n_name = 'SAUDI ARABIA'
+|  |  |
+|  |  07:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  |  hash predicates: l1.l_suppkey = s_suppkey
+|  |  |  runtime filters: RF002 <- s_suppkey
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=03 plan-id=04 cohort-id=03
+|  |  |  |  build expressions: s_suppkey
+|  |  |  |
+|  |  |  14:EXCHANGE [BROADCAST]
+|  |  |  |
+|  |  |  00:SCAN HDFS [tpch.supplier]
+|  |  |     partitions=1/1 files=1 size=1.33MB
+|  |  |     runtime filters: RF001 -> s_nationkey
+|  |  |
+|  |  06:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  |  hash predicates: l1.l_orderkey = o_orderkey
+|  |  |  runtime filters: RF003 <- o_orderkey
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=04 plan-id=05 cohort-id=03
+|  |  |  |  build expressions: o_orderkey
+|  |  |  |
+|  |  |  13:EXCHANGE [BROADCAST]
+|  |  |  |
+|  |  |  02:SCAN HDFS [tpch.orders]
+|  |  |     partitions=1/1 files=1 size=162.56MB
+|  |  |     predicates: o_orderstatus = 'F'
+|  |  |
+|  |  01:SCAN HDFS [tpch.lineitem l1]
+|  |     partitions=1/1 files=1 size=718.94MB
+|  |     predicates: l1.l_receiptdate > l1.l_commitdate
+|  |     runtime filters: RF002 -> l1.l_suppkey, RF003 -> l1.l_orderkey
+|  |
+|  16:EXCHANGE [HASH(l2.l_orderkey)]
+|  |
+|  04:SCAN HDFS [tpch.lineitem l2]
+|     partitions=1/1 files=1 size=718.94MB
+|     runtime filters: RF000 -> l2.l_orderkey
+|
+18:EXCHANGE [HASH(l3.l_orderkey)]
+|
+05:SCAN HDFS [tpch.lineitem l3]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: l3.l_receiptdate > l3.l_commitdate
 ====
 # TPCH-Q22
 # Q22 - Global Sales Opportunity Query
@@ -2612,4 +3961,59 @@ order by
 |
 03:SCAN HDFS [tpch.orders]
    partitions=1/1 files=1 size=162.56MB
+---- PARALLELPLANS
+15:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: cntrycode ASC
+|
+07:SORT
+|  order by: cntrycode ASC
+|
+14:AGGREGATE [FINALIZE]
+|  output: count:merge(*), sum:merge(c_acctbal)
+|  group by: cntrycode
+|
+13:EXCHANGE [HASH(cntrycode)]
+|
+06:AGGREGATE [STREAMING]
+|  output: count(*), sum(c_acctbal)
+|  group by: substr(c_phone, 1, 2)
+|
+05:HASH JOIN [RIGHT ANTI JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: c_custkey
+|  |
+|  12:EXCHANGE [HASH(c_custkey)]
+|  |
+|  04:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
+|  |  join table id: 01
+|  |  predicates: c_acctbal > avg(c_acctbal)
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |
+|  |  10:EXCHANGE [BROADCAST]
+|  |  |
+|  |  09:AGGREGATE [FINALIZE]
+|  |  |  output: avg:merge(c_acctbal)
+|  |  |
+|  |  08:EXCHANGE [UNPARTITIONED]
+|  |  |
+|  |  02:AGGREGATE
+|  |  |  output: avg(c_acctbal)
+|  |  |
+|  |  01:SCAN HDFS [tpch.customer]
+|  |     partitions=1/1 files=1 size=23.08MB
+|  |     predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|  |
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|
+11:EXCHANGE [HASH(o_custkey)]
+|
+03:SCAN HDFS [tpch.orders]
+   partitions=1/1 files=1 size=162.56MB
 ====