You are viewing a plain text version of this content. The canonical (HTML) version is available at the mailing-list archive.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/08/11 17:30:08 UTC

[1/2] incubator-impala git commit: IMPALA-3940: Fix getting column stats through views.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 88b89b872 -> 286da5921


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
new file mode 100644
index 0000000..ee2d024
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
@@ -0,0 +1,1473 @@
+# TPCH-Q1
+# Q1 - Pricing Summary Report Query
+select
+  l_returnflag,
+  l_linestatus,
+  sum(l_quantity) as sum_qty,
+  sum(l_extendedprice) as sum_base_price,
+  sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+  sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+  avg(l_quantity) as avg_qty,
+  avg(l_extendedprice) as avg_price,
+  avg(l_discount) as avg_disc,
+  count(*) as count_order
+from
+  lineitem
+where
+  l_shipdate <= '1998-09-02'
+group by
+  l_returnflag,
+  l_linestatus
+order by
+  l_returnflag,
+  l_linestatus
+---- PLAN
+02:SORT
+|  order by: l_returnflag ASC, l_linestatus ASC
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount)), sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount) * (1 + tpch.lineitem.l_tax)), avg(tpch.lineitem.l_quantity), avg(tpch.lineitem.l_extendedprice), avg(tpch.lineitem.l_discount), count(*)
+|  group by: tpch.lineitem.l_returnflag, tpch.lineitem.l_linestatus
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_shipdate <= '1998-09-02'
+====
+# TPCH-Q2
+# Q2 - Minimum Cost Supplier Query
+select
+  s_acctbal,
+  s_name,
+  n_name,
+  p_partkey,
+  p_mfgr,
+  s_address,
+  s_phone,
+  s_comment
+from
+  part,
+  supplier,
+  partsupp,
+  nation,
+  region
+where
+  p_partkey = ps_partkey
+  and s_suppkey = ps_suppkey
+  and p_size = 15
+  and p_type like '%BRASS'
+  and s_nationkey = n_nationkey
+  and n_regionkey = r_regionkey
+  and r_name = 'EUROPE'
+  and ps_supplycost = (
+    select
+      min(ps_supplycost)
+    from
+      tpch.partsupp,
+      tpch.supplier,
+      tpch.nation,
+      tpch.region
+    where
+      p_partkey = ps_partkey
+      and s_suppkey = ps_suppkey
+      and s_nationkey = n_nationkey
+      and n_regionkey = r_regionkey
+      and r_name = 'EUROPE'
+    )
+order by
+  s_acctbal desc,
+  n_name,
+  s_name,
+  p_partkey
+limit 100
+---- PLAN
+18:TOP-N [LIMIT=100]
+|  order by: s_acctbal DESC, n_name ASC, s_name ASC, p_partkey ASC
+|
+17:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: min(ps_supplycost) = tpch.partsupp.ps_supplycost, ps_partkey = tpch.part.p_partkey
+|  runtime filters: RF001 <- tpch.part.p_partkey
+|
+|--16:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.nation.n_regionkey = tpch.region.r_regionkey
+|  |  runtime filters: RF005 <- tpch.region.r_regionkey
+|  |
+|  |--04:SCAN HDFS [tpch.region]
+|  |     partitions=1/1 files=1 size=384B
+|  |     predicates: tpch.region.r_name = 'EUROPE'
+|  |
+|  15:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  |  runtime filters: RF006 <- tpch.nation.n_nationkey
+|  |
+|  |--03:SCAN HDFS [tpch.nation]
+|  |     partitions=1/1 files=1 size=2.15KB
+|  |     runtime filters: RF005 -> tpch.nation.n_regionkey
+|  |
+|  14:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.partsupp.ps_suppkey = tpch.supplier.s_suppkey
+|  |  runtime filters: RF007 <- tpch.supplier.s_suppkey
+|  |
+|  |--01:SCAN HDFS [tpch.supplier]
+|  |     partitions=1/1 files=1 size=1.33MB
+|  |     runtime filters: RF006 -> tpch.supplier.s_nationkey
+|  |
+|  13:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.partsupp.ps_partkey = tpch.part.p_partkey
+|  |  runtime filters: RF008 <- tpch.part.p_partkey
+|  |
+|  |--00:SCAN HDFS [tpch.part]
+|  |     partitions=1/1 files=1 size=22.83MB
+|  |     predicates: tpch.part.p_size = 15, tpch.part.p_type LIKE '%BRASS'
+|  |
+|  02:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|     runtime filters: RF007 -> tpch.partsupp.ps_suppkey, RF008 -> tpch.partsupp.ps_partkey
+|
+12:AGGREGATE [FINALIZE]
+|  output: min(ps_supplycost)
+|  group by: ps_partkey
+|
+11:HASH JOIN [INNER JOIN]
+|  hash predicates: n_regionkey = r_regionkey
+|  runtime filters: RF002 <- r_regionkey
+|
+|--08:SCAN HDFS [tpch.region]
+|     partitions=1/1 files=1 size=384B
+|     predicates: r_name = 'EUROPE'
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: s_nationkey = n_nationkey
+|  runtime filters: RF003 <- n_nationkey
+|
+|--07:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     runtime filters: RF002 -> n_regionkey
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: ps_suppkey = s_suppkey
+|  runtime filters: RF004 <- s_suppkey
+|
+|--06:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF003 -> s_nationkey
+|
+05:SCAN HDFS [tpch.partsupp]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF001 -> tpch.partsupp.ps_partkey, RF004 -> ps_suppkey
+====
+# TPCH-Q3
+# Q3 - Shipping Priority Query
+select
+  l_orderkey,
+  sum(l_extendedprice * (1 - l_discount)) as revenue,
+  o_orderdate,
+  o_shippriority
+from
+  customer,
+  orders,
+  lineitem
+where
+  c_mktsegment = 'BUILDING'
+  and c_custkey = o_custkey
+  and l_orderkey = o_orderkey
+  and o_orderdate < '1995-03-15'
+  and l_shipdate > '1995-03-15'
+group by
+  l_orderkey,
+  o_orderdate,
+  o_shippriority
+order by
+  revenue desc,
+  o_orderdate
+limit 10
+---- PLAN
+06:TOP-N [LIMIT=10]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC, o_orderdate ASC
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  group by: tpch.lineitem.l_orderkey, tpch.orders.o_orderdate, tpch.orders.o_shippriority
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF000 <- tpch.customer.c_custkey
+|
+|--00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: tpch.customer.c_mktsegment = 'BUILDING'
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF001 <- tpch.orders.o_orderkey
+|
+|--01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: tpch.orders.o_orderdate < '1995-03-15'
+|     runtime filters: RF000 -> tpch.orders.o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_shipdate > '1995-03-15'
+   runtime filters: RF001 -> tpch.lineitem.l_orderkey
+====
+# TPCH-Q4
+# Q4 - Order Priority Checking Query
+select
+  o_orderpriority,
+  count(*) as order_count
+from
+  orders
+where
+  o_orderdate >= '1993-07-01'
+  and o_orderdate < '1993-10-01'
+  and exists (
+    select
+      *
+    from
+      lineitem
+    where
+      l_orderkey = o_orderkey
+      and l_commitdate < l_receiptdate
+    )
+group by
+  o_orderpriority
+order by
+  o_orderpriority
+---- PLAN
+04:SORT
+|  order by: o_orderpriority ASC
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: tpch.orders.o_orderpriority
+|
+02:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF000 <- tpch.orders.o_orderkey
+|
+|--00:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: tpch.orders.o_orderdate >= '1993-07-01', tpch.orders.o_orderdate < '1993-10-01'
+|
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_commitdate < tpch.lineitem.l_receiptdate
+   runtime filters: RF000 -> tpch.lineitem.l_orderkey
+====
+# TPCH-Q5
+# Q5 - Local Supplier Volume Query
+select
+  n_name,
+  sum(l_extendedprice * (1 - l_discount)) as revenue
+from
+  customer,
+  orders,
+  lineitem,
+  supplier,
+  nation,
+  region
+where
+  c_custkey = o_custkey
+  and l_orderkey = o_orderkey
+  and l_suppkey = s_suppkey
+  and c_nationkey = s_nationkey
+  and s_nationkey = n_nationkey
+  and n_regionkey = r_regionkey
+  and r_name = 'ASIA'
+  and o_orderdate >= '1994-01-01'
+  and o_orderdate < '1995-01-01'
+group by
+  n_name
+order by
+  revenue desc
+---- PLAN
+12:SORT
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC
+|
+11:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  group by: tpch.nation.n_name
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.nation.n_regionkey = tpch.region.r_regionkey
+|  runtime filters: RF000 <- tpch.region.r_regionkey
+|
+|--05:SCAN HDFS [tpch.region]
+|     partitions=1/1 files=1 size=384B
+|     predicates: tpch.region.r_name = 'ASIA'
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF001 <- tpch.nation.n_nationkey
+|
+|--04:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     runtime filters: RF000 -> tpch.nation.n_regionkey
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_suppkey = tpch.supplier.s_suppkey, tpch.customer.c_nationkey = tpch.supplier.s_nationkey
+|  runtime filters: RF002 <- tpch.supplier.s_suppkey, RF003 <- tpch.supplier.s_nationkey
+|
+|--03:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF001 -> tpch.supplier.s_nationkey
+|
+07:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF004 <- tpch.customer.c_custkey
+|
+|--00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF001 -> tpch.customer.c_nationkey, RF003 -> tpch.customer.c_nationkey
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF005 <- tpch.orders.o_orderkey
+|
+|--01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: tpch.orders.o_orderdate >= '1994-01-01', tpch.orders.o_orderdate < '1995-01-01'
+|     runtime filters: RF004 -> tpch.orders.o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF002 -> tpch.lineitem.l_suppkey, RF005 -> tpch.lineitem.l_orderkey
+====
+# TPCH-Q6
+# Q6 - Forecasting Revenue Change Query
+select
+  sum(l_extendedprice * l_discount) as revenue
+from
+  lineitem
+where
+  l_shipdate >= '1994-01-01'
+  and l_shipdate < '1995-01-01'
+  and l_discount between 0.05 and 0.07
+  and l_quantity < 24
+---- PLAN
+01:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * tpch.lineitem.l_discount)
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_discount >= 0.05, tpch.lineitem.l_discount <= 0.07, tpch.lineitem.l_quantity < 24, tpch.lineitem.l_shipdate >= '1994-01-01', tpch.lineitem.l_shipdate < '1995-01-01'
+====
+# TPCH-Q7
+# Q7 - Volume Shipping Query
+select
+  supp_nation,
+  cust_nation,
+  l_year,
+  sum(volume) as revenue
+from (
+  select
+    n1.n_name as supp_nation,
+    n2.n_name as cust_nation,
+    year(l_shipdate) as l_year,
+    l_extendedprice * (1 - l_discount) as volume
+  from
+    supplier,
+    lineitem,
+    orders,
+    customer,
+    nation n1,
+    nation n2
+  where
+    s_suppkey = l_suppkey
+    and o_orderkey = l_orderkey
+    and c_custkey = o_custkey
+    and s_nationkey = n1.n_nationkey
+    and c_nationkey = n2.n_nationkey
+    and (
+      (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY')
+      or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE')
+    )
+    and l_shipdate between '1995-01-01' and '1996-12-31'
+  ) as shipping
+group by
+  supp_nation,
+  cust_nation,
+  l_year
+order by
+  supp_nation,
+  cust_nation,
+  l_year
+---- PLAN
+12:SORT
+|  order by: supp_nation ASC, cust_nation ASC, l_year ASC
+|
+11:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  group by: tpch.nation.n_name, tpch.nation.n_name, year(tpch.lineitem.l_shipdate)
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.customer.c_nationkey = tpch.nation.n_nationkey
+|  other predicates: ((tpch.nation.n_name = 'FRANCE' AND tpch.nation.n_name = 'GERMANY') OR (tpch.nation.n_name = 'GERMANY' AND tpch.nation.n_name = 'FRANCE'))
+|  runtime filters: RF000 <- tpch.nation.n_nationkey
+|
+|--05:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF001 <- tpch.nation.n_nationkey
+|
+|--04:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF002 <- tpch.customer.c_custkey
+|
+|--03:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> tpch.customer.c_nationkey
+|
+07:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_suppkey = tpch.supplier.s_suppkey
+|  runtime filters: RF003 <- tpch.supplier.s_suppkey
+|
+|--00:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF001 -> tpch.supplier.s_nationkey
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF004 <- tpch.orders.o_orderkey
+|
+|--02:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF002 -> tpch.orders.o_custkey
+|
+01:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_shipdate >= '1995-01-01', tpch.lineitem.l_shipdate <= '1996-12-31'
+   runtime filters: RF003 -> tpch.lineitem.l_suppkey, RF004 -> tpch.lineitem.l_orderkey
+====
+# TPCH-Q8
+# Q8 - National Market Share Query
+select
+  o_year,
+  sum(case
+    when nation = 'BRAZIL'
+    then volume
+    else 0
+  end) / sum(volume) as mkt_share
+from (
+  select
+    year(o_orderdate) as o_year,
+    l_extendedprice * (1 - l_discount) as volume,
+    n2.n_name as nation
+  from
+    part,
+    supplier,
+    lineitem,
+    orders,
+    customer,
+    nation n1,
+    nation n2,
+    region
+  where
+    p_partkey = l_partkey
+    and s_suppkey = l_suppkey
+    and l_orderkey = o_orderkey
+    and o_custkey = c_custkey
+    and c_nationkey = n1.n_nationkey
+    and n1.n_regionkey = r_regionkey
+    and r_name = 'AMERICA'
+    and s_nationkey = n2.n_nationkey
+    and o_orderdate between '1995-01-01' and '1996-12-31'
+    and p_type = 'ECONOMY ANODIZED STEEL'
+  ) as all_nations
+group by
+  o_year
+order by
+  o_year
+---- PLAN
+16:SORT
+|  order by: o_year ASC
+|
+15:AGGREGATE [FINALIZE]
+|  output: sum(CASE WHEN tpch.nation.n_name = 'BRAZIL' THEN tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount) ELSE 0 END), sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  group by: year(tpch.orders.o_orderdate)
+|
+14:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF000 <- tpch.nation.n_nationkey
+|
+|--06:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+13:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.nation.n_regionkey = tpch.region.r_regionkey
+|  runtime filters: RF001 <- tpch.region.r_regionkey
+|
+|--07:SCAN HDFS [tpch.region]
+|     partitions=1/1 files=1 size=384B
+|     predicates: tpch.region.r_name = 'AMERICA'
+|
+12:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.customer.c_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF002 <- tpch.nation.n_nationkey
+|
+|--05:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     runtime filters: RF001 -> tpch.nation.n_regionkey
+|
+11:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF003 <- tpch.customer.c_custkey
+|
+|--04:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF002 -> tpch.customer.c_nationkey
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_suppkey = tpch.supplier.s_suppkey
+|  runtime filters: RF004 <- tpch.supplier.s_suppkey
+|
+|--01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> tpch.supplier.s_nationkey
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF005 <- tpch.orders.o_orderkey
+|
+|--03:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: tpch.orders.o_orderdate >= '1995-01-01', tpch.orders.o_orderdate <= '1996-12-31'
+|     runtime filters: RF003 -> tpch.orders.o_custkey
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_partkey = tpch.part.p_partkey
+|  runtime filters: RF006 <- tpch.part.p_partkey
+|
+|--00:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: tpch.part.p_type = 'ECONOMY ANODIZED STEEL'
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF004 -> tpch.lineitem.l_suppkey, RF005 -> tpch.lineitem.l_orderkey, RF006 -> tpch.lineitem.l_partkey
+====
+# TPCH-Q9
+# Q9 - Product Type Profit Measure Query
+select
+  nation,
+  o_year,
+  sum(amount) as sum_profit
+from(
+  select
+    n_name as nation,
+    year(o_orderdate) as o_year,
+    l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
+  from
+    part,
+    supplier,
+    lineitem,
+    partsupp,
+    orders,
+    nation
+  where
+    s_suppkey = l_suppkey
+    and ps_suppkey = l_suppkey
+    and ps_partkey = l_partkey
+    and p_partkey = l_partkey
+    and o_orderkey = l_orderkey
+    and s_nationkey = n_nationkey
+    and p_name like '%green%'
+  ) as profit
+group by
+  nation,
+  o_year
+order by
+  nation,
+  o_year desc
+---- PLAN
+12:SORT
+|  order by: nation ASC, o_year DESC
+|
+11:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount) - tpch.partsupp.ps_supplycost * tpch.lineitem.l_quantity)
+|  group by: tpch.nation.n_name, year(tpch.orders.o_orderdate)
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF000 <- tpch.nation.n_nationkey
+|
+|--05:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_suppkey = tpch.partsupp.ps_suppkey, tpch.lineitem.l_partkey = tpch.partsupp.ps_partkey
+|  runtime filters: RF001 <- tpch.partsupp.ps_suppkey, RF002 <- tpch.partsupp.ps_partkey
+|
+|--03:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_suppkey = tpch.supplier.s_suppkey
+|  runtime filters: RF003 <- tpch.supplier.s_suppkey
+|
+|--01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> tpch.supplier.s_nationkey, RF001 -> tpch.supplier.s_suppkey
+|
+07:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF004 <- tpch.orders.o_orderkey
+|
+|--04:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_partkey = tpch.part.p_partkey
+|  runtime filters: RF005 <- tpch.part.p_partkey
+|
+|--00:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: tpch.part.p_name LIKE '%green%'
+|     runtime filters: RF002 -> tpch.part.p_partkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF001 -> tpch.lineitem.l_suppkey, RF002 -> tpch.lineitem.l_partkey, RF003 -> tpch.lineitem.l_suppkey, RF004 -> tpch.lineitem.l_orderkey, RF005 -> tpch.lineitem.l_partkey
+====
+# TPCH-Q10
+# Q10 - Returned Item Reporting Query
+# Converted select from multiple tables to joins
+select
+  c_custkey,
+  c_name,
+  sum(l_extendedprice * (1 - l_discount)) as revenue,
+  c_acctbal,
+  n_name,
+  c_address,
+  c_phone,
+  c_comment
+from
+  customer,
+  orders,
+  lineitem,
+  nation
+where
+  c_custkey = o_custkey
+  and l_orderkey = o_orderkey
+  and o_orderdate >= '1993-10-01'
+  and o_orderdate < '1994-01-01'
+  and l_returnflag = 'R'
+  and c_nationkey = n_nationkey
+group by
+  c_custkey,
+  c_name,
+  c_acctbal,
+  c_phone,
+  n_name,
+  c_address,
+  c_comment
+order by
+  revenue desc
+limit 20
+---- PLAN
+08:TOP-N [LIMIT=20]
+|  order by: sum(l_extendedprice * (1 - l_discount)) DESC
+|
+07:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  group by: tpch.customer.c_custkey, tpch.customer.c_name, tpch.customer.c_acctbal, tpch.customer.c_phone, tpch.nation.n_name, tpch.customer.c_address, tpch.customer.c_comment
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.customer.c_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF000 <- tpch.nation.n_nationkey
+|
+|--03:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF001 <- tpch.customer.c_custkey
+|
+|--00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> tpch.customer.c_nationkey
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF002 <- tpch.orders.o_orderkey
+|
+|--01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     predicates: tpch.orders.o_orderdate >= '1993-10-01', tpch.orders.o_orderdate < '1994-01-01'
+|     runtime filters: RF001 -> tpch.orders.o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_returnflag = 'R'
+   runtime filters: RF002 -> tpch.lineitem.l_orderkey
+====
+# TPCH-Q11
+# Q11 - Important Stock Identification Query
+# Modifications: query was rewritten to not have a subquery in the having clause
+select
+  *
+from (
+  select
+    ps_partkey,
+    sum(ps_supplycost * ps_availqty) as value
+  from
+    partsupp,
+    supplier,
+    nation
+  where
+    ps_suppkey = s_suppkey
+    and s_nationkey = n_nationkey
+    and n_name = 'GERMANY'
+  group by
+    ps_partkey
+) as inner_query
+where
+  value > (
+    select
+      sum(ps_supplycost * ps_availqty) * 0.0001
+    from
+      partsupp,
+      supplier,
+      nation
+    where
+      ps_suppkey = s_suppkey
+      and s_nationkey = n_nationkey
+      and n_name = 'GERMANY'
+  )
+order by
+  value desc
+---- PLAN
+13:SORT
+|  order by: value DESC
+|
+12:NESTED LOOP JOIN [INNER JOIN]
+|  predicates: sum(ps_supplycost * ps_availqty) > sum(ps_supplycost * ps_availqty) * 0.0001
+|
+|--11:AGGREGATE [FINALIZE]
+|  |  output: sum(tpch.partsupp.ps_supplycost * tpch.partsupp.ps_availqty)
+|  |
+|  10:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  |  runtime filters: RF002 <- tpch.nation.n_nationkey
+|  |
+|  |--08:SCAN HDFS [tpch.nation]
+|  |     partitions=1/1 files=1 size=2.15KB
+|  |     predicates: tpch.nation.n_name = 'GERMANY'
+|  |
+|  09:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.partsupp.ps_suppkey = tpch.supplier.s_suppkey
+|  |  runtime filters: RF003 <- tpch.supplier.s_suppkey
+|  |
+|  |--07:SCAN HDFS [tpch.supplier]
+|  |     partitions=1/1 files=1 size=1.33MB
+|  |     runtime filters: RF002 -> tpch.supplier.s_nationkey
+|  |
+|  06:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|     runtime filters: RF003 -> tpch.partsupp.ps_suppkey
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(tpch.partsupp.ps_supplycost * tpch.partsupp.ps_availqty)
+|  group by: tpch.partsupp.ps_partkey
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF000 <- tpch.nation.n_nationkey
+|
+|--02:SCAN HDFS [tpch.nation]
+|     partitions=1/1 files=1 size=2.15KB
+|     predicates: tpch.nation.n_name = 'GERMANY'
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.partsupp.ps_suppkey = tpch.supplier.s_suppkey
+|  runtime filters: RF001 <- tpch.supplier.s_suppkey
+|
+|--01:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF000 -> tpch.supplier.s_nationkey
+|
+00:SCAN HDFS [tpch.partsupp]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF001 -> tpch.partsupp.ps_suppkey
+====
+# TPCH-Q12
+# Q12 - Shipping Mode and Order Priority Query
+select
+  l_shipmode,
+  sum(case
+    when o_orderpriority = '1-URGENT'
+      or o_orderpriority = '2-HIGH'
+    then 1
+    else 0
+  end) as high_line_count,
+  sum(case
+    when o_orderpriority <> '1-URGENT'
+      and o_orderpriority <> '2-HIGH'
+    then 1
+    else 0
+  end) as low_line_count
+from
+  orders,
+  lineitem
+where
+  o_orderkey = l_orderkey
+  and l_shipmode in ('MAIL', 'SHIP')
+  and l_commitdate < l_receiptdate
+  and l_shipdate < l_commitdate
+  and l_receiptdate >= '1994-01-01'
+  and l_receiptdate < '1995-01-01'
+group by
+  l_shipmode
+order by
+  l_shipmode
+---- PLAN
+04:SORT
+|  order by: l_shipmode ASC
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum(CASE WHEN tpch.orders.o_orderpriority = '1-URGENT' OR tpch.orders.o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum(CASE WHEN tpch.orders.o_orderpriority != '1-URGENT' AND tpch.orders.o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END)
+|  group by: tpch.lineitem.l_shipmode
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_orderkey = tpch.lineitem.l_orderkey
+|  runtime filters: RF000 <- tpch.lineitem.l_orderkey
+|
+|--01:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: tpch.lineitem.l_shipmode IN ('MAIL', 'SHIP'), tpch.lineitem.l_commitdate < tpch.lineitem.l_receiptdate, tpch.lineitem.l_shipdate < tpch.lineitem.l_commitdate, tpch.lineitem.l_receiptdate >= '1994-01-01', tpch.lineitem.l_receiptdate < '1995-01-01'
+|
+00:SCAN HDFS [tpch.orders]
+   partitions=1/1 files=1 size=162.56MB
+   runtime filters: RF000 -> tpch.orders.o_orderkey
+====
+# TPCH-Q13
+# Q13 - Customer Distribution Query
+select
+  c_count,
+  count(*) as custdist
+from (
+  select
+    c_custkey,
+    count(o_orderkey) as c_count
+  from
+    customer left outer join tpch.orders on (
+      c_custkey = o_custkey
+      and o_comment not like '%special%requests%'
+    )
+  group by
+    c_custkey
+  ) as c_orders
+group by
+  c_count
+order by
+  custdist desc,
+  c_count desc
+---- PLAN
+05:SORT
+|  order by: count(*) DESC, c_count DESC
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: count(o_orderkey)
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(o_orderkey)
+|  group by: tpch.customer.c_custkey
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF000 <- tpch.customer.c_custkey
+|
+|--00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|
+01:SCAN HDFS [tpch.orders]
+   partitions=1/1 files=1 size=162.56MB
+   predicates: NOT o_comment LIKE '%special%requests%'
+   runtime filters: RF000 -> o_custkey
+====
+# TPCH-Q14
+# Q14 - Promotion Effect Query
+select
+  100.00 * sum(case
+    when p_type like 'PROMO%'
+    then l_extendedprice * (1 - l_discount)
+    else 0.0
+    end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
+from
+  lineitem,
+  part
+where
+  l_partkey = p_partkey
+  and l_shipdate >= '1995-09-01'
+  and l_shipdate < '1995-10-01'
+---- PLAN
+03:AGGREGATE [FINALIZE]
+|  output: sum(CASE WHEN tpch.part.p_type LIKE 'PROMO%' THEN tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount) ELSE 0.0 END), sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_partkey = tpch.part.p_partkey
+|  runtime filters: RF000 <- tpch.part.p_partkey
+|
+|--01:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_shipdate >= '1995-09-01', tpch.lineitem.l_shipdate < '1995-10-01'
+   runtime filters: RF000 -> tpch.lineitem.l_partkey
+====
+# TPCH-Q15
+# Q15 - Top Supplier Query
+with revenue_view as (
+  select
+    l_suppkey as supplier_no,
+    sum(l_extendedprice * (1 - l_discount)) as total_revenue
+  from
+    lineitem
+  where
+    l_shipdate >= '1996-01-01'
+    and l_shipdate < '1996-04-01'
+  group by
+    l_suppkey)
+select
+  s_suppkey,
+  s_name,
+  s_address,
+  s_phone,
+  total_revenue
+from
+  supplier,
+  revenue_view
+where
+  s_suppkey = supplier_no
+  and total_revenue = (
+    select
+      max(total_revenue)
+    from
+      revenue_view
+    )
+order by
+  s_suppkey
+---- PLAN
+08:SORT
+|  order by: s_suppkey ASC
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: sum(l_extendedprice * (1 - l_discount)) = max(total_revenue)
+|
+|--05:AGGREGATE [FINALIZE]
+|  |  output: max(sum(l_extendedprice * (1 - l_discount)))
+|  |
+|  04:AGGREGATE [FINALIZE]
+|  |  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  |  group by: tpch.lineitem.l_suppkey
+|  |
+|  03:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: tpch.lineitem.l_shipdate >= '1996-01-01', tpch.lineitem.l_shipdate < '1996-04-01'
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.supplier.s_suppkey = l_suppkey
+|  runtime filters: RF000 <- l_suppkey
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  |  group by: tpch.lineitem.l_suppkey
+|  |
+|  01:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     predicates: tpch.lineitem.l_shipdate >= '1996-01-01', tpch.lineitem.l_shipdate < '1996-04-01'
+|
+00:SCAN HDFS [tpch.supplier]
+   partitions=1/1 files=1 size=1.33MB
+   runtime filters: RF000 -> tpch.supplier.s_suppkey
+====
+# TPCH-Q16
+# Q16 - Parts/Supplier Relation Query
+select
+  p_brand,
+  p_type,
+  p_size,
+  count(distinct ps_suppkey) as supplier_cnt
+from
+  partsupp,
+  part
+where
+  p_partkey = ps_partkey
+  and p_brand <> 'Brand#45'
+  and p_type not like 'MEDIUM POLISHED%'
+  and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
+  and ps_suppkey not in (
+    select
+      s_suppkey
+    from
+      supplier
+    where
+      s_comment like '%Customer%Complaints%'
+  )
+group by
+  p_brand,
+  p_type,
+  p_size
+order by
+  supplier_cnt desc,
+  p_brand,
+  p_type,
+  p_size
+---- PLAN
+07:SORT
+|  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
+|
+06:AGGREGATE [FINALIZE]
+|  output: count(ps_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+05:AGGREGATE
+|  group by: tpch.part.p_brand, tpch.part.p_type, tpch.part.p_size, tpch.partsupp.ps_suppkey
+|
+04:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
+|  hash predicates: tpch.partsupp.ps_suppkey = tpch.supplier.s_suppkey
+|
+|--02:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     predicates: tpch.supplier.s_comment LIKE '%Customer%Complaints%'
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.partsupp.ps_partkey = tpch.part.p_partkey
+|  runtime filters: RF000 <- tpch.part.p_partkey
+|
+|--01:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|     predicates: tpch.part.p_size IN (49, 14, 23, 45, 19, 3, 36, 9), tpch.part.p_brand != 'Brand#45', NOT tpch.part.p_type LIKE 'MEDIUM POLISHED%'
+|
+00:SCAN HDFS [tpch.partsupp]
+   partitions=1/1 files=1 size=112.71MB
+   runtime filters: RF000 -> tpch.partsupp.ps_partkey
+====
+# TPCH-Q17
+# Q17 - Small-Quantity-Order Revenue Query
+select
+  sum(l_extendedprice) / 7.0 as avg_yearly
+from
+  lineitem,
+  part
+where
+  p_partkey = l_partkey
+  and p_brand = 'Brand#23'
+  and p_container = 'MED BOX'
+  and l_quantity < (
+    select
+      0.2 * avg(l_quantity)
+    from
+      lineitem
+    where
+      l_partkey = p_partkey
+  )
+---- PLAN
+06:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice)
+|
+05:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: l_partkey = tpch.part.p_partkey
+|  other join predicates: tpch.lineitem.l_quantity < 0.2 * avg(l_quantity)
+|  runtime filters: RF000 <- tpch.part.p_partkey
+|
+|--04:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.lineitem.l_partkey = tpch.part.p_partkey
+|  |  runtime filters: RF001 <- tpch.part.p_partkey
+|  |
+|  |--01:SCAN HDFS [tpch.part]
+|  |     partitions=1/1 files=1 size=22.83MB
+|  |     predicates: tpch.part.p_container = 'MED BOX', tpch.part.p_brand = 'Brand#23'
+|  |
+|  00:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     runtime filters: RF001 -> tpch.lineitem.l_partkey
+|
+03:AGGREGATE [FINALIZE]
+|  output: avg(tpch.lineitem.l_quantity)
+|  group by: tpch.lineitem.l_partkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> tpch.lineitem.l_partkey
+====
+# TPCH-Q18
+# Q18 - Large Value Customer Query
+select
+  c_name,
+  c_custkey,
+  o_orderkey,
+  o_orderdate,
+  o_totalprice,
+  sum(l_quantity)
+from
+  customer,
+  orders,
+  lineitem
+where
+  o_orderkey in (
+    select
+      l_orderkey
+    from
+      lineitem
+    group by
+      l_orderkey
+    having
+      sum(l_quantity) > 300
+    )
+  and c_custkey = o_custkey
+  and o_orderkey = l_orderkey
+group by
+  c_name,
+  c_custkey,
+  o_orderkey,
+  o_orderdate,
+  o_totalprice
+order by
+  o_totalprice desc,
+  o_orderdate
+limit 100
+---- PLAN
+09:TOP-N [LIMIT=100]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|
+08:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_quantity)
+|  group by: tpch.customer.c_name, tpch.customer.c_custkey, tpch.orders.o_orderkey, tpch.orders.o_orderdate, tpch.orders.o_totalprice
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: tpch.orders.o_orderkey = l_orderkey
+|  runtime filters: RF000 <- l_orderkey
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum(tpch.lineitem.l_quantity)
+|  |  group by: tpch.lineitem.l_orderkey
+|  |  having: sum(l_quantity) > 300
+|  |
+|  03:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|  runtime filters: RF001 <- tpch.customer.c_custkey
+|
+|--00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  runtime filters: RF002 <- tpch.orders.o_orderkey
+|
+|--01:SCAN HDFS [tpch.orders]
+|     partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF000 -> tpch.orders.o_orderkey, RF001 -> tpch.orders.o_custkey
+|
+02:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> tpch.lineitem.l_orderkey, RF002 -> tpch.lineitem.l_orderkey
+====
+# TPCH-Q19
+# Q19 - Discounted Revenue Query
+select
+  sum(l_extendedprice * (1 - l_discount)) as revenue
+from
+  lineitem,
+  part
+where
+  p_partkey = l_partkey
+  and (
+    (
+      p_brand = 'Brand#12'
+      and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
+      and l_quantity >= 1 and l_quantity <= 11
+      and p_size between 1 and 5
+      and l_shipmode in ('AIR', 'AIR REG')
+      and l_shipinstruct = 'DELIVER IN PERSON'
+    )
+    or
+    (
+      p_brand = 'Brand#23'
+      and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
+      and l_quantity >= 10 and l_quantity <= 20
+      and p_size between 1 and 10
+      and l_shipmode in ('AIR', 'AIR REG')
+      and l_shipinstruct = 'DELIVER IN PERSON'
+    )
+    or
+    (
+      p_brand = 'Brand#34'
+      and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
+      and l_quantity >= 20 and l_quantity <= 30
+      and p_size between 1 and 15
+      and l_shipmode in ('AIR', 'AIR REG')
+      and l_shipinstruct = 'DELIVER IN PERSON'
+    )
+  )
+---- PLAN
+03:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: tpch.lineitem.l_partkey = tpch.part.p_partkey
+|  other predicates: ((tpch.part.p_brand = 'Brand#12' AND tpch.part.p_container IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') AND tpch.lineitem.l_quantity >= 1 AND tpch.lineitem.l_quantity <= 11 AND tpch.part.p_size BETWEEN 1 AND 5 AND tpch.lineitem.l_shipmode IN ('AIR', 'AIR REG') AND tpch.lineitem.l_shipinstruct = 'DELIVER IN PERSON') OR (tpch.part.p_brand = 'Brand#23' AND tpch.part.p_container IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') AND tpch.lineitem.l_quantity >= 10 AND tpch.lineitem.l_quantity <= 20 AND tpch.part.p_size BETWEEN 1 AND 10 AND tpch.lineitem.l_shipmode IN ('AIR', 'AIR REG') AND tpch.lineitem.l_shipinstruct = 'DELIVER IN PERSON') OR (tpch.part.p_brand = 'Brand#34' AND tpch.part.p_container IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') AND tpch.lineitem.l_quantity >= 20 AND tpch.lineitem.l_quantity <= 30 AND tpch.part.p_size BETWEEN 1 AND 15 AND tpch.lineitem.l_shipmode IN ('AIR', 'AIR REG') AND tpch.lineitem.l_shipinstruct = 'DELIVER IN PERSON'))
+|  runtime filters: RF000 <- tpch.part.p_partkey
+|
+|--01:SCAN HDFS [tpch.part]
+|     partitions=1/1 files=1 size=22.83MB
+|
+00:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000 -> tpch.lineitem.l_partkey
+====
+# TPCH-Q20
+# Q20 - Potential Part Promotion Query
+select
+  s_name,
+  s_address
+from
+  supplier,
+  nation
+where
+  s_suppkey in (
+    select
+      ps_suppkey
+    from
+      partsupp
+    where
+      ps_partkey in (
+        select
+          p_partkey
+        from
+          part
+        where
+          p_name like 'forest%'
+        )
+      and ps_availqty > (
+        select
+          0.5 * sum(l_quantity)
+        from
+          lineitem
+        where
+          l_partkey = ps_partkey
+          and l_suppkey = ps_suppkey
+          and l_shipdate >= '1994-01-01'
+          and l_shipdate < '1995-01-01'
+        )
+    )
+  and s_nationkey = n_nationkey
+  and n_name = 'CANADA'
+order by
+  s_name
+---- PLAN
+10:SORT
+|  order by: s_name ASC
+|
+09:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: tpch.partsupp.ps_suppkey = tpch.supplier.s_suppkey
+|  runtime filters: RF000 <- tpch.supplier.s_suppkey
+|
+|--08:HASH JOIN [INNER JOIN]
+|  |  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  |  runtime filters: RF004 <- tpch.nation.n_nationkey
+|  |
+|  |--01:SCAN HDFS [tpch.nation]
+|  |     partitions=1/1 files=1 size=2.15KB
+|  |     predicates: tpch.nation.n_name = 'CANADA'
+|  |
+|  00:SCAN HDFS [tpch.supplier]
+|     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF004 -> tpch.supplier.s_nationkey
+|
+07:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: l_suppkey = tpch.partsupp.ps_suppkey, l_partkey = tpch.partsupp.ps_partkey
+|  other join predicates: tpch.partsupp.ps_availqty > 0.5 * sum(l_quantity)
+|  runtime filters: RF001 <- tpch.partsupp.ps_suppkey, RF002 <- tpch.partsupp.ps_partkey
+|
+|--06:HASH JOIN [LEFT SEMI JOIN]
+|  |  hash predicates: tpch.partsupp.ps_partkey = tpch.part.p_partkey
+|  |  runtime filters: RF003 <- tpch.part.p_partkey
+|  |
+|  |--03:SCAN HDFS [tpch.part]
+|  |     partitions=1/1 files=1 size=22.83MB
+|  |     predicates: tpch.part.p_name LIKE 'forest%'
+|  |
+|  02:SCAN HDFS [tpch.partsupp]
+|     partitions=1/1 files=1 size=112.71MB
+|     runtime filters: RF000 -> tpch.partsupp.ps_suppkey, RF003 -> tpch.partsupp.ps_partkey
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(tpch.lineitem.l_quantity)
+|  group by: tpch.lineitem.l_partkey, tpch.lineitem.l_suppkey
+|
+04:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_shipdate >= '1994-01-01', tpch.lineitem.l_shipdate < '1995-01-01'
+   runtime filters: RF000 -> tpch.lineitem.l_suppkey, RF001 -> tpch.lineitem.l_suppkey, RF002 -> tpch.lineitem.l_partkey
+====
+# TPCH-Q21
+# Q21 - Suppliers Who Kept Orders Waiting Query
+select
+  s_name,
+  count(*) as numwait
+from
+  supplier,
+  lineitem l1,
+  orders,
+  tpch.nation
+where
+  s_suppkey = l1.l_suppkey
+  and o_orderkey = l1.l_orderkey
+  and o_orderstatus = 'F'
+  and l1.l_receiptdate > l1.l_commitdate
+  and exists (
+    select
+      *
+    from
+      lineitem l2
+    where
+      l2.l_orderkey = l1.l_orderkey
+      and l2.l_suppkey <> l1.l_suppkey
+  )
+  and not exists (
+    select
+      *
+    from
+      lineitem l3
+    where
+      l3.l_orderkey = l1.l_orderkey
+      and l3.l_suppkey <> l1.l_suppkey
+      and l3.l_receiptdate > l3.l_commitdate
+  )
+  and s_nationkey = n_nationkey
+  and n_name = 'SAUDI ARABIA'
+group by
+  s_name
+order by
+  numwait desc,
+  s_name
+limit 100
+---- PLAN
+12:TOP-N [LIMIT=100]
+|  order by: count(*) DESC, s_name ASC
+|
+11:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: tpch.supplier.s_name
+|
+10:HASH JOIN [RIGHT ANTI JOIN]
+|  hash predicates: tpch.lineitem.l_orderkey = tpch.lineitem.l_orderkey
+|  other join predicates: tpch.lineitem.l_suppkey != tpch.lineitem.l_suppkey
+|
+|--09:HASH JOIN [RIGHT SEMI JOIN]
+|  |  hash predicates: tpch.lineitem.l_orderkey = tpch.lineitem.l_orderkey
+|  |  other join predicates: tpch.lineitem.l_suppkey != tpch.lineitem.l_suppkey
+|  |  runtime filters: RF000 <- tpch.lineitem.l_orderkey
+|  |
+|  |--08:HASH JOIN [INNER JOIN]
+|  |  |  hash predicates: tpch.supplier.s_nationkey = n_nationkey
+|  |  |  runtime filters: RF001 <- n_nationkey
+|  |  |
+|  |  |--03:SCAN HDFS [tpch.nation]
+|  |  |     partitions=1/1 files=1 size=2.15KB
+|  |  |     predicates: n_name = 'SAUDI ARABIA'
+|  |  |
+|  |  07:HASH JOIN [INNER JOIN]
+|  |  |  hash predicates: tpch.lineitem.l_suppkey = tpch.supplier.s_suppkey
+|  |  |  runtime filters: RF002 <- tpch.supplier.s_suppkey
+|  |  |
+|  |  |--00:SCAN HDFS [tpch.supplier]
+|  |  |     partitions=1/1 files=1 size=1.33MB
+|  |  |     runtime filters: RF001 -> tpch.supplier.s_nationkey
+|  |  |
+|  |  06:HASH JOIN [INNER JOIN]
+|  |  |  hash predicates: tpch.lineitem.l_orderkey = tpch.orders.o_orderkey
+|  |  |  runtime filters: RF003 <- tpch.orders.o_orderkey
+|  |  |
+|  |  |--02:SCAN HDFS [tpch.orders]
+|  |  |     partitions=1/1 files=1 size=162.56MB
+|  |  |     predicates: tpch.orders.o_orderstatus = 'F'
+|  |  |
+|  |  01:SCAN HDFS [tpch.lineitem]
+|  |     partitions=1/1 files=1 size=718.94MB
+|  |     predicates: tpch.lineitem.l_receiptdate > tpch.lineitem.l_commitdate
+|  |     runtime filters: RF002 -> tpch.lineitem.l_suppkey, RF003 -> tpch.lineitem.l_orderkey
+|  |
+|  04:SCAN HDFS [tpch.lineitem]
+|     partitions=1/1 files=1 size=718.94MB
+|     runtime filters: RF000 -> tpch.lineitem.l_orderkey
+|
+05:SCAN HDFS [tpch.lineitem]
+   partitions=1/1 files=1 size=718.94MB
+   predicates: tpch.lineitem.l_receiptdate > tpch.lineitem.l_commitdate
+====
+# TPCH-Q22
+# Q22 - Global Sales Opportunity Query
+select
+  cntrycode,
+  count(*) as numcust,
+  sum(c_acctbal) as totacctbal
+from (
+  select
+    substr(c_phone, 1, 2) as cntrycode,
+    c_acctbal
+  from
+    customer
+  where
+    substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')
+    and c_acctbal > (
+      select
+        avg(c_acctbal)
+      from
+        customer
+      where
+        c_acctbal > 0.00
+        and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')
+      )
+    and not exists (
+      select
+        *
+      from
+        orders
+      where
+        o_custkey = c_custkey
+    )
+  ) as custsale
+group by
+  cntrycode
+order by
+  cntrycode
+---- PLAN
+07:SORT
+|  order by: cntrycode ASC
+|
+06:AGGREGATE [FINALIZE]
+|  output: count(*), sum(tpch.customer.c_acctbal)
+|  group by: substr(tpch.customer.c_phone, 1, 2)
+|
+05:HASH JOIN [RIGHT ANTI JOIN]
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
+|
+|--04:NESTED LOOP JOIN [INNER JOIN]
+|  |  predicates: tpch.customer.c_acctbal > avg(c_acctbal)
+|  |
+|  |--02:AGGREGATE [FINALIZE]
+|  |  |  output: avg(tpch.customer.c_acctbal)
+|  |  |
+|  |  01:SCAN HDFS [tpch.customer]
+|  |     partitions=1/1 files=1 size=23.08MB
+|  |     predicates: tpch.customer.c_acctbal > 0.00, substr(tpch.customer.c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|  |
+|  00:SCAN HDFS [tpch.customer]
+|     partitions=1/1 files=1 size=23.08MB
+|     predicates: substr(tpch.customer.c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|
+03:SCAN HDFS [tpch.orders]
+   partitions=1/1 files=1 size=162.56MB
+====



[2/2] incubator-impala git commit: IMPALA-3940: Fix getting column stats through views.

Posted by ab...@apache.org.
IMPALA-3940: Fix getting column stats through views.

The bug: During join ordering we rely on the column stats of
join predicates for estimating the join cardinality. We have code
that tries to find the stats of a column through views but there
was a bug in identifying slots that belong to base table scans.
The bug led us to incorrectly accept slots of view references
which do not have stats.

This patch fixes the above issue and adds new test infrastructure
for creating test-local views. It adds a TPCH-equivalent database that
contains views of the form "select * from tpch_basetbl" for all TPCH
tables, and adds tests for the plans of all TPCH queries on the view database.

Change-Id: Ie3b62a5e7e7d0e84850749108c13991647cedce6
Reviewed-on: http://gerrit.cloudera.org:8080/3865
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/286da592
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/286da592
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/286da592

Branch: refs/heads/master
Commit: 286da59219f322ce99563537214e3bb30c0fa8c5
Parents: 88b89b8
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Aug 8 10:27:09 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Aug 11 08:22:30 2016 +0000

----------------------------------------------------------------------
 .../impala/analysis/SlotDescriptor.java         |    2 +-
 .../java/com/cloudera/impala/catalog/View.java  |   15 +-
 .../impala/analysis/AnalyzeAuthStmtsTest.java   |    3 +-
 .../impala/analysis/AnalyzeDDLTest.java         |    3 +-
 .../cloudera/impala/analysis/AnalyzerTest.java  |  334 +---
 .../com/cloudera/impala/analysis/ToSqlTest.java |    3 +-
 .../impala/common/FrontendTestBase.java         |  316 ++++
 .../cloudera/impala/planner/PlannerTest.java    |   16 +-
 .../impala/planner/PlannerTestBase.java         |   10 +-
 .../queries/PlannerTest/joins.test              |   50 +-
 .../queries/PlannerTest/tpcds-all.test          |  335 ++--
 .../queries/PlannerTest/tpch-all.test           |  160 +-
 .../queries/PlannerTest/tpch-views.test         | 1473 ++++++++++++++++++
 13 files changed, 2145 insertions(+), 575 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/main/java/com/cloudera/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/SlotDescriptor.java b/fe/src/main/java/com/cloudera/impala/analysis/SlotDescriptor.java
index dea7789..7850a0e 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/SlotDescriptor.java
@@ -141,7 +141,7 @@ public class SlotDescriptor {
 
   public Path getPath() { return path_; }
 
-  public boolean isScanSlot() { return path_ != null; }
+  public boolean isScanSlot() { return path_ != null && path_.isRootedAtTable(); }
 
   public Column getColumn() { return !isScanSlot() ? null : path_.destColumn(); }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/main/java/com/cloudera/impala/catalog/View.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/View.java b/fe/src/main/java/com/cloudera/impala/catalog/View.java
index 24f79b7..f062172 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/View.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/View.java
@@ -18,7 +18,6 @@
 package com.cloudera.impala.catalog;
 
 import java.io.StringReader;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -89,6 +88,16 @@ public class View extends Table {
     colLabels_ = colLabels;
   }
 
+  /**
+   * Creates a view for testing purposes.
+   */
+  private View(Db db, String name, QueryStmt queryStmt) {
+    super(null, null, db, name, null);
+    isLocalView_ = false;
+    queryStmt_ = queryStmt;
+    colLabels_ = null;
+  }
+
   @Override
   public void load(boolean reuseMetadata, HiveMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
@@ -191,4 +200,8 @@ public class View extends Table {
     view.setTable_type(TTableType.VIEW);
     return view;
   }
+
+  public static View createTestView(Db db, String name, QueryStmt viewDefStmt) {
+    return new View(db, name, viewDefStmt);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeAuthStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeAuthStmtsTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeAuthStmtsTest.java
index 2f54608..ec12ab8 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeAuthStmtsTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeAuthStmtsTest.java
@@ -30,8 +30,7 @@ import com.cloudera.impala.thrift.TQueryCtx;
 
 public class AnalyzeAuthStmtsTest extends AnalyzerTest {
   public AnalyzeAuthStmtsTest() throws AnalysisException {
-    analyzer_ = createAnalyzer(Catalog.DEFAULT_DB);
-    analyzer_.getCatalog().getAuthPolicy().addRole(
+    catalog_.getAuthPolicy().addRole(
         new Role("myRole", new HashSet<String>()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java
index f79f246..e89a7e0 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java
@@ -46,6 +46,7 @@ import com.cloudera.impala.catalog.StructType;
 import com.cloudera.impala.catalog.Type;
 import com.cloudera.impala.common.AnalysisException;
 import com.cloudera.impala.common.FileSystemUtil;
+import com.cloudera.impala.common.FrontendTestBase;
 import com.cloudera.impala.common.RuntimeEnv;
 import com.cloudera.impala.testutil.TestUtils;
 import com.cloudera.impala.util.MetaStoreUtil;
@@ -53,7 +54,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class AnalyzeDDLTest extends AnalyzerTest {
+public class AnalyzeDDLTest extends FrontendTestBase {
 
   @Test
   public void TestAlterTableAddDropPartition() throws CatalogException {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
index 8b65c7c..815279c 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
@@ -17,55 +17,27 @@
 
 package com.cloudera.impala.analysis;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.StringReader;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.cloudera.impala.authorization.AuthorizationConfig;
-import com.cloudera.impala.catalog.AggregateFunction;
-import com.cloudera.impala.catalog.Catalog;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.Db;
 import com.cloudera.impala.catalog.Function;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.ImpaladCatalog;
 import com.cloudera.impala.catalog.PrimitiveType;
-import com.cloudera.impala.catalog.ScalarFunction;
 import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.catalog.Table;
 import com.cloudera.impala.catalog.Type;
 import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.testutil.ImpaladTestCatalog;
-import com.cloudera.impala.testutil.TestUtils;
+import com.cloudera.impala.common.FrontendTestBase;
 import com.cloudera.impala.thrift.TExpr;
-import com.cloudera.impala.thrift.TFunctionBinaryType;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
-public class AnalyzerTest {
+public class AnalyzerTest extends FrontendTestBase {
   protected final static Logger LOG = LoggerFactory.getLogger(AnalyzerTest.class);
-  protected static ImpaladCatalog catalog_ = new ImpaladTestCatalog();
-
-  // Test-local list of test databases and tables. These are cleaned up in @After.
-  protected final List<Db> testDbs_ = Lists.newArrayList();
-  protected final List<Table> testTables_ = Lists.newArrayList();
-
-  protected Analyzer analyzer_;
 
   // maps from type to string that will result in literal of that type
   protected static Map<ScalarType, String> typeToLiteralValue_ =
@@ -86,117 +58,6 @@ public class AnalyzerTest {
     typeToLiteralValue_.put(Type.NULL, "NULL");
   }
 
-  protected Analyzer createAnalyzer(String defaultDb) {
-    TQueryCtx queryCtx =
-        TestUtils.createQueryContext(defaultDb, System.getProperty("user.name"));
-    return new Analyzer(catalog_, queryCtx,
-        AuthorizationConfig.createAuthDisabledConfig());
-  }
-
-  protected Analyzer createAnalyzer(TQueryOptions queryOptions) {
-    TQueryCtx queryCtx = TestUtils.createQueryContext();
-    queryCtx.request.query_options = queryOptions;
-    return new Analyzer(catalog_, queryCtx,
-        AuthorizationConfig.createAuthDisabledConfig());
-  }
-
-  protected Analyzer createAnalyzerUsingHiveColLabels() {
-    Analyzer analyzer = createAnalyzer(Catalog.DEFAULT_DB);
-    analyzer.setUseHiveColLabels(true);
-    return analyzer;
-  }
-
-  // Adds a Udf: default.name(args) to the catalog.
-  // TODO: we could consider having this be the sql to run instead but that requires
-  // connecting to the BE.
-  protected Function addTestFunction(String name,
-      ArrayList<ScalarType> args, boolean varArgs) {
-    return addTestFunction("default", name, args, varArgs);
-  }
-
-  protected Function addTestFunction(String name,
-      ScalarType arg, boolean varArgs) {
-    return addTestFunction("default", name, Lists.newArrayList(arg), varArgs);
-  }
-
-  protected Function addTestFunction(String db, String fnName,
-      ArrayList<ScalarType> args, boolean varArgs) {
-    ArrayList<Type> argTypes = Lists.newArrayList();
-    argTypes.addAll(args);
-    Function fn = ScalarFunction.createForTesting(
-        db, fnName, argTypes, Type.INT, "/Foo", "Foo.class", null,
-        null, TFunctionBinaryType.NATIVE);
-    fn.setHasVarArgs(varArgs);
-    catalog_.addFunction(fn);
-    return fn;
-  }
-
-  protected void addTestUda(String name, Type retType, Type... argTypes) {
-    FunctionName fnName = new FunctionName("default", name);
-    catalog_.addFunction(
-        AggregateFunction.createForTesting(
-            fnName, Lists.newArrayList(argTypes), retType, retType,
-            null, "init_fn_symbol", "update_fn_symbol", null, null,
-            null, null, null, TFunctionBinaryType.NATIVE));
-  }
-
-  /**
-   * Add a new dummy database with the given name to the catalog.
-   * Returns the new dummy database.
-   * The database is registered in testDbs_ and removed in the @After method.
-   */
-  protected Db addTestDb(String dbName, String comment) {
-    Db db = catalog_.getDb(dbName);
-    Preconditions.checkState(db == null, "Test db must not already exist.");
-    db = new Db(dbName, catalog_, new org.apache.hadoop.hive.metastore.api.Database(
-        dbName, comment, "", Collections.<String, String>emptyMap()));
-    catalog_.addDb(db);
-    testDbs_.add(db);
-    return db;
-  }
-
-  protected void clearTestDbs() {
-    for (Db testDb: testDbs_) {
-      catalog_.removeDb(testDb.getName());
-    }
-  }
-
-  /**
-   * Add a new dummy table to the catalog based on the given CREATE TABLE sql.
-   * The dummy table only has the column definitions and no other metadata.
-   * Returns the new dummy table.
-   * The test tables are registered in testTables_ and removed in the @After method.
-   */
-  protected Table addTestTable(String createTableSql) {
-    CreateTableStmt createTableStmt = (CreateTableStmt) AnalyzesOk(createTableSql);
-    // Currently does not support partitioned tables.
-    Preconditions.checkState(createTableStmt.getPartitionColumnDefs().isEmpty());
-    Db db = catalog_.getDb(createTableStmt.getDb());
-    Preconditions.checkNotNull(db, "Test tables must be created in an existing db.");
-    HdfsTable dummyTable = new HdfsTable(null, null, db, createTableStmt.getTbl(),
-        createTableStmt.getOwner());
-    List<ColumnDef> columnDefs = createTableStmt.getColumnDefs();
-    for (int i = 0; i < columnDefs.size(); ++i) {
-      ColumnDef colDef = columnDefs.get(i);
-      dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i));
-    }
-    db.addTable(dummyTable);
-    testTables_.add(dummyTable);
-    return dummyTable;
-  }
-
-  protected void clearTestTables() {
-    for (Table testTable: testTables_) {
-      testTable.getDb().removeTable(testTable.getName());
-    }
-  }
-
-  @After
-  public void tearDown() {
-    clearTestTables();
-    clearTestDbs();
-  }
-
   /**
    * Check whether SelectStmt components can be converted to thrift.
    */
@@ -239,123 +100,6 @@ public class AnalyzerTest {
   }
 
   /**
-   * Parse 'stmt' and return the root ParseNode.
-   */
-  public ParseNode ParsesOk(String stmt) {
-    SqlScanner input = new SqlScanner(new StringReader(stmt));
-    SqlParser parser = new SqlParser(input);
-    ParseNode node = null;
-    try {
-      node = (ParseNode) parser.parse().value;
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail("\nParser error:\n" + parser.getErrorMsg(stmt));
-    }
-    assertNotNull(node);
-    return node;
-  }
-
-  /**
-   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
-   */
-  public ParseNode AnalyzesOk(String stmt) {
-    return AnalyzesOk(stmt, createAnalyzer(Catalog.DEFAULT_DB), null);
-  }
-
-  /**
-   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
-   * If 'expectedWarning' is not null, asserts that a warning is produced.
-   */
-  public ParseNode AnalyzesOk(String stmt, String expectedWarning) {
-    return AnalyzesOk(stmt, createAnalyzer(Catalog.DEFAULT_DB), expectedWarning);
-  }
-
-  /**
-   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
-   * If 'expectedWarning' is not null, asserts that a warning is produced.
-   */
-  public ParseNode AnalyzesOk(String stmt, Analyzer analyzer, String expectedWarning) {
-    try {
-      analyzer_ = analyzer;
-      AnalysisContext analysisCtx = new AnalysisContext(catalog_,
-          TestUtils.createQueryContext(Catalog.DEFAULT_DB,
-              System.getProperty("user.name")),
-              AuthorizationConfig.createAuthDisabledConfig());
-      analysisCtx.analyze(stmt, analyzer);
-      AnalysisContext.AnalysisResult analysisResult = analysisCtx.getAnalysisResult();
-      if (expectedWarning != null) {
-        List<String> actualWarnings = analysisResult.getAnalyzer().getWarnings();
-        boolean matchedWarning = false;
-        for (String actualWarning: actualWarnings) {
-          if (actualWarning.startsWith(expectedWarning)) {
-            matchedWarning = true;
-            break;
-          }
-        }
-        if (!matchedWarning) {
-          fail(String.format("Did not produce expected warning.\n" +
-              "Expected warning:\n%s.\nActual warnings:\n%s",
-              expectedWarning, Joiner.on("\n").join(actualWarnings)));
-        }
-      }
-      Preconditions.checkNotNull(analysisResult.getStmt());
-      return analysisResult.getStmt();
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail("Error:\n" + e.toString());
-    }
-    return null;
-  }
-
-  /**
-   * Asserts if stmt passes analysis.
-   */
-  public void AnalysisError(String stmt) {
-    AnalysisError(stmt, null);
-  }
-
-  /**
-   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
-   */
-  public ParseNode AnalyzesOk(String stmt, Analyzer analyzer) {
-    return AnalyzesOk(stmt, analyzer, null);
-  }
-
-  /**
-   * Asserts if stmt passes analysis or the error string doesn't match and it
-   * is non-null.
-   */
-  public void AnalysisError(String stmt, String expectedErrorString) {
-    AnalysisError(stmt, createAnalyzer(Catalog.DEFAULT_DB), expectedErrorString);
-  }
-
-  /**
-   * Asserts if stmt passes analysis or the error string doesn't match and it
-   * is non-null.
-   */
-  public void AnalysisError(String stmt, Analyzer analyzer, String expectedErrorString) {
-    Preconditions.checkNotNull(expectedErrorString, "No expected error message given.");
-    LOG.info("processing " + stmt);
-    try {
-      AnalysisContext analysisCtx = new AnalysisContext(catalog_,
-          TestUtils.createQueryContext(Catalog.DEFAULT_DB,
-              System.getProperty("user.name")),
-              AuthorizationConfig.createAuthDisabledConfig());
-      analysisCtx.analyze(stmt, analyzer);
-      AnalysisContext.AnalysisResult analysisResult = analysisCtx.getAnalysisResult();
-      Preconditions.checkNotNull(analysisResult.getStmt());
-    } catch (Exception e) {
-      String errorString = e.getMessage();
-      Preconditions.checkNotNull(errorString, "Stack trace lost during exception.");
-      Assert.assertTrue(
-          "got error:\n" + errorString + "\nexpected:\n" + expectedErrorString,
-          errorString.startsWith(expectedErrorString));
-      return;
-    }
-    fail("Stmt didn't result in analysis error: " + stmt);
-  }
-
-  /**
    * Generates and analyzes two variants of the given query by replacing all occurrences
    * of "$TBL" in the query string with the unqualified and fully-qualified version of
    * the given table name. The unqualified variant is analyzed using an analyzer that has
@@ -426,28 +170,30 @@ public class AnalyzerTest {
   }
 
   private void testSelectStar() throws AnalysisException {
-    AnalyzesOk("select * from functional.AllTypes");
-    DescriptorTable descTbl = analyzer_.getDescTbl();
+    SelectStmt stmt = (SelectStmt) AnalyzesOk("select * from functional.AllTypes");
+    Analyzer analyzer = stmt.getAnalyzer();
+    DescriptorTable descTbl = analyzer.getDescTbl();
     TupleDescriptor tupleD = descTbl.getTupleDesc(new TupleId(0));
     for (SlotDescriptor slotD: tupleD.getSlots()) {
       slotD.setIsMaterialized(true);
     }
     descTbl.computeMemLayout();
     Assert.assertEquals(97.0f, tupleD.getAvgSerializedSize(), 0.0);
-    checkLayoutParams("functional.alltypes.bool_col", 1, 2, 0, 0);
-    checkLayoutParams("functional.alltypes.tinyint_col", 1, 3, 0, 1);
-    checkLayoutParams("functional.alltypes.smallint_col", 2, 4, 0, 2);
-    checkLayoutParams("functional.alltypes.id", 4, 8, 0, 3);
-    checkLayoutParams("functional.alltypes.int_col", 4, 12, 0, 4);
-    checkLayoutParams("functional.alltypes.float_col", 4, 16, 0, 5);
-    checkLayoutParams("functional.alltypes.year", 4, 20, 0, 6);
-    checkLayoutParams("functional.alltypes.month", 4, 24, 0, 7);
-    checkLayoutParams("functional.alltypes.bigint_col", 8, 32, 1, 0);
-    checkLayoutParams("functional.alltypes.double_col", 8, 40, 1, 1);
+    checkLayoutParams("functional.alltypes.bool_col", 1, 2, 0, 0, analyzer);
+    checkLayoutParams("functional.alltypes.tinyint_col", 1, 3, 0, 1, analyzer);
+    checkLayoutParams("functional.alltypes.smallint_col", 2, 4, 0, 2, analyzer);
+    checkLayoutParams("functional.alltypes.id", 4, 8, 0, 3, analyzer);
+    checkLayoutParams("functional.alltypes.int_col", 4, 12, 0, 4, analyzer);
+    checkLayoutParams("functional.alltypes.float_col", 4, 16, 0, 5, analyzer);
+    checkLayoutParams("functional.alltypes.year", 4, 20, 0, 6, analyzer);
+    checkLayoutParams("functional.alltypes.month", 4, 24, 0, 7, analyzer);
+    checkLayoutParams("functional.alltypes.bigint_col", 8, 32, 1, 0, analyzer);
+    checkLayoutParams("functional.alltypes.double_col", 8, 40, 1, 1, analyzer);
     int strSlotSize = PrimitiveType.STRING.getSlotSize();
-    checkLayoutParams("functional.alltypes.date_string_col", strSlotSize, 48, 1, 2);
+    checkLayoutParams("functional.alltypes.date_string_col",
+        strSlotSize, 48, 1, 2, analyzer);
     checkLayoutParams("functional.alltypes.string_col",
-        strSlotSize, 48 + strSlotSize, 1, 3);
+        strSlotSize, 48 + strSlotSize, 1, 3, analyzer);
   }
 
   private void testNonNullable() throws AnalysisException {
@@ -455,8 +201,9 @@ public class AnalyzerTest {
     // (byte range : data)
     // 0 - 7: count(int_col)
     // 8 - 15: count(*)
-    AnalyzesOk("select count(int_col), count(*) from functional.AllTypes");
-    DescriptorTable descTbl = analyzer_.getDescTbl();
+    SelectStmt stmt = (SelectStmt) AnalyzesOk(
+        "select count(int_col), count(*) from functional.AllTypes");
+    DescriptorTable descTbl = stmt.getAnalyzer().getDescTbl();
     TupleDescriptor aggDesc = descTbl.getTupleDesc(new TupleId(1));
     for (SlotDescriptor slotD: aggDesc.getSlots()) {
       slotD.setIsMaterialized(true);
@@ -475,8 +222,9 @@ public class AnalyzerTest {
     // 1 - 7: padded bytes
     // 8 - 15: sum(int_col)
     // 16 - 23: count(*)
-    AnalyzesOk("select sum(int_col), count(*) from functional.AllTypes");
-    DescriptorTable descTbl = analyzer_.getDescTbl();
+    SelectStmt stmt = (SelectStmt) AnalyzesOk(
+        "select sum(int_col), count(*) from functional.AllTypes");
+    DescriptorTable descTbl = stmt.getAnalyzer().getDescTbl();
     TupleDescriptor aggDesc = descTbl.getTupleDesc(new TupleId(1));
     for (SlotDescriptor slotD: aggDesc.getSlots()) {
       slotD.setIsMaterialized(true);
@@ -492,8 +240,9 @@ public class AnalyzerTest {
    * Tests that computeMemLayout() ignores non-materialized slots.
    */
   private void testNonMaterializedSlots() throws AnalysisException {
-    AnalyzesOk("select * from functional.alltypes");
-    DescriptorTable descTbl = analyzer_.getDescTbl();
+    SelectStmt stmt = (SelectStmt) AnalyzesOk("select * from functional.alltypes");
+    Analyzer analyzer = stmt.getAnalyzer();
+    DescriptorTable descTbl = analyzer.getDescTbl();
     TupleDescriptor tupleD = descTbl.getTupleDesc(new TupleId(0));
     ArrayList<SlotDescriptor> slots = tupleD.getSlots();
     for (SlotDescriptor slotD: slots) {
@@ -507,20 +256,21 @@ public class AnalyzerTest {
     descTbl.computeMemLayout();
     Assert.assertEquals(68.0f, tupleD.getAvgSerializedSize(), 0.0);
     // Check non-materialized slots.
-    checkLayoutParams("functional.alltypes.id", 0, -1, 0, 0);
-    checkLayoutParams("functional.alltypes.double_col", 0, -1, 0, 0);
-    checkLayoutParams("functional.alltypes.string_col", 0, -1, 0, 0);
+    checkLayoutParams("functional.alltypes.id", 0, -1, 0, 0, analyzer);
+    checkLayoutParams("functional.alltypes.double_col", 0, -1, 0, 0, analyzer);
+    checkLayoutParams("functional.alltypes.string_col", 0, -1, 0, 0, analyzer);
     // Check materialized slots.
-    checkLayoutParams("functional.alltypes.bool_col", 1, 2, 0, 0);
-    checkLayoutParams("functional.alltypes.tinyint_col", 1, 3, 0, 1);
-    checkLayoutParams("functional.alltypes.smallint_col", 2, 4, 0, 2);
-    checkLayoutParams("functional.alltypes.int_col", 4, 8, 0, 3);
-    checkLayoutParams("functional.alltypes.float_col", 4, 12, 0, 4);
-    checkLayoutParams("functional.alltypes.year", 4, 16, 0, 5);
-    checkLayoutParams("functional.alltypes.month", 4, 20, 0, 6);
-    checkLayoutParams("functional.alltypes.bigint_col", 8, 24, 0, 7);
+    checkLayoutParams("functional.alltypes.bool_col", 1, 2, 0, 0, analyzer);
+    checkLayoutParams("functional.alltypes.tinyint_col", 1, 3, 0, 1, analyzer);
+    checkLayoutParams("functional.alltypes.smallint_col", 2, 4, 0, 2, analyzer);
+    checkLayoutParams("functional.alltypes.int_col", 4, 8, 0, 3, analyzer);
+    checkLayoutParams("functional.alltypes.float_col", 4, 12, 0, 4, analyzer);
+    checkLayoutParams("functional.alltypes.year", 4, 16, 0, 5, analyzer);
+    checkLayoutParams("functional.alltypes.month", 4, 20, 0, 6, analyzer);
+    checkLayoutParams("functional.alltypes.bigint_col", 8, 24, 0, 7, analyzer);
     int strSlotSize = PrimitiveType.STRING.getSlotSize();
-    checkLayoutParams("functional.alltypes.date_string_col", strSlotSize, 32, 1, 0);
+    checkLayoutParams("functional.alltypes.date_string_col",
+        strSlotSize, 32, 1, 0, analyzer);
   }
 
   private void checkLayoutParams(SlotDescriptor d, int byteSize, int byteOffset,
@@ -532,8 +282,8 @@ public class AnalyzerTest {
   }
 
   private void checkLayoutParams(String colAlias, int byteSize, int byteOffset,
-      int nullIndicatorByte, int nullIndicatorBit) {
-    SlotDescriptor d = analyzer_.getSlotDescriptor(colAlias);
+      int nullIndicatorByte, int nullIndicatorBit, Analyzer analyzer) {
+    SlotDescriptor d = analyzer.getSlotDescriptor(colAlias);
     checkLayoutParams(d, byteSize, byteOffset, nullIndicatorByte, nullIndicatorBit);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java b/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
index c264041..6b502a1 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
@@ -24,12 +24,13 @@ import org.junit.Test;
 
 import com.cloudera.impala.authorization.AuthorizationConfig;
 import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FrontendTestBase;
 import com.cloudera.impala.testutil.TestUtils;
 import com.google.common.base.Preconditions;
 
 // TODO: Expand this test, in particular, because view creation relies
 // on producing correct SQL.
-public class ToSqlTest extends AnalyzerTest {
+public class ToSqlTest extends FrontendTestBase {
 
   // Helpers for templated join tests.
   private static final String[] joinConditions_ =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/common/FrontendTestBase.java b/fe/src/test/java/com/cloudera/impala/common/FrontendTestBase.java
new file mode 100644
index 0000000..2d54182
--- /dev/null
+++ b/fe/src/test/java/com/cloudera/impala/common/FrontendTestBase.java
@@ -0,0 +1,316 @@
+// Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.common;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+
+import com.cloudera.impala.analysis.AnalysisContext;
+import com.cloudera.impala.analysis.Analyzer;
+import com.cloudera.impala.analysis.ColumnDef;
+import com.cloudera.impala.analysis.CreateTableStmt;
+import com.cloudera.impala.analysis.CreateViewStmt;
+import com.cloudera.impala.analysis.FunctionName;
+import com.cloudera.impala.analysis.ParseNode;
+import com.cloudera.impala.analysis.QueryStmt;
+import com.cloudera.impala.analysis.SqlParser;
+import com.cloudera.impala.analysis.SqlScanner;
+import com.cloudera.impala.authorization.AuthorizationConfig;
+import com.cloudera.impala.catalog.AggregateFunction;
+import com.cloudera.impala.catalog.Catalog;
+import com.cloudera.impala.catalog.Column;
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.Function;
+import com.cloudera.impala.catalog.HdfsTable;
+import com.cloudera.impala.catalog.ImpaladCatalog;
+import com.cloudera.impala.catalog.ScalarFunction;
+import com.cloudera.impala.catalog.ScalarType;
+import com.cloudera.impala.catalog.Table;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.catalog.View;
+import com.cloudera.impala.service.Frontend;
+import com.cloudera.impala.testutil.ImpaladTestCatalog;
+import com.cloudera.impala.testutil.TestUtils;
+import com.cloudera.impala.thrift.TFunctionBinaryType;
+import com.cloudera.impala.thrift.TQueryCtx;
+import com.cloudera.impala.thrift.TQueryOptions;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for most frontend tests. Contains common functions for unit testing
+ * various components, e.g., ParsesOk(), ParserError(), AnalyzesOk(), AnalysisError(),
+ * as well as helper functions for creating test-local tables/views and UDF/UDAs.
+ */
+public class FrontendTestBase {
+  protected static ImpaladCatalog catalog_ = new ImpaladTestCatalog();
+  protected static Frontend frontend_ = new Frontend(
+      AuthorizationConfig.createAuthDisabledConfig(), catalog_);
+
+  // Test-local list of test databases and tables. These are cleaned up in @After.
+  protected final List<Db> testDbs_ = Lists.newArrayList();
+  protected final List<Table> testTables_ = Lists.newArrayList();
+
+  protected Analyzer createAnalyzer(String defaultDb) {
+    TQueryCtx queryCtx =
+        TestUtils.createQueryContext(defaultDb, System.getProperty("user.name"));
+    return new Analyzer(catalog_, queryCtx,
+        AuthorizationConfig.createAuthDisabledConfig());
+  }
+
+  protected Analyzer createAnalyzer(TQueryOptions queryOptions) {
+    TQueryCtx queryCtx = TestUtils.createQueryContext();
+    queryCtx.request.query_options = queryOptions;
+    return new Analyzer(catalog_, queryCtx,
+        AuthorizationConfig.createAuthDisabledConfig());
+  }
+
+  protected Analyzer createAnalyzerUsingHiveColLabels() {
+    Analyzer analyzer = createAnalyzer(Catalog.DEFAULT_DB);
+    analyzer.setUseHiveColLabels(true);
+    return analyzer;
+  }
+
+  // Adds a Udf: default.name(args) to the catalog.
+  // TODO: we could consider having this be the sql to run instead but that requires
+  // connecting to the BE.
+  protected Function addTestFunction(String name,
+      ArrayList<ScalarType> args, boolean varArgs) {
+    return addTestFunction("default", name, args, varArgs);
+  }
+
+  protected Function addTestFunction(String name,
+      ScalarType arg, boolean varArgs) {
+    return addTestFunction("default", name, Lists.newArrayList(arg), varArgs);
+  }
+
+  protected Function addTestFunction(String db, String fnName,
+      ArrayList<ScalarType> args, boolean varArgs) {
+    ArrayList<Type> argTypes = Lists.newArrayList();
+    argTypes.addAll(args);
+    Function fn = ScalarFunction.createForTesting(
+        db, fnName, argTypes, Type.INT, "/Foo", "Foo.class", null,
+        null, TFunctionBinaryType.NATIVE);
+    fn.setHasVarArgs(varArgs);
+    catalog_.addFunction(fn);
+    return fn;
+  }
+
+  protected void addTestUda(String name, Type retType, Type... argTypes) {
+    FunctionName fnName = new FunctionName("default", name);
+    catalog_.addFunction(
+        AggregateFunction.createForTesting(
+            fnName, Lists.newArrayList(argTypes), retType, retType,
+            null, "init_fn_symbol", "update_fn_symbol", null, null,
+            null, null, null, TFunctionBinaryType.NATIVE));
+  }
+
+  /**
+   * Add a new dummy database with the given name to the catalog.
+   * Returns the new dummy database.
+   * The database is registered in testDbs_ and removed in the @After method.
+   */
+  protected Db addTestDb(String dbName, String comment) {
+    Db db = catalog_.getDb(dbName);
+    Preconditions.checkState(db == null, "Test db must not already exist.");
+    db = new Db(dbName, catalog_, new org.apache.hadoop.hive.metastore.api.Database(
+        dbName, comment, "", Collections.<String, String>emptyMap()));
+    catalog_.addDb(db);
+    testDbs_.add(db);
+    return db;
+  }
+
+  protected void clearTestDbs() {
+    for (Db testDb: testDbs_) {
+      catalog_.removeDb(testDb.getName());
+    }
+  }
+
+  /**
+   * Add a new dummy table to the catalog based on the given CREATE TABLE sql.
+   * The dummy table only has the column definitions and no other metadata.
+   * Returns the new dummy table.
+   * The test tables are registered in testTables_ and removed in the @After method.
+   */
+  protected Table addTestTable(String createTableSql) {
+    CreateTableStmt createTableStmt = (CreateTableStmt) AnalyzesOk(createTableSql);
+    // Currently does not support partitioned tables.
+    Preconditions.checkState(createTableStmt.getPartitionColumnDefs().isEmpty());
+    Db db = catalog_.getDb(createTableStmt.getDb());
+    Preconditions.checkNotNull(db, "Test tables must be created in an existing db.");
+    HdfsTable dummyTable = new HdfsTable(null, null, db, createTableStmt.getTbl(),
+        createTableStmt.getOwner());
+    List<ColumnDef> columnDefs = createTableStmt.getColumnDefs();
+    for (int i = 0; i < columnDefs.size(); ++i) {
+      ColumnDef colDef = columnDefs.get(i);
+      dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i));
+    }
+    db.addTable(dummyTable);
+    testTables_.add(dummyTable);
+    return dummyTable;
+  }
+
+  /**
+   * Adds a test-local view to the catalog based on the given CREATE VIEW sql.
+   * The test views are registered in testTables_ and removed in the @After method.
+   * Returns the new view.
+   */
+  protected Table addTestView(String createViewSql) {
+    CreateViewStmt createViewStmt = (CreateViewStmt) AnalyzesOk(createViewSql);
+    Db db = catalog_.getDb(createViewStmt.getDb());
+    Preconditions.checkNotNull(db, "Test views must be created in an existing db.");
+    QueryStmt viewStmt = (QueryStmt) AnalyzesOk(createViewStmt.getInlineViewDef());
+    View dummyView = View.createTestView(db, createViewStmt.getTbl(), viewStmt);
+    db.addTable(dummyView);
+    testTables_.add(dummyView);
+    return dummyView;
+  }
+
+  protected void clearTestTables() {
+    for (Table testTable: testTables_) {
+      testTable.getDb().removeTable(testTable.getName());
+    }
+  }
+
+  @After
+  public void tearDown() {
+    clearTestTables();
+    clearTestDbs();
+  }
+
+  /**
+   * Parse 'stmt' and return the root ParseNode.
+   */
+  public ParseNode ParsesOk(String stmt) {
+    SqlScanner input = new SqlScanner(new StringReader(stmt));
+    SqlParser parser = new SqlParser(input);
+    ParseNode node = null;
+    try {
+      node = (ParseNode) parser.parse().value;
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("\nParser error:\n" + parser.getErrorMsg(stmt));
+    }
+    assertNotNull(node);
+    return node;
+  }
+
+  /**
+   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
+   */
+  public ParseNode AnalyzesOk(String stmt) {
+    return AnalyzesOk(stmt, createAnalyzer(Catalog.DEFAULT_DB), null);
+  }
+
+  /**
+   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
+   * If 'expectedWarning' is not null, asserts that a warning is produced.
+   */
+  public ParseNode AnalyzesOk(String stmt, String expectedWarning) {
+    return AnalyzesOk(stmt, createAnalyzer(Catalog.DEFAULT_DB), expectedWarning);
+  }
+
+  /**
+   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
+   * If 'expectedWarning' is not null, asserts that a warning is produced.
+   */
+  public ParseNode AnalyzesOk(String stmt, Analyzer analyzer, String expectedWarning) {
+    try {
+      AnalysisContext analysisCtx = new AnalysisContext(catalog_,
+          TestUtils.createQueryContext(Catalog.DEFAULT_DB,
+              System.getProperty("user.name")),
+              AuthorizationConfig.createAuthDisabledConfig());
+      analysisCtx.analyze(stmt, analyzer);
+      AnalysisContext.AnalysisResult analysisResult = analysisCtx.getAnalysisResult();
+      if (expectedWarning != null) {
+        List<String> actualWarnings = analysisResult.getAnalyzer().getWarnings();
+        boolean matchedWarning = false;
+        for (String actualWarning: actualWarnings) {
+          if (actualWarning.startsWith(expectedWarning)) {
+            matchedWarning = true;
+            break;
+          }
+        }
+        if (!matchedWarning) {
+          fail(String.format("Did not produce expected warning.\n" +
+              "Expected warning:\n%s.\nActual warnings:\n%s",
+              expectedWarning, Joiner.on("\n").join(actualWarnings)));
+        }
+      }
+      Preconditions.checkNotNull(analysisResult.getStmt());
+      return analysisResult.getStmt();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Error:\n" + e.toString());
+    }
+    return null;
+  }
+
+  /**
+   * Asserts if stmt passes analysis.
+   */
+  public void AnalysisError(String stmt) {
+    AnalysisError(stmt, null);
+  }
+
+  /**
+   * Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
+   */
+  public ParseNode AnalyzesOk(String stmt, Analyzer analyzer) {
+    return AnalyzesOk(stmt, analyzer, null);
+  }
+
+  /**
+   * Asserts that 'stmt' fails analysis and that the resulting error message
+   * starts with 'expectedErrorString', which must be non-null.
+   */
+  public void AnalysisError(String stmt, String expectedErrorString) {
+    AnalysisError(stmt, createAnalyzer(Catalog.DEFAULT_DB), expectedErrorString);
+  }
+
+  /**
+   * Asserts that 'stmt' fails analysis under 'analyzer' and that the error
+   * message starts with 'expectedErrorString', which must be non-null.
+   */
+  public void AnalysisError(String stmt, Analyzer analyzer, String expectedErrorString) {
+    Preconditions.checkNotNull(expectedErrorString, "No expected error message given.");
+    try {
+      AnalysisContext analysisCtx = new AnalysisContext(catalog_,
+          TestUtils.createQueryContext(Catalog.DEFAULT_DB,
+              System.getProperty("user.name")),
+              AuthorizationConfig.createAuthDisabledConfig());
+      analysisCtx.analyze(stmt, analyzer);
+      AnalysisContext.AnalysisResult analysisResult = analysisCtx.getAnalysisResult();
+      Preconditions.checkNotNull(analysisResult.getStmt());
+    } catch (Exception e) {
+      String errorString = e.getMessage();
+      Preconditions.checkNotNull(errorString, "Stack trace lost during exception.");
+      Assert.assertTrue(
+          "got error:\n" + errorString + "\nexpected:\n" + expectedErrorString,
+          errorString.startsWith(expectedErrorString));
+      return;
+    }
+    fail("Stmt didn't result in analysis error: " + stmt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java
index 153ba97..7472da0 100644
--- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java
@@ -19,6 +19,7 @@ package com.cloudera.impala.planner;
 
 import org.junit.Test;
 
+import com.cloudera.impala.catalog.Db;
 import com.cloudera.impala.thrift.TQueryOptions;
 import com.cloudera.impala.thrift.TRuntimeFilterMode;
 
@@ -174,7 +175,20 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testTpch() {
-    runPlannerTestFile("tpch-all");
+    runPlannerTestFile("tpch-all", "tpch");
+  }
+
+  @Test
+  public void testTpchViews() {
+    // Re-create TPCH with views on the base tables. Used for testing
+    // that plan generation works as expected through views.
+    addTestDb("tpch_views", "Test DB for TPCH with views.");
+    Db tpchDb = catalog_.getDb("tpch");
+    for (String tblName: tpchDb.getAllTableNames()) {
+      addTestView(String.format(
+          "create view tpch_views.%s as select * from tpch.%s", tblName, tblName));
+    }
+    runPlannerTestFile("tpch-views", "tpch_views");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
index 04b5b5f..328d151 100644
--- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
@@ -39,13 +39,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.impala.analysis.ColumnLineageGraph;
-import com.cloudera.impala.authorization.AuthorizationConfig;
 import com.cloudera.impala.catalog.CatalogException;
+import com.cloudera.impala.common.FrontendTestBase;
 import com.cloudera.impala.common.ImpalaException;
 import com.cloudera.impala.common.NotImplementedException;
 import com.cloudera.impala.common.RuntimeEnv;
-import com.cloudera.impala.service.Frontend;
-import com.cloudera.impala.testutil.ImpaladTestCatalog;
 import com.cloudera.impala.testutil.TestFileParser;
 import com.cloudera.impala.testutil.TestFileParser.Section;
 import com.cloudera.impala.testutil.TestFileParser.TestCase;
@@ -60,8 +58,8 @@ import com.cloudera.impala.thrift.THdfsPartition;
 import com.cloudera.impala.thrift.THdfsPartitionLocation;
 import com.cloudera.impala.thrift.THdfsScanNode;
 import com.cloudera.impala.thrift.THdfsTable;
-import com.cloudera.impala.thrift.TLineageGraph;
 import com.cloudera.impala.thrift.TKuduKeyRange;
+import com.cloudera.impala.thrift.TLineageGraph;
 import com.cloudera.impala.thrift.TNetworkAddress;
 import com.cloudera.impala.thrift.TPlanFragment;
 import com.cloudera.impala.thrift.TPlanNode;
@@ -79,11 +77,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-public class PlannerTestBase {
+public class PlannerTestBase extends FrontendTestBase {
   private final static Logger LOG = LoggerFactory.getLogger(PlannerTest.class);
   private final static boolean GENERATE_OUTPUT_FILE = true;
-  private static Frontend frontend_ = new Frontend(
-      AuthorizationConfig.createAuthDisabledConfig(), new ImpaladTestCatalog());
   private final String testDir_ = "functional-planner/queries/PlannerTest";
   private final String outDir_ = "/tmp/PlannerTest/";
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 5d0e892..f14363f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -727,8 +727,15 @@ inner join [shuffle]
 on (b.int_col = c.int_col and c.bool_col = b.bool_col)
 ---- PLAN
 05:HASH JOIN [INNER JOIN]
-|  hash predicates: b.int_col = int_col, b.bool_col = bool_col
-|  runtime filters: RF000 <- int_col, RF001 <- bool_col
+|  hash predicates: a.int_col = b.int_col, a.bool_col = b.bool_col
+|  runtime filters: RF000 <- b.int_col, RF001 <- b.bool_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = int_col, a.bool_col = bool_col
+|  runtime filters: RF002 <- int_col, RF003 <- bool_col
 |
 |--03:AGGREGATE [FINALIZE]
 |  |  output: count(*)
@@ -736,14 +743,7 @@ on (b.int_col = c.int_col and c.bool_col = b.bool_col)
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
-|
-04:HASH JOIN [INNER JOIN]
-|  hash predicates: a.int_col = b.int_col, a.bool_col = b.bool_col
-|  runtime filters: RF002 <- b.int_col, RF003 <- b.bool_col
-|
-|--01:SCAN HDFS [functional.alltypes b]
-|     partitions=24/24 files=24 size=478.45KB
-|     runtime filters: RF000 -> b.int_col, RF001 -> b.bool_col
+|     runtime filters: RF000 -> functional.alltypes.int_col, RF001 -> functional.alltypes.bool_col
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
@@ -752,14 +752,23 @@ on (b.int_col = c.int_col and c.bool_col = b.bool_col)
 10:EXCHANGE [UNPARTITIONED]
 |
 05:HASH JOIN [INNER JOIN, PARTITIONED]
-|  hash predicates: b.int_col = int_col, b.bool_col = bool_col
-|  runtime filters: RF000 <- int_col, RF001 <- bool_col
+|  hash predicates: a.int_col = b.int_col, a.bool_col = b.bool_col
+|  runtime filters: RF000 <- b.int_col, RF001 <- b.bool_col
 |
-|--09:AGGREGATE [FINALIZE]
+|--09:EXCHANGE [HASH(b.int_col,b.bool_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.int_col = int_col, a.bool_col = bool_col
+|  runtime filters: RF002 <- int_col, RF003 <- bool_col
+|
+|--07:AGGREGATE [FINALIZE]
 |  |  output: count:merge(*)
 |  |  group by: int_col, bool_col
 |  |
-|  08:EXCHANGE [HASH(int_col,bool_col)]
+|  06:EXCHANGE [HASH(int_col,bool_col)]
 |  |
 |  03:AGGREGATE [STREAMING]
 |  |  output: count(*)
@@ -767,18 +776,9 @@ on (b.int_col = c.int_col and c.bool_col = b.bool_col)
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> functional.alltypes.int_col, RF001 -> functional.alltypes.bool_col
 |
-04:HASH JOIN [INNER JOIN, PARTITIONED]
-|  hash predicates: a.int_col = b.int_col, a.bool_col = b.bool_col
-|  runtime filters: RF002 <- b.int_col, RF003 <- b.bool_col
-|
-|--07:EXCHANGE [HASH(b.int_col,b.bool_col)]
-|  |
-|  01:SCAN HDFS [functional.alltypes b]
-|     partitions=24/24 files=24 size=478.45KB
-|     runtime filters: RF000 -> b.int_col, RF001 -> b.bool_col
-|
-06:EXCHANGE [HASH(a.int_col,a.bool_col)]
+08:EXCHANGE [HASH(a.int_col,a.bool_col)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
index 22032ab..6baefb1 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
@@ -2512,20 +2512,20 @@ limit 100
 |  runtime filters: RF000 <- d_week_seq - 52, RF001 <- s_store_id
 |
 |--15:HASH JOIN [INNER JOIN]
+|  |  hash predicates: ss_store_sk = s_store_sk
+|  |  runtime filters: RF005 <- s_store_sk
+|  |
+|  |--12:SCAN HDFS [tpcds.store]
+|  |     partitions=1/1 files=1 size=3.08KB
+|  |
+|  14:HASH JOIN [INNER JOIN]
 |  |  hash predicates: d_week_seq = d.d_week_seq
-|  |  runtime filters: RF005 <- d.d_week_seq
+|  |  runtime filters: RF006 <- d.d_week_seq
 |  |
 |  |--13:SCAN HDFS [tpcds.date_dim d]
 |  |     partitions=1/1 files=1 size=9.84MB
 |  |     predicates: d_month_seq >= 1185 + 12, d_month_seq <= 1185 + 23
 |  |
-|  14:HASH JOIN [INNER JOIN]
-|  |  hash predicates: ss_store_sk = s_store_sk
-|  |  runtime filters: RF006 <- s_store_sk
-|  |
-|  |--12:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
-|  |
 |  11:AGGREGATE [FINALIZE]
 |  |  output: sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
 |  |  group by: d_week_seq, ss_store_sk
@@ -2536,29 +2536,29 @@ limit 100
 |  |
 |  |--09:SCAN HDFS [tpcds.date_dim]
 |  |     partitions=1/1 files=1 size=9.84MB
-|  |     runtime filters: RF005 -> tpcds.date_dim.d_week_seq
+|  |     runtime filters: RF006 -> tpcds.date_dim.d_week_seq
 |  |
 |  08:SCAN HDFS [tpcds.store_sales]
 |     partitions=120/120 files=120 size=21.31MB
-|     runtime filters: RF006 -> tpcds.store_sales.ss_store_sk, RF007 -> ss_sold_date_sk
+|     runtime filters: RF005 -> tpcds.store_sales.ss_store_sk, RF007 -> ss_sold_date_sk
 |
 07:HASH JOIN [INNER JOIN]
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF002 <- s_store_sk
+|
+|--04:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     runtime filters: RF001 -> s_store_id
+|
+06:HASH JOIN [INNER JOIN]
 |  hash predicates: d_week_seq = d.d_week_seq
-|  runtime filters: RF002 <- d.d_week_seq
+|  runtime filters: RF003 <- d.d_week_seq
 |
 |--05:SCAN HDFS [tpcds.date_dim d]
 |     partitions=1/1 files=1 size=9.84MB
 |     predicates: d_month_seq >= 1185, d_month_seq <= 1185 + 11
 |     runtime filters: RF000 -> d.d_week_seq
 |
-06:HASH JOIN [INNER JOIN]
-|  hash predicates: ss_store_sk = s_store_sk
-|  runtime filters: RF003 <- s_store_sk
-|
-|--04:SCAN HDFS [tpcds.store]
-|     partitions=1/1 files=1 size=3.08KB
-|     runtime filters: RF001 -> s_store_id
-|
 03:AGGREGATE [FINALIZE]
 |  output: sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
 |  group by: d_week_seq, ss_store_sk
@@ -2569,11 +2569,11 @@ limit 100
 |
 |--01:SCAN HDFS [tpcds.date_dim]
 |     partitions=1/1 files=1 size=9.84MB
-|     runtime filters: RF000 -> tpcds.date_dim.d_week_seq, RF002 -> tpcds.date_dim.d_week_seq
+|     runtime filters: RF000 -> tpcds.date_dim.d_week_seq, RF003 -> tpcds.date_dim.d_week_seq
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=120/120 files=120 size=21.31MB
-   runtime filters: RF003 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
+   runtime filters: RF002 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
 ---- DISTRIBUTEDPLAN
 32:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: s_store_name1 ASC, s_store_id1 ASC, d_week_seq1 ASC
@@ -2589,23 +2589,23 @@ limit 100
 |--31:EXCHANGE [HASH(d_week_seq - 52,s_store_id)]
 |  |
 |  15:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: d_week_seq = d.d_week_seq
-|  |  runtime filters: RF005 <- d.d_week_seq
+|  |  hash predicates: ss_store_sk = s_store_sk
+|  |  runtime filters: RF005 <- s_store_sk
 |  |
 |  |--29:EXCHANGE [BROADCAST]
 |  |  |
-|  |  13:SCAN HDFS [tpcds.date_dim d]
-|  |     partitions=1/1 files=1 size=9.84MB
-|  |     predicates: d_month_seq >= 1185 + 12, d_month_seq <= 1185 + 23
+|  |  12:SCAN HDFS [tpcds.store]
+|  |     partitions=1/1 files=1 size=3.08KB
 |  |
 |  14:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: ss_store_sk = s_store_sk
-|  |  runtime filters: RF006 <- s_store_sk
+|  |  hash predicates: d_week_seq = d.d_week_seq
+|  |  runtime filters: RF006 <- d.d_week_seq
 |  |
 |  |--28:EXCHANGE [BROADCAST]
 |  |  |
-|  |  12:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
+|  |  13:SCAN HDFS [tpcds.date_dim d]
+|  |     partitions=1/1 files=1 size=9.84MB
+|  |     predicates: d_month_seq >= 1185 + 12, d_month_seq <= 1185 + 23
 |  |
 |  27:AGGREGATE [FINALIZE]
 |  |  output: sum:merge(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
@@ -2625,36 +2625,36 @@ limit 100
 |  |  |
 |  |  09:SCAN HDFS [tpcds.date_dim]
 |  |     partitions=1/1 files=1 size=9.84MB
-|  |     runtime filters: RF005 -> tpcds.date_dim.d_week_seq
+|  |     runtime filters: RF006 -> tpcds.date_dim.d_week_seq
 |  |
 |  24:EXCHANGE [HASH(ss_sold_date_sk)]
 |  |
 |  08:SCAN HDFS [tpcds.store_sales]
 |     partitions=120/120 files=120 size=21.31MB
-|     runtime filters: RF006 -> tpcds.store_sales.ss_store_sk, RF007 -> ss_sold_date_sk
+|     runtime filters: RF005 -> tpcds.store_sales.ss_store_sk, RF007 -> ss_sold_date_sk
 |
 30:EXCHANGE [HASH(d_week_seq,s_store_id)]
 |
 07:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: d_week_seq = d.d_week_seq
-|  runtime filters: RF002 <- d.d_week_seq
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF002 <- s_store_sk
 |
 |--23:EXCHANGE [BROADCAST]
 |  |
-|  05:SCAN HDFS [tpcds.date_dim d]
-|     partitions=1/1 files=1 size=9.84MB
-|     predicates: d_month_seq >= 1185, d_month_seq <= 1185 + 11
-|     runtime filters: RF000 -> d.d_week_seq
+|  04:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     runtime filters: RF001 -> s_store_id
 |
 06:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ss_store_sk = s_store_sk
-|  runtime filters: RF003 <- s_store_sk
+|  hash predicates: d_week_seq = d.d_week_seq
+|  runtime filters: RF003 <- d.d_week_seq
 |
 |--22:EXCHANGE [BROADCAST]
 |  |
-|  04:SCAN HDFS [tpcds.store]
-|     partitions=1/1 files=1 size=3.08KB
-|     runtime filters: RF001 -> s_store_id
+|  05:SCAN HDFS [tpcds.date_dim d]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: d_month_seq >= 1185, d_month_seq <= 1185 + 11
+|     runtime filters: RF000 -> d.d_week_seq
 |
 21:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
@@ -2674,13 +2674,13 @@ limit 100
 |  |
 |  01:SCAN HDFS [tpcds.date_dim]
 |     partitions=1/1 files=1 size=9.84MB
-|     runtime filters: RF000 -> tpcds.date_dim.d_week_seq, RF002 -> tpcds.date_dim.d_week_seq
+|     runtime filters: RF000 -> tpcds.date_dim.d_week_seq, RF003 -> tpcds.date_dim.d_week_seq
 |
 18:EXCHANGE [HASH(ss_sold_date_sk)]
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=120/120 files=120 size=21.31MB
-   runtime filters: RF003 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
+   runtime filters: RF002 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
 ---- PARALLELPLANS
 32:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: s_store_name1 ASC, s_store_id1 ASC, d_week_seq1 ASC
@@ -2700,31 +2700,31 @@ limit 100
 |  31:EXCHANGE [HASH(d_week_seq - 52,s_store_id)]
 |  |
 |  15:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: d_week_seq = d.d_week_seq
-|  |  runtime filters: RF005 <- d.d_week_seq
+|  |  hash predicates: ss_store_sk = s_store_sk
+|  |  runtime filters: RF005 <- s_store_sk
 |  |
 |  |--JOIN BUILD
 |  |  |  join-table-id=01 plan-id=02 cohort-id=02
-|  |  |  build expressions: d.d_week_seq
+|  |  |  build expressions: s_store_sk
 |  |  |
 |  |  29:EXCHANGE [BROADCAST]
 |  |  |
-|  |  13:SCAN HDFS [tpcds.date_dim d]
-|  |     partitions=1/1 files=1 size=9.84MB
-|  |     predicates: d_month_seq >= 1185 + 12, d_month_seq <= 1185 + 23
+|  |  12:SCAN HDFS [tpcds.store]
+|  |     partitions=1/1 files=1 size=3.08KB
 |  |
 |  14:HASH JOIN [INNER JOIN, BROADCAST]
-|  |  hash predicates: ss_store_sk = s_store_sk
-|  |  runtime filters: RF006 <- s_store_sk
+|  |  hash predicates: d_week_seq = d.d_week_seq
+|  |  runtime filters: RF006 <- d.d_week_seq
 |  |
 |  |--JOIN BUILD
 |  |  |  join-table-id=02 plan-id=03 cohort-id=02
-|  |  |  build expressions: s_store_sk
+|  |  |  build expressions: d.d_week_seq
 |  |  |
 |  |  28:EXCHANGE [BROADCAST]
 |  |  |
-|  |  12:SCAN HDFS [tpcds.store]
-|  |     partitions=1/1 files=1 size=3.08KB
+|  |  13:SCAN HDFS [tpcds.date_dim d]
+|  |     partitions=1/1 files=1 size=9.84MB
+|  |     predicates: d_month_seq >= 1185 + 12, d_month_seq <= 1185 + 23
 |  |
 |  27:AGGREGATE [FINALIZE]
 |  |  output: sum:merge(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
@@ -2748,44 +2748,44 @@ limit 100
 |  |  |
 |  |  09:SCAN HDFS [tpcds.date_dim]
 |  |     partitions=1/1 files=1 size=9.84MB
-|  |     runtime filters: RF005 -> tpcds.date_dim.d_week_seq
+|  |     runtime filters: RF006 -> tpcds.date_dim.d_week_seq
 |  |
 |  24:EXCHANGE [HASH(ss_sold_date_sk)]
 |  |
 |  08:SCAN HDFS [tpcds.store_sales]
 |     partitions=120/120 files=120 size=21.31MB
-|     runtime filters: RF006 -> tpcds.store_sales.ss_store_sk, RF007 -> ss_sold_date_sk
+|     runtime filters: RF005 -> tpcds.store_sales.ss_store_sk, RF007 -> ss_sold_date_sk
 |
 30:EXCHANGE [HASH(d_week_seq,s_store_id)]
 |
 07:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: d_week_seq = d.d_week_seq
-|  runtime filters: RF002 <- d.d_week_seq
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF002 <- s_store_sk
 |
 |--JOIN BUILD
 |  |  join-table-id=04 plan-id=05 cohort-id=01
-|  |  build expressions: d.d_week_seq
+|  |  build expressions: s_store_sk
 |  |
 |  23:EXCHANGE [BROADCAST]
 |  |
-|  05:SCAN HDFS [tpcds.date_dim d]
-|     partitions=1/1 files=1 size=9.84MB
-|     predicates: d_month_seq >= 1185, d_month_seq <= 1185 + 11
-|     runtime filters: RF000 -> d.d_week_seq
+|  04:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|     runtime filters: RF001 -> s_store_id
 |
 06:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ss_store_sk = s_store_sk
-|  runtime filters: RF003 <- s_store_sk
+|  hash predicates: d_week_seq = d.d_week_seq
+|  runtime filters: RF003 <- d.d_week_seq
 |
 |--JOIN BUILD
 |  |  join-table-id=05 plan-id=06 cohort-id=01
-|  |  build expressions: s_store_sk
+|  |  build expressions: d.d_week_seq
 |  |
 |  22:EXCHANGE [BROADCAST]
 |  |
-|  04:SCAN HDFS [tpcds.store]
-|     partitions=1/1 files=1 size=3.08KB
-|     runtime filters: RF001 -> s_store_id
+|  05:SCAN HDFS [tpcds.date_dim d]
+|     partitions=1/1 files=1 size=9.84MB
+|     predicates: d_month_seq >= 1185, d_month_seq <= 1185 + 11
+|     runtime filters: RF000 -> d.d_week_seq
 |
 21:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum:merge(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
@@ -2809,13 +2809,13 @@ limit 100
 |  |
 |  01:SCAN HDFS [tpcds.date_dim]
 |     partitions=1/1 files=1 size=9.84MB
-|     runtime filters: RF000 -> tpcds.date_dim.d_week_seq, RF002 -> tpcds.date_dim.d_week_seq
+|     runtime filters: RF000 -> tpcds.date_dim.d_week_seq, RF003 -> tpcds.date_dim.d_week_seq
 |
 18:EXCHANGE [HASH(ss_sold_date_sk)]
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=120/120 files=120 size=21.31MB
-   runtime filters: RF003 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
+   runtime filters: RF002 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
 ====
 # TPCDS-Q63
 select
@@ -3094,9 +3094,23 @@ limit 100
 |  order by: s_store_name ASC, i_item_desc ASC
 |
 13:HASH JOIN [INNER JOIN]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF000 <- i_item_sk
+|
+|--01:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|
+12:HASH JOIN [INNER JOIN]
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF001 <- s_store_sk
+|
+|--00:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|
+11:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_store_sk = ss_store_sk
 |  other predicates: sum(ss_sales_price) <= 0.1 * avg(revenue)
-|  runtime filters: RF000 <- ss_store_sk
+|  runtime filters: RF002 <- ss_store_sk
 |
 |--06:AGGREGATE [FINALIZE]
 |  |  output: avg(sum(ss_sales_price))
@@ -3116,22 +3130,7 @@ limit 100
 |  |
 |  02:SCAN HDFS [tpcds.store_sales]
 |     partitions=120/120 files=120 size=21.31MB
-|     runtime filters: RF004 -> ss_sold_date_sk
-|
-12:HASH JOIN [INNER JOIN]
-|  hash predicates: ss_item_sk = i_item_sk
-|  runtime filters: RF001 <- i_item_sk
-|
-|--01:SCAN HDFS [tpcds.item]
-|     partitions=1/1 files=1 size=4.82MB
-|
-11:HASH JOIN [INNER JOIN]
-|  hash predicates: ss_store_sk = s_store_sk
-|  runtime filters: RF002 <- s_store_sk
-|
-|--00:SCAN HDFS [tpcds.store]
-|     partitions=1/1 files=1 size=3.08KB
-|     runtime filters: RF000 -> tpcds.store.s_store_sk
+|     runtime filters: RF001 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
 |
 10:AGGREGATE [FINALIZE]
 |  output: sum(ss_sales_price)
@@ -3147,37 +3146,59 @@ limit 100
 |
 07:SCAN HDFS [tpcds.store_sales]
    partitions=120/120 files=120 size=21.31MB
-   runtime filters: RF000 -> tpcds.store_sales.ss_store_sk, RF001 -> tpcds.store_sales.ss_item_sk, RF002 -> tpcds.store_sales.ss_store_sk, RF003 -> ss_sold_date_sk
+   runtime filters: RF000 -> tpcds.store_sales.ss_item_sk, RF001 -> tpcds.store_sales.ss_store_sk, RF002 -> tpcds.store_sales.ss_store_sk, RF003 -> ss_sold_date_sk
 ---- DISTRIBUTEDPLAN
-26:MERGING-EXCHANGE [UNPARTITIONED]
+28:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: s_store_name ASC, i_item_desc ASC
 |  limit: 100
 |
 14:TOP-N [LIMIT=100]
 |  order by: s_store_name ASC, i_item_desc ASC
 |
-13:HASH JOIN [INNER JOIN, BROADCAST]
+13:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF000 <- i_item_sk
+|
+|--27:EXCHANGE [HASH(i_item_sk)]
+|  |
+|  01:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|
+26:EXCHANGE [HASH(ss_item_sk)]
+|
+12:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF001 <- s_store_sk
+|
+|--25:EXCHANGE [HASH(s_store_sk)]
+|  |
+|  00:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|
+24:EXCHANGE [HASH(ss_store_sk)]
+|
+11:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: ss_store_sk = ss_store_sk
 |  other predicates: sum(ss_sales_price) <= 0.1 * avg(revenue)
-|  runtime filters: RF000 <- ss_store_sk
+|  runtime filters: RF002 <- ss_store_sk
 |
-|--25:EXCHANGE [BROADCAST]
+|--23:EXCHANGE [BROADCAST]
 |  |
-|  24:AGGREGATE [FINALIZE]
+|  22:AGGREGATE [FINALIZE]
 |  |  output: avg:merge(revenue)
 |  |  group by: ss_store_sk
 |  |
-|  23:EXCHANGE [HASH(ss_store_sk)]
+|  21:EXCHANGE [HASH(ss_store_sk)]
 |  |
 |  06:AGGREGATE [STREAMING]
 |  |  output: avg(sum(ss_sales_price))
 |  |  group by: ss_store_sk
 |  |
-|  22:AGGREGATE [FINALIZE]
+|  20:AGGREGATE [FINALIZE]
 |  |  output: sum:merge(ss_sales_price)
 |  |  group by: ss_store_sk, ss_item_sk
 |  |
-|  21:EXCHANGE [HASH(ss_store_sk,ss_item_sk)]
+|  19:EXCHANGE [HASH(ss_store_sk,ss_item_sk)]
 |  |
 |  05:AGGREGATE [STREAMING]
 |  |  output: sum(ss_sales_price)
@@ -3187,7 +3208,7 @@ limit 100
 |  |  hash predicates: ss_sold_date_sk = d_date_sk
 |  |  runtime filters: RF004 <- d_date_sk
 |  |
-|  |--20:EXCHANGE [BROADCAST]
+|  |--18:EXCHANGE [BROADCAST]
 |  |  |
 |  |  03:SCAN HDFS [tpcds.date_dim]
 |  |     partitions=1/1 files=1 size=9.84MB
@@ -3195,26 +3216,7 @@ limit 100
 |  |
 |  02:SCAN HDFS [tpcds.store_sales]
 |     partitions=120/120 files=120 size=21.31MB
-|     runtime filters: RF004 -> ss_sold_date_sk
-|
-12:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ss_item_sk = i_item_sk
-|  runtime filters: RF001 <- i_item_sk
-|
-|--19:EXCHANGE [BROADCAST]
-|  |
-|  01:SCAN HDFS [tpcds.item]
-|     partitions=1/1 files=1 size=4.82MB
-|
-11:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ss_store_sk = s_store_sk
-|  runtime filters: RF002 <- s_store_sk
-|
-|--18:EXCHANGE [BROADCAST]
-|  |
-|  00:SCAN HDFS [tpcds.store]
-|     partitions=1/1 files=1 size=3.08KB
-|     runtime filters: RF000 -> tpcds.store.s_store_sk
+|     runtime filters: RF001 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
 |
 17:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_sales_price)
@@ -3238,41 +3240,71 @@ limit 100
 |
 07:SCAN HDFS [tpcds.store_sales]
    partitions=120/120 files=120 size=21.31MB
-   runtime filters: RF000 -> tpcds.store_sales.ss_store_sk, RF001 -> tpcds.store_sales.ss_item_sk, RF002 -> tpcds.store_sales.ss_store_sk, RF003 -> ss_sold_date_sk
+   runtime filters: RF000 -> tpcds.store_sales.ss_item_sk, RF001 -> tpcds.store_sales.ss_store_sk, RF002 -> tpcds.store_sales.ss_store_sk, RF003 -> ss_sold_date_sk
 ---- PARALLELPLANS
-26:MERGING-EXCHANGE [UNPARTITIONED]
+28:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: s_store_name ASC, i_item_desc ASC
 |  limit: 100
 |
 14:TOP-N [LIMIT=100]
 |  order by: s_store_name ASC, i_item_desc ASC
 |
-13:HASH JOIN [INNER JOIN, BROADCAST]
+13:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_item_sk = i_item_sk
+|  runtime filters: RF000 <- i_item_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: i_item_sk
+|  |
+|  27:EXCHANGE [HASH(i_item_sk)]
+|  |
+|  01:SCAN HDFS [tpcds.item]
+|     partitions=1/1 files=1 size=4.82MB
+|
+26:EXCHANGE [HASH(ss_item_sk)]
+|
+12:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF001 <- s_store_sk
+|
+|--JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: s_store_sk
+|  |
+|  25:EXCHANGE [HASH(s_store_sk)]
+|  |
+|  00:SCAN HDFS [tpcds.store]
+|     partitions=1/1 files=1 size=3.08KB
+|
+24:EXCHANGE [HASH(ss_store_sk)]
+|
+11:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: ss_store_sk = ss_store_sk
 |  other predicates: sum(ss_sales_price) <= 0.1 * avg(revenue)
-|  runtime filters: RF000 <- ss_store_sk
+|  runtime filters: RF002 <- ss_store_sk
 |
 |--JOIN BUILD
-|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  join-table-id=02 plan-id=03 cohort-id=01
 |  |  build expressions: ss_store_sk
 |  |
-|  25:EXCHANGE [BROADCAST]
+|  23:EXCHANGE [BROADCAST]
 |  |
-|  24:AGGREGATE [FINALIZE]
+|  22:AGGREGATE [FINALIZE]
 |  |  output: avg:merge(revenue)
 |  |  group by: ss_store_sk
 |  |
-|  23:EXCHANGE [HASH(ss_store_sk)]
+|  21:EXCHANGE [HASH(ss_store_sk)]
 |  |
 |  06:AGGREGATE [STREAMING]
 |  |  output: avg(sum(ss_sales_price))
 |  |  group by: ss_store_sk
 |  |
-|  22:AGGREGATE [FINALIZE]
+|  20:AGGREGATE [FINALIZE]
 |  |  output: sum:merge(ss_sales_price)
 |  |  group by: ss_store_sk, ss_item_sk
 |  |
-|  21:EXCHANGE [HASH(ss_store_sk,ss_item_sk)]
+|  19:EXCHANGE [HASH(ss_store_sk,ss_item_sk)]
 |  |
 |  05:AGGREGATE [STREAMING]
 |  |  output: sum(ss_sales_price)
@@ -3283,10 +3315,10 @@ limit 100
 |  |  runtime filters: RF004 <- d_date_sk
 |  |
 |  |--JOIN BUILD
-|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  join-table-id=03 plan-id=04 cohort-id=02
 |  |  |  build expressions: d_date_sk
 |  |  |
-|  |  20:EXCHANGE [BROADCAST]
+|  |  18:EXCHANGE [BROADCAST]
 |  |  |
 |  |  03:SCAN HDFS [tpcds.date_dim]
 |  |     partitions=1/1 files=1 size=9.84MB
@@ -3294,34 +3326,7 @@ limit 100
 |  |
 |  02:SCAN HDFS [tpcds.store_sales]
 |     partitions=120/120 files=120 size=21.31MB
-|     runtime filters: RF004 -> ss_sold_date_sk
-|
-12:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ss_item_sk = i_item_sk
-|  runtime filters: RF001 <- i_item_sk
-|
-|--JOIN BUILD
-|  |  join-table-id=02 plan-id=03 cohort-id=01
-|  |  build expressions: i_item_sk
-|  |
-|  19:EXCHANGE [BROADCAST]
-|  |
-|  01:SCAN HDFS [tpcds.item]
-|     partitions=1/1 files=1 size=4.82MB
-|
-11:HASH JOIN [INNER JOIN, BROADCAST]
-|  hash predicates: ss_store_sk = s_store_sk
-|  runtime filters: RF002 <- s_store_sk
-|
-|--JOIN BUILD
-|  |  join-table-id=03 plan-id=04 cohort-id=01
-|  |  build expressions: s_store_sk
-|  |
-|  18:EXCHANGE [BROADCAST]
-|  |
-|  00:SCAN HDFS [tpcds.store]
-|     partitions=1/1 files=1 size=3.08KB
-|     runtime filters: RF000 -> tpcds.store.s_store_sk
+|     runtime filters: RF001 -> tpcds.store_sales.ss_store_sk, RF004 -> ss_sold_date_sk
 |
 17:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_sales_price)
@@ -3349,7 +3354,7 @@ limit 100
 |
 07:SCAN HDFS [tpcds.store_sales]
    partitions=120/120 files=120 size=21.31MB
-   runtime filters: RF000 -> tpcds.store_sales.ss_store_sk, RF001 -> tpcds.store_sales.ss_item_sk, RF002 -> tpcds.store_sales.ss_store_sk, RF003 -> ss_sold_date_sk
+   runtime filters: RF000 -> tpcds.store_sales.ss_item_sk, RF001 -> tpcds.store_sales.ss_store_sk, RF002 -> tpcds.store_sales.ss_store_sk, RF003 -> ss_sold_date_sk
 ====
 # TPCDS-Q68
 select
@@ -5129,21 +5134,21 @@ with v1 as (
    partitions=120/120 files=120 size=21.31MB
    runtime filters: RF000 -> ss_store_sk, RF001 -> ss_sold_date_sk, RF002 -> ss_item_sk
 ---- PARALLELPLANS
-53:MERGING-EXCHANGE [UNPARTITIONED]
+54:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum_sales - avg_monthly_sales ASC, d_year ASC
 |  limit: 100
 |
 35:TOP-N [LIMIT=100]
 |  order by: sum_sales - avg_monthly_sales ASC, d_year ASC
 |
-34:HASH JOIN [INNER JOIN, BROADCAST]
+34:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: rank() = rank() - 1, s_store_name = s_store_name, i_category = i_category, s_company_name = s_company_name, i_brand = i_brand
 |
 |--JOIN BUILD
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: rank() - 1, s_store_name, i_category, s_company_name, i_brand
 |  |
-|  52:EXCHANGE [BROADCAST]
+|  53:EXCHANGE [HASH(rank() - 1,s_store_name,i_category,s_company_name,i_brand)]
 |  |
 |  32:ANALYTIC
 |  |  functions: rank()
@@ -5208,6 +5213,8 @@ with v1 as (
 |     partitions=120/120 files=120 size=21.31MB
 |     runtime filters: RF006 -> ss_store_sk, RF007 -> ss_sold_date_sk, RF008 -> ss_item_sk
 |
+52:EXCHANGE [HASH(rank(),s_store_name,i_category,s_company_name,i_brand)]
+|
 33:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: rank() + 1 = rank(), s_store_name = s_store_name, i_category = i_category, s_company_name = s_company_name, i_brand = i_brand
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/286da592/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index b91cc04..e5948eb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -12,7 +12,7 @@ select
   avg(l_discount) as avg_disc,
   count(*) as count_order
 from
-  tpch.lineitem
+  lineitem
 where
   l_shipdate <= '1998-09-02'
 group by
@@ -85,11 +85,11 @@ select
   s_phone,
   s_comment
 from
-  tpch.part,
-  tpch.supplier,
-  tpch.partsupp,
-  tpch.nation,
-  tpch.region
+  part,
+  supplier,
+  partsupp,
+  nation,
+  region
 where
   p_partkey = ps_partkey
   and s_suppkey = ps_suppkey
@@ -441,9 +441,9 @@ select
   o_orderdate,
   o_shippriority
 from
-  tpch.customer,
-  tpch.orders,
-  tpch.lineitem
+  customer,
+  orders,
+  lineitem
 where
   c_mktsegment = 'BUILDING'
   and c_custkey = o_custkey
@@ -588,7 +588,7 @@ select
   o_orderpriority,
   count(*) as order_count
 from
-  tpch.orders
+  orders
 where
   o_orderdate >= '1993-07-01'
   and o_orderdate < '1993-10-01'
@@ -596,7 +596,7 @@ where
     select
       *
     from
-      tpch.lineitem
+      lineitem
     where
       l_orderkey = o_orderkey
       and l_commitdate < l_receiptdate
@@ -702,12 +702,12 @@ select
   n_name,
   sum(l_extendedprice * (1 - l_discount)) as revenue
 from
-  tpch.customer,
-  tpch.orders,
-  tpch.lineitem,
-  tpch.supplier,
-  tpch.nation,
-  tpch.region
+  customer,
+  orders,
+  lineitem,
+  supplier,
+  nation,
+  region
 where
   c_custkey = o_custkey
   and l_orderkey = o_orderkey
@@ -942,7 +942,7 @@ order by
 select
   sum(l_extendedprice * l_discount) as revenue
 from
-  tpch.lineitem
+  lineitem
 where
   l_shipdate >= '1994-01-01'
   and l_shipdate < '1995-01-01'
@@ -994,12 +994,12 @@ from (
     year(l_shipdate) as l_year,
     l_extendedprice * (1 - l_discount) as volume
   from
-    tpch.supplier,
-    tpch.lineitem,
-    tpch.orders,
-    tpch.customer,
-    tpch.nation n1,
-    tpch.nation n2
+    supplier,
+    lineitem,
+    orders,
+    customer,
+    nation n1,
+    nation n2
   where
     s_suppkey = l_suppkey
     and o_orderkey = l_orderkey
@@ -1251,14 +1251,14 @@ from (
     l_extendedprice * (1 - l_discount) as volume,
     n2.n_name as nation
   from
-    tpch.part,
-    tpch.supplier,
-    tpch.lineitem,
-    tpch.orders,
-    tpch.customer,
-    tpch.nation n1,
-    tpch.nation n2,
-    tpch.region
+    part,
+    supplier,
+    lineitem,
+    orders,
+    customer,
+    nation n1,
+    nation n2,
+    region
   where
     p_partkey = l_partkey
     and s_suppkey = l_suppkey
@@ -1571,12 +1571,12 @@ from(
     year(o_orderdate) as o_year,
     l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
   from
-    tpch.part,
-    tpch.supplier,
-    tpch.lineitem,
-    tpch.partsupp,
-    tpch.orders,
-    tpch.nation
+    part,
+    supplier,
+    lineitem,
+    partsupp,
+    orders,
+    nation
   where
     s_suppkey = l_suppkey
     and ps_suppkey = l_suppkey
@@ -1815,10 +1815,10 @@ select
   c_phone,
   c_comment
 from
-  tpch.customer,
-  tpch.orders,
-  tpch.lineitem,
-  tpch.nation
+  customer,
+  orders,
+  lineitem,
+  nation
 where
   c_custkey = o_custkey
   and l_orderkey = o_orderkey
@@ -2004,9 +2004,9 @@ from (
     ps_partkey,
     sum(ps_supplycost * ps_availqty) as value
   from
-    tpch.partsupp,
-    tpch.supplier,
-    tpch.nation
+    partsupp,
+    supplier,
+    nation
   where
     ps_suppkey = s_suppkey
     and s_nationkey = n_nationkey
@@ -2019,9 +2019,9 @@ where
     select
       sum(ps_supplycost * ps_availqty) * 0.0001
     from
-      tpch.partsupp,
-      tpch.supplier,
-      tpch.nation
+      partsupp,
+      supplier,
+      nation
     where
       ps_suppkey = s_suppkey
       and s_nationkey = n_nationkey
@@ -2274,8 +2274,8 @@ select
     else 0
   end) as low_line_count
 from
-  tpch.orders,
-  tpch.lineitem
+  orders,
+  lineitem
 where
   o_orderkey = l_orderkey
   and l_shipmode in ('MAIL', 'SHIP')
@@ -2381,7 +2381,7 @@ from (
     c_custkey,
     count(o_orderkey) as c_count
   from
-    tpch.customer left outer join tpch.orders on (
+    customer left outer join tpch.orders on (
       c_custkey = o_custkey
       and o_comment not like '%special%requests%'
     )
@@ -2502,8 +2502,8 @@ select
     else 0.0
     end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
 from
-  tpch.lineitem,
-  tpch.part
+  lineitem,
+  part
 where
   l_partkey = p_partkey
   and l_shipdate >= '1995-09-01'
@@ -2579,7 +2579,7 @@ with revenue_view as (
     l_suppkey as supplier_no,
     sum(l_extendedprice * (1 - l_discount)) as total_revenue
   from
-    tpch.lineitem
+    lineitem
   where
     l_shipdate >= '1996-01-01'
     and l_shipdate < '1996-04-01'
@@ -2592,7 +2592,7 @@ select
   s_phone,
   total_revenue
 from
-  tpch.supplier,
+  supplier,
   revenue_view
 where
   s_suppkey = supplier_no
@@ -2768,8 +2768,8 @@ select
   p_size,
   count(distinct ps_suppkey) as supplier_cnt
 from
-  tpch.partsupp,
-  tpch.part
+  partsupp,
+  part
 where
   p_partkey = ps_partkey
   and p_brand <> 'Brand#45'
@@ -2779,7 +2779,7 @@ where
     select
       s_suppkey
     from
-      tpch.supplier
+      supplier
     where
       s_comment like '%Customer%Complaints%'
   )
@@ -2917,8 +2917,8 @@ order by
 select
   sum(l_extendedprice) / 7.0 as avg_yearly
 from
-  tpch.lineitem,
-  tpch.part
+  lineitem,
+  part
 where
   p_partkey = l_partkey
   and p_brand = 'Brand#23'
@@ -2927,7 +2927,7 @@ where
     select
       0.2 * avg(l_quantity)
     from
-      tpch.lineitem
+      lineitem
     where
       l_partkey = p_partkey
   )
@@ -3064,15 +3064,15 @@ select
   o_totalprice,
   sum(l_quantity)
 from
-  tpch.customer,
-  tpch.orders,
-  tpch.lineitem
+  customer,
+  orders,
+  lineitem
 where
   o_orderkey in (
     select
       l_orderkey
     from
-      tpch.lineitem
+      lineitem
     group by
       l_orderkey
     having
@@ -3266,8 +3266,8 @@ limit 100
 select
   sum(l_extendedprice * (1 - l_discount)) as revenue
 from
-  tpch.lineitem,
-  tpch.part
+  lineitem,
+  part
 where
   p_partkey = l_partkey
   and (
@@ -3368,20 +3368,20 @@ select
   s_name,
   s_address
 from
-  tpch.supplier,
-  tpch.nation
+  supplier,
+  nation
 where
   s_suppkey in (
     select
       ps_suppkey
     from
-      tpch.partsupp
+      partsupp
     where
       ps_partkey in (
         select
           p_partkey
         from
-          tpch.part
+          part
         where
           p_name like 'forest%'
         )
@@ -3389,7 +3389,7 @@ where
         select
           0.5 * sum(l_quantity)
         from
-          tpch.lineitem
+          lineitem
         where
           l_partkey = ps_partkey
           and l_suppkey = ps_suppkey
@@ -3597,9 +3597,9 @@ select
   s_name,
   count(*) as numwait
 from
-  tpch.supplier,
-  tpch.lineitem l1,
-  tpch.orders,
+  supplier,
+  lineitem l1,
+  orders,
   tpch.nation
 where
   s_suppkey = l1.l_suppkey
@@ -3610,7 +3610,7 @@ where
     select
       *
     from
-      tpch.lineitem l2
+      lineitem l2
     where
       l2.l_orderkey = l1.l_orderkey
       and l2.l_suppkey <> l1.l_suppkey
@@ -3619,7 +3619,7 @@ where
     select
       *
     from
-      tpch.lineitem l3
+      lineitem l3
     where
       l3.l_orderkey = l1.l_orderkey
       and l3.l_suppkey <> l1.l_suppkey
@@ -3868,14 +3868,14 @@ from (
     substr(c_phone, 1, 2) as cntrycode,
     c_acctbal
   from
-    tpch.customer
+    customer
   where
     substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')
     and c_acctbal > (
       select
         avg(c_acctbal)
       from
-        tpch.customer
+        customer
       where
         c_acctbal > 0.00
         and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')
@@ -3884,7 +3884,7 @@ from (
       select
         *
       from
-        tpch.orders
+        orders
       where
         o_custkey = c_custkey
     )