You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/18 16:43:39 UTC

[16/32] incubator-impala git commit: IMPALA-2905: Handle coordinator fragment lifecycle like all others

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/order.test b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
index a39b9b2..266c517 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
@@ -2,12 +2,16 @@ select name, zip
 from functional.testtbl
 order by name offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:SORT
 |  order by: name ASC
 |
 00:SCAN HDFS [functional.testtbl]
    partitions=1/1 files=0 size=0B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: name ASC
@@ -22,12 +26,16 @@ select name, zip
 from functional.testtbl
 order by name
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:SORT
 |  order by: name ASC
 |
 00:SCAN HDFS [functional.testtbl]
    partitions=1/1 files=0 size=0B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: name ASC
 |
@@ -43,6 +51,8 @@ where name like 'm%'
 group by 1
 order by 2 desc
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:SORT
 |  order by: count(*) DESC
 |
@@ -54,6 +64,8 @@ order by 2 desc
    partitions=1/1 files=0 size=0B
    predicates: name LIKE 'm%'
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(*) DESC
 |
@@ -80,6 +92,8 @@ where id < 5
 group by 1
 order by 2
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:SORT
 |  order by: sum(float_col) ASC
 |
@@ -90,6 +104,8 @@ order by 2
 00:SCAN HBASE [functional_hbase.alltypessmall]
    predicates: id < 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(float_col) ASC
 |
@@ -114,6 +130,8 @@ from functional_hbase.alltypessmall
 group by 1
 order by 2,3 desc
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:SORT
 |  order by: sum(float_col) ASC, min(float_col) DESC
 |
@@ -123,6 +141,8 @@ order by 2,3 desc
 |
 00:SCAN HBASE [functional_hbase.alltypessmall]
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(float_col) ASC, min(float_col) DESC
 |
@@ -144,12 +164,16 @@ order by 2,3 desc
 # Test that the sort is on int_col and not on the id column
 select int_col as id from functional.alltypessmall order by id
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:SORT
 |  order by: int_col ASC
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: int_col ASC
 |
@@ -162,12 +186,16 @@ select int_col as id from functional.alltypessmall order by id
 # Test that the sort is on id and not on int_col
 select int_col as id from functional.alltypessmall order by functional.alltypessmall.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:SORT
 |  order by: id ASC
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC
 |
@@ -184,6 +212,8 @@ select int_col, bigint_col from
    select * from functional.alltypessmall) t
 order by int_col desc offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:SORT
 |  order by: int_col DESC
 |
@@ -195,6 +225,8 @@ order by int_col desc offset 5
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: int_col DESC
@@ -217,6 +249,8 @@ select int_col, bigint_col from
    select * from functional.alltypessmall) t
 order by int_col desc offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:SORT
 |  order by: int_col DESC
 |
@@ -231,6 +265,8 @@ order by int_col desc offset 5
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: int_col DESC
@@ -259,6 +295,8 @@ select j.*, d.* from functional.JoinTbl j full outer join functional.DimTbl d
 on (j.test_id = d.id)
 order by j.test_id, j.test_name, j.test_zip, j.alltypes_id, d.name
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:SORT
 |  order by: test_id ASC, test_name ASC, test_zip ASC, alltypes_id ASC, name ASC
 |
@@ -271,6 +309,8 @@ order by j.test_id, j.test_name, j.test_zip, j.alltypes_id, d.name
 00:SCAN HDFS [functional.jointbl j]
    partitions=1/1 files=1 size=433B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: test_id ASC, test_name ASC, test_zip ASC, alltypes_id ASC, name ASC
 |
@@ -303,6 +343,8 @@ and c.string_col < '7'
 and a.int_col + b.float_col + cast(c.string_col as float) < 1000
 order by c.string_col desc, a.smallint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:SORT
 |  order by: string_col DESC, smallint_col ASC
 |
@@ -329,6 +371,8 @@ order by c.string_col desc, a.smallint_col
    predicates: c.string_col < '7'
    runtime filters: RF000 -> c.id
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: string_col DESC, smallint_col ASC
 |
@@ -370,6 +414,8 @@ from functional.alltypesagg
 group by 1
 order by avg(tinyint_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:SORT
 |  order by: avg(tinyint_col) ASC
 |
@@ -380,6 +426,8 @@ order by avg(tinyint_col)
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: avg(tinyint_col) ASC
 |
@@ -406,6 +454,8 @@ left outer join functional.alltypessmall t2
   on (t1.int_col = t2.int_col)
 order by t1.id,t2.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:SORT
 |  order by: id ASC, id ASC
 |
@@ -418,6 +468,8 @@ order by t1.id,t2.id
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC, id ASC
 |
@@ -440,6 +492,8 @@ select t1.id, t2.id from functional.alltypestiny t1 cross join functional.alltyp
 where (t1.id < 3 and t2.id < 3)
 order by t1.id, t2.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:SORT
 |  order by: id ASC, id ASC
 |
@@ -453,6 +507,8 @@ order by t1.id, t2.id
    partitions=4/4 files=4 size=460B
    predicates: t1.id < 3
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC, id ASC
 |
@@ -481,6 +537,8 @@ union distinct
 (select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month from functional.alltypestiny where year=2009 and month=2)
 order by 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:SORT
 |  order by: id ASC
 |
@@ -501,6 +559,8 @@ order by 1
 01:SCAN HDFS [functional.alltypestiny]
    partitions=1/4 files=1 size=115B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC
 |
@@ -538,6 +598,8 @@ union all
 (select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month from functional.alltypestiny where year=2009 and month=2)
 order by 1,2
 ---- PLAN
+PLAN-ROOT SINK
+|
 07:SORT
 |  order by: id ASC, bool_col ASC
 |
@@ -560,6 +622,8 @@ order by 1,2
 05:SCAN HDFS [functional.alltypestiny]
    partitions=1/4 files=1 size=115B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 10:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC, bool_col ASC
 |
@@ -611,6 +675,8 @@ union all
    order by 1 limit 3)
 order by 12, 13, 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 16:SORT
 |  order by: year ASC, month ASC, id ASC
 |
@@ -658,6 +724,8 @@ order by 12, 13, 1
 10:SCAN HDFS [functional.alltypestiny]
    partitions=1/4 files=1 size=115B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 23:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: year ASC, month ASC, id ASC
 |
@@ -729,6 +797,8 @@ order by 12, 13, 1
 select * from (select * from functional.alltypes order by bigint_col limit 10) t
 order by int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:SORT
 |  order by: int_col ASC
 |
@@ -738,6 +808,8 @@ order by int_col
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:SORT
 |  order by: int_col ASC
 |
@@ -758,6 +830,8 @@ select * from
    (select * from functional.alltypessmall) order by bigint_col limit 10) t
 order by int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:SORT
 |  order by: int_col ASC
 |
@@ -772,6 +846,8 @@ order by int_col
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:SORT
 |  order by: int_col ASC
 |
@@ -796,6 +872,8 @@ select * from
 (select * from functional.alltypes order by bigint_col) A
 join B on (A.string_col = B.string_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypes.string_col = functional.alltypes.string_col
 |  runtime filters: RF000 <- functional.alltypes.string_col
@@ -807,6 +885,8 @@ join B on (A.string_col = B.string_col)
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> functional.alltypes.string_col
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -863,6 +943,8 @@ select * from functional.alltypes
    union all
 select * from functional.alltypessmall order by bigint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:UNION
 |
 |--02:SCAN HDFS [functional.alltypessmall]
@@ -871,6 +953,8 @@ select * from functional.alltypessmall order by bigint_col
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 03:EXCHANGE [UNPARTITIONED]
 |
 00:UNION
@@ -886,6 +970,8 @@ select * from functional.alltypes
    union all
 (select * from functional.alltypessmall) order by bigint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:SORT
 |  order by: bigint_col ASC
 |
@@ -897,6 +983,8 @@ select * from functional.alltypes
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: bigint_col ASC
 |
@@ -917,6 +1005,8 @@ select int_col from functional.alltypes order by int_col
  union (select int_col from functional.alltypes order by int_col limit 10 offset 5)
 order by int_col offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 07:SORT
 |  order by: int_col ASC
 |
@@ -940,6 +1030,8 @@ order by int_col offset 5
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 14:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: int_col ASC
@@ -1003,6 +1095,8 @@ select * from
   having sum(float_col) > 10) t3
 order by x
 ---- PLAN
+PLAN-ROOT SINK
+|
 11:SORT
 |  order by: x ASC
 |
@@ -1040,6 +1134,8 @@ order by x
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 11:SORT
 |  order by: x ASC
 |
@@ -1098,6 +1194,8 @@ select int_col from
   (select int_col, bigint_col from functional.alltypesagg)
 order by bigint_col limit 10) A
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:TOP-N [LIMIT=10]
 |  order by: bigint_col ASC
 |
@@ -1109,6 +1207,8 @@ order by bigint_col limit 10) A
 01:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: bigint_col ASC
 |  limit: 10
@@ -1127,6 +1227,8 @@ order by bigint_col limit 10) A
 # Sort node is unnecessary (IMPALA-1148).
 select 1 from functional.alltypes order by 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
@@ -1135,6 +1237,8 @@ select a from
   (select 1 as a, int_col, bigint_col
    from functional.alltypes order by 1 limit 1) v
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    limit: 1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
index c6ccc81..f3e43bd 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
@@ -27,6 +27,8 @@ and t1.zip + t2.zip = 10
 # join predicate between t1, t2 and t3 applied after last join
 and t1.zip + t2.zip + t3.zip= 20
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.id = t3.id
 |  other join predicates: t2.id = 15, t1.id - t2.id = 0
@@ -49,6 +51,8 @@ and t1.zip + t2.zip + t3.zip= 20
    partitions=1/1 files=0 size=0B
    predicates: t1.id > 0
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
@@ -87,6 +91,8 @@ from (select * from functional.testtbl a1) t1
 where t1.id > 0 and t2.id is null and t3.id is not null
 and t1.zip + t2.zip = 10 and t1.zip + t2.zip + t3.zip= 20
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a1.id = a3.id
 |  other join predicates: a2.id = 15, a1.id - a2.id = 0
@@ -109,6 +115,8 @@ and t1.zip + t2.zip = 10 and t1.zip + t2.zip + t3.zip= 20
    partitions=1/1 files=0 size=0B
    predicates: a1.id > 0
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
@@ -145,6 +153,8 @@ from functional.testtbl t1
     t1.id = t2.id and t1.id = 17)
   join functional.testtbl t3 on (t1.id = t3.id)
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t3.id
 |  runtime filters: RF000 <- t3.id
@@ -195,6 +205,8 @@ and t1.zip + t2.zip = 10
 # join predicate between t1, t2 and t3 applied after last join
 and t1.zip + t2.zip + t3.zip= 20
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: t1.id = t3.id
 |  other join predicates: t3.zip = 94720, t1.id - t2.id = 0
@@ -219,6 +231,8 @@ and t1.zip + t2.zip + t3.zip= 20
    predicates: t1.id IS NOT NULL, t1.id > 0
    runtime filters: RF000 -> t1.id, RF001 -> t1.id - 1
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
@@ -262,6 +276,8 @@ from (select * from functional.testtbl a1) t1 right outer join (select * from fu
 where t1.id > 0 and t2.id is null and t3.id is not null
 and t1.zip + t2.zip = 10 and t1.zip + t2.zip + t3.zip= 20
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: a1.id = a3.id
 |  other join predicates: a3.zip = 94720, a1.id - a2.id = 0
@@ -286,6 +302,8 @@ and t1.zip + t2.zip = 10 and t1.zip + t2.zip + t3.zip= 20
    predicates: a1.id IS NOT NULL, a1.id > 0
    runtime filters: RF000 -> a1.id, RF001 -> a1.id - 1
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
@@ -327,6 +345,8 @@ from functional.alltypesagg a
 right outer join functional.alltypestiny b on (a.tinyint_col = b.id)
 where a.tinyint_col is null
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: a.tinyint_col = b.id
 |  other predicates: a.tinyint_col IS NULL
@@ -339,6 +359,8 @@ where a.tinyint_col is null
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> a.tinyint_col
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
@@ -363,6 +385,8 @@ from functional.alltypesagg a
 full outer join functional.alltypestiny b on (a.tinyint_col = b.id)
 where a.tinyint_col is null
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: a.tinyint_col = b.id
 |  other predicates: a.tinyint_col IS NULL
@@ -373,6 +397,8 @@ where a.tinyint_col is null
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
@@ -398,6 +424,8 @@ from functional.alltypes a full outer join functional.alltypes b
 # also to the full outer join
 where b.bigint_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: a.id = b.id
 |  other join predicates: a.int_col < 10, b.tinyint_col != 5
@@ -420,6 +448,8 @@ inner join functional.alltypes c
 # first full outer join
 where b.tinyint_col > 20
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: c.int_col = a.int_col
 |  other predicates: a.tinyint_col < 10
@@ -453,6 +483,8 @@ full outer join functional.alltypes c
 # re-assigned to the full outer join.
 where a.smallint_col = 100 and a.float_col > b.float_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: c.int_col = a.int_col
 |  other join predicates: a.bigint_col < 10, a.tinyint_col < b.tinyint_col
@@ -487,6 +519,8 @@ full outer join functional.alltypes d
 # predicate on b from the where clause is assigned to the first full outer join
 where a.bool_col = false and a.float_col < b.float_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 07:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: a.tinyint_col = d.tinyint_col
 |  other join predicates: b.int_col < 20
@@ -536,6 +570,8 @@ from (
 # tuple ids
 where x != y
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: c.id = d.id
 |  other join predicates: a.bigint_col > b.bigint_col
@@ -575,6 +611,8 @@ full outer join functional.alltypes c
 # that materializes the corresponding tuple ids
 where a.bigint_col = b.bigint_col and a.tinyint_col < b.tinyint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: b.int_col = c.int_col
 |  other join predicates: c.int_col < 10
@@ -606,6 +644,8 @@ full outer join functional.alltypes d
   on (b.string_col = d.string_col and a.tinyint_col < b.tinyint_col)
 where a.float_col = b.float_col and b.smallint_col = 1 and d.tinyint_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: b.string_col = d.string_col
 |  other join predicates: a.tinyint_col < b.tinyint_col
@@ -645,6 +685,8 @@ group by a.bool_col, a.int_col, b.bool_col, b.int_col
 having a.bool_col is null and a.int_col is not null
   and b.bool_col is null and b.int_col is not null
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: sum(b.double_col)
 |  group by: a.bool_col, a.int_col, b.bool_col, b.int_col
@@ -669,6 +711,8 @@ group by a.bool_col, a.int_col, b.bool_col, b.int_col
 having a.bool_col is null and a.int_col is not null
   and b.bool_col is null and b.int_col is not null
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: sum(b.double_col)
 |  group by: a.bool_col, a.int_col, b.bool_col, b.int_col
@@ -697,6 +741,8 @@ where (
     a.timestamp_col
   end) >= cast('2001-01-01 00:00:00' as timestamp);
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: b.id = a.id
 |  runtime filters: RF000 <- a.id
@@ -717,6 +763,8 @@ left outer join functional.alltypestiny b
 inner join functional.alltypestiny c
   on b.id = c.id and b.int_col < 0 and a.int_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: c.id = b.id
 |  other predicates: b.int_col < 0
@@ -744,6 +792,8 @@ right outer join functional.alltypestiny b
 inner join functional.alltypestiny c
   on b.id = c.id and b.int_col < 0 and a.int_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: c.id = b.id
 |  other predicates: a.int_col > 10
@@ -773,6 +823,8 @@ full outer join functional.alltypestiny b
 inner join functional.alltypestiny c
   on b.id = c.id and b.int_col < 0 and a.int_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: b.id = c.id
 |  other predicates: a.int_col > 10, b.int_col < 0
@@ -805,6 +857,8 @@ inner join functional.alltypestiny d
 full outer join functional.alltypestiny e
   on d.id = e.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 09:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: e.id = d.id
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
index c99a90b..deda7e9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
@@ -1,12 +1,16 @@
 # Test with aggregate expressions which ignore the distinct keyword.
 select min(month), max(year), ndv(day) from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(month), max(year), ndv(day)
 |
 00:UNION
    constant-operands=11
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(month), max(year), ndv(day)
 |
@@ -16,6 +20,8 @@ select min(month), max(year), ndv(day) from functional.alltypesagg
 # Test with explicit distinct keyword.
 select count(distinct year), ndv(day) from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(year), ndv:merge(day)
 |
@@ -26,6 +32,8 @@ select count(distinct year), ndv(day) from functional.alltypesagg
 00:UNION
    constant-operands=11
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(year), ndv:merge(day)
 |
@@ -39,6 +47,8 @@ select count(distinct year), ndv(day) from functional.alltypesagg
 # Test static partition pruning.
 select min(month), max(day) from functional.alltypesagg where year = 2010 and day = 1;
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(month), max(day)
 |
@@ -50,6 +60,8 @@ select c1, c2 from
   (select min(year) c1, max(month) c2, count(int_col) c3
    from functional.alltypes where year = 2000) t;
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(year), max(month)
 |
@@ -58,6 +70,8 @@ select c1, c2 from
 # Test with group by and having clauses.
 select ndv(month) from functional.alltypesagg group by year having max(day)=10
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: ndv(month), max(day)
 |  group by: year
@@ -69,6 +83,8 @@ select ndv(month) from functional.alltypesagg group by year having max(day)=10
 # Test with group-by clauses (no aggregate expressions) only.
 select month from functional.alltypes group by month
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  group by: month
 |
@@ -78,6 +94,8 @@ select month from functional.alltypes group by month
 # Test with distinct select list.
 select distinct month from functional.alltypes where month % 2 = 0
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  group by: month
 |
@@ -89,6 +107,8 @@ select min(a.month)
 from functional.alltypes as a, functional.alltypesagg as b
 where a.year = b.year
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: min(a.month)
 |
@@ -101,6 +121,8 @@ where a.year = b.year
 00:UNION
    constant-operands=24
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: min(a.month)
 |
@@ -122,6 +144,8 @@ select * from
   (select year, count(month) from functional.alltypes group by year) b
 on (a.year = b.year)
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: year = year
 |  runtime filters: RF000 <- year
@@ -144,6 +168,8 @@ on (a.year = b.year)
 select min(a.year), ndv(b.timestamp_col) from
 functional.alltypes a, functional.alltypesnopart b
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: min(a.year), ndv(b.timestamp_col)
 |
@@ -161,6 +187,8 @@ select c1, c2 from
   (select ndv(a.year + b.year) c1, min(a.month + b.month) c2, count(a.int_col) c3 from
    functional.alltypes a, functional.alltypesagg b) t
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: ndv(a.year + b.year), min(a.month + b.month)
 |
@@ -179,6 +207,8 @@ from functional.alltypestiny t1 inner join
           min(t2.year) as int_col from functional.alltypestiny t2) t3
 on (t1.int_col = t3.int_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.int_col = min(t2.year)
 |  runtime filters: RF000 <- min(t2.year)
@@ -198,6 +228,8 @@ with c1 as (select distinct month from functional.alltypes),
      c2 as (select distinct year from functional.alltypes)
 select ndv(month) from (select * from c1 union all select * from c2) t
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: ndv(month)
 |
@@ -218,6 +250,8 @@ select ndv(month) from (select * from c1 union all select * from c2) t
 # If slots other than partition keys are accessed, make sure scan nodes are generated.
 select date_string_col, min(month) from functional.alltypes group by date_string_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(month)
 |  group by: date_string_col
@@ -228,6 +262,8 @@ select date_string_col, min(month) from functional.alltypes group by date_string
 # Make sure non-distinct aggregation functions will generate scan nodes.
 select count(month) from functional.alltypes
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(month)
 |
@@ -237,6 +273,8 @@ select count(month) from functional.alltypes
 # Make sure that queries without any aggregation will generate scan nodes.
 select month from functional.alltypes order by year
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:SORT
 |  order by: year ASC
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
index 605f157..eeb9b97 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
@@ -3,6 +3,8 @@ select straight_join count(*)
 from functional.alltypes a join functional.alltypes b on (a.double_col = b.bigint_col)
 where b.bigint_col div 2 = 0
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -26,6 +28,8 @@ from
    inner join functional.alltypessmall b on (a.cnt = b.id)
 where b.id < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: count(id) = b.id
 |
@@ -45,6 +49,8 @@ where b.id < 10
 select count(*) from functional.alltypes
 where month = id and id = int_col and tinyint_col = int_col and int_col < 2
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -60,6 +66,8 @@ from
    left outer join (select id, string_col from functional.alltypes) b
    on (a.id = b.id and a.string_col = 'a' and b.string_col = 'b')
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: id = id
 |  other join predicates: string_col = 'a'
@@ -80,6 +88,8 @@ from
    on (a.id = b.id)
 where a.string_col = 'a' and b.string_col = 'b'
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: id = id
 |  other predicates: string_col = 'b'
@@ -99,6 +109,8 @@ from
    cross join (select id, string_col from functional.alltypes) b
 where a.string_col = 'a' and b.string_col = 'b'
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:NESTED LOOP JOIN [CROSS JOIN]
 |
 |--01:SCAN HDFS [functional.alltypes]
@@ -109,6 +121,8 @@ where a.string_col = 'a' and b.string_col = 'b'
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.string_col = 'a'
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |
 02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
@@ -135,6 +149,8 @@ where c1 > 0
 order by 2, 1 desc
 limit 3
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:TOP-N [LIMIT=3]
 |  order by: c2 ASC, c1 DESC
 |
@@ -163,6 +179,8 @@ where c1 > 0
 order by 2, 1 desc
 limit 3
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:TOP-N [LIMIT=3]
 |  order by: c2 ASC, c1 DESC
 |
@@ -189,6 +207,8 @@ from functional.alltypes a
 where a.year = 2009 and b.month + 2 <= 4 and b.id = 17
   and cast(sin(c.int_col) as boolean) = true
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: b.id = c.id, b.month = c.month, b.year = c.year, b.smallint_col = c.int_col
 |  runtime filters: RF000 <- c.id, RF001 <- c.month, RF002 <- c.year, RF003 <- c.int_col
@@ -221,6 +241,8 @@ NODE 2:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypestiny/year=2009/month=1/090101.txt 0:115
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypestiny/year=2009/month=2/090201.txt 0:115
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 08:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -262,6 +284,8 @@ from (select * from functional.alltypes) a
 where a.year = 2009 and b.month + 2 <= 4 and b.id = 17
   and cast(sin(c.int_col) as boolean) = true
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypessmall.id = functional.alltypestiny.id, functional.alltypessmall.month = functional.alltypestiny.month, functional.alltypessmall.year = functional.alltypestiny.year, functional.alltypessmall.smallint_col = functional.alltypestiny.int_col
 |  runtime filters: RF000 <- functional.alltypestiny.id, RF001 <- functional.alltypestiny.month, RF002 <- functional.alltypestiny.year, RF003 <- functional.alltypestiny.int_col
@@ -294,6 +318,8 @@ NODE 2:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypestiny/year=2009/month=1/090101.txt 0:115
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypestiny/year=2009/month=2/090201.txt 0:115
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 08:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -340,6 +366,8 @@ from functional.alltypes a
    and a.month = b.month and b.month + 1 = 2)
 where a.year = 2009 and a.tinyint_col = 7 and a.id is null and b.id = 17 and b.int_col is null
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = b.id, a.month = b.month, a.tinyint_col = b.tinyint_col, a.year = b.year
 |  other predicates: b.int_col IS NULL, b.id = 17
@@ -368,6 +396,8 @@ NODE 0:
 NODE 1:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=1/090101.txt 0:1610
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
@@ -400,6 +430,8 @@ from functional.alltypessmall a
    and a.month = b.month and a.month + 1 = 2)
 where b.year = 2009 and b.tinyint_col = 7 and b.id is null and a.id = 17 and a.int_col is null
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: a.id = b.id, a.month = b.month, a.tinyint_col = b.tinyint_col, a.year = b.year
 |  other predicates: a.int_col IS NULL, a.id = 17
@@ -414,6 +446,8 @@ where b.year = 2009 and b.tinyint_col = 7 and b.id is null and a.id = 17 and a.i
    predicates: a.id = 17, a.tinyint_col = 7
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.tinyint_col, RF003 -> a.year
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
@@ -448,6 +482,8 @@ from functional.alltypes a
   on (a.id = b.id and a.tinyint_col = b.int_col and a.year = b.year and a.month = b.month)
 where a.year = 2009 and b.month <= 2 and b.count_col + 1 = 17 and a.tinyint_col != 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
@@ -473,6 +509,8 @@ NODE 1:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=1/090101.txt 0:1610
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=2/090201.txt 0:1621
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |
 03:HASH JOIN [INNER JOIN, BROADCAST]
@@ -518,6 +556,8 @@ where a.id = b.id and
       b.count_col + 1 = 17 and
       a.tinyint_col != 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
@@ -536,6 +576,8 @@ where a.id = b.id and
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |
 03:HASH JOIN [INNER JOIN, BROADCAST]
@@ -579,6 +621,8 @@ from functional.alltypes a
   on (a.id = b.id and a.tinyint_col = b.int_col and a.year = b.year and a.month = b.month)
 where a.year = 2009 and b.month <= 2 and b.count_col + 1 = 17 and a.tinyint_col != 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
@@ -609,6 +653,8 @@ NODE 1:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=3/090301.txt 0:1620
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=4/090401.txt 0:1621
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
@@ -661,6 +707,8 @@ where a.year = 2009 and
       a.year = b.year and
       a.month = b.month
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
@@ -682,6 +730,8 @@ where a.year = 2009 and
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:EXCHANGE [UNPARTITIONED]
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
@@ -724,6 +774,8 @@ on (x.id = z.id)
 where x.year = 2009
 and z.month = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypes.id = functional.alltypesagg.id
 |  runtime filters: RF000 <- functional.alltypesagg.id
@@ -756,6 +808,8 @@ where x.year = 2009
 and z.month = 1
 and x.id + x.b_id = 17
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = functional.alltypesagg.id
 |  runtime filters: RF000 <- functional.alltypesagg.id
@@ -785,6 +839,8 @@ from functional.alltypes a left outer join
 (select id, int_col from functional.alltypes group by 1, 2) b on (a.id = b.id)
 where a.id is null and isnull(b.id, 0) = 0 and b.int_col = 17
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = id
 |  other predicates: int_col = 17, isnull(id, 0) = 0
@@ -807,6 +863,8 @@ from functional.alltypes a left outer join
 on (a.id = b.id)
 where isnull(a.id, 0) = 0 and b.id is null  and b.int_col = 17
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = id
 |  other predicates: id IS NULL, int_col = 17
@@ -837,6 +895,8 @@ from
 right outer join functional.alltypes a on (a.id = b.id)
 where a.id is null and isnull(b.id, 0) = 0 and b.int_col = 17
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: id = a.id
 |  other predicates: int_col = 17, isnull(id, 0) = 0
@@ -861,6 +921,8 @@ from
 right outer join functional.alltypes a on (a.id = b.id)
 where isnull(a.id, 0) = 0 and b.id is null  and b.int_col = 17
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: id = a.id
 |  other predicates: id IS NULL, int_col = 17
@@ -893,6 +955,8 @@ select straight_join a.string_col from functional.alltypes a
 full outer join (select * from functional.alltypessmall where id > 0) b
 ON a.id=b.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: a.id = functional.alltypessmall.id
 |
@@ -916,6 +980,8 @@ left outer join
    where x.id is null) b
 on a.id=b.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = x.id
 |
@@ -942,6 +1008,8 @@ from functional.alltypes
 group by bool_col, int_col
 having bool_col = false and int_col > 0 and count(bigint_col) > 0
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(bigint_col)
 |  group by: bool_col, int_col
@@ -968,6 +1036,8 @@ and t2.id + t2.smallint_col + t2.bigint_col > 30
 # assigned in join, TODO: propagate multi-tuple predicates
 and t2.id + t3.int_col > 40
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.bigint_col = functional.alltypestiny.bigint_col, t2.id = functional.alltypestiny.id, t2.smallint_col = functional.alltypestiny.int_col
 |  other predicates: t2.id + functional.alltypestiny.int_col > 40
@@ -1009,6 +1079,8 @@ t1.id + t1.tinyint_col > 20
 # assigned in agg nodes in t2 and t3
 and t2.y + t2.z > 30
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: max(smallint_col) = max(smallint_col), min(int_col) = min(int_col)
 |  runtime filters: RF000 <- max(smallint_col)
@@ -1056,6 +1128,8 @@ and ifnull(t3.tinyint_col + t3.bigint_col, true) = true
 # assigned in scan of t1, t2 and t3
 and t1.id * t1.int_col < 100
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t2.id = functional.alltypestiny.id, t2.int_col = functional.alltypestiny.int_col
 |  other predicates: functional.alltypestiny.tinyint_col + functional.alltypestiny.smallint_col + functional.alltypestiny.int_col > 10, ifnull(functional.alltypestiny.tinyint_col + functional.alltypestiny.bigint_col, TRUE) = TRUE
@@ -1084,6 +1158,8 @@ functional.alltypes t1 inner join functional.alltypessmall t2
 on (t1.id = t2.month and t1.year = t2.year and t1.month = t2.month)
 where t2.year + t2.month > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.month, t1.year = t2.year
 |  runtime filters: RF000 <- t2.month, RF001 <- t2.year
@@ -1107,6 +1183,8 @@ on (t1.id = t2.id
     and t1.tinyint_col = t2.int_col)
 where t1.id + t1.tinyint_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id, t1.tinyint_col = t2.tinyint_col
 |  runtime filters: RF000 <- t2.id, RF001 <- t2.tinyint_col
@@ -1132,6 +1210,8 @@ inner join (select bigint_col, min(int_col) x, max(int_col) y
 on (t1.id = t2.bigint_col and t1.int_col = t2.x)
 where t1.id + t1.int_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.int_col = min(int_col), t1.id = bigint_col
 |  runtime filters: RF000 <- min(int_col), RF001 <- bigint_col
@@ -1157,6 +1237,8 @@ left anti join
 on (a.id = b.id)
 where a.id < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: a.id = id
 |
@@ -1176,6 +1258,8 @@ right anti join functional.alltypestiny b
 on (a.id = b.id)
 where b.id < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [RIGHT ANTI JOIN]
 |  hash predicates: id = b.id
 |
@@ -1197,6 +1281,8 @@ select * from
    group by j.int_col) v
 where v.int_col = 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: sum(a.tinyint_col)
 |  group by: b.int_col
@@ -1223,6 +1309,8 @@ select * from
     on a.id = b.id) j) v
 where v.int_col = 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: b.id = a.id
 |  other predicates: b.int_col = 10
@@ -1245,6 +1333,8 @@ SELECT count(*) FROM
   WHERE n_name = 'BRAZIL' AND n_regionkey = 1 AND c_custkey % 2 = 0) cn
  LEFT OUTER JOIN tpch_parquet.region r ON n_regionkey = r_regionkey
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index f1abd1d..446aead 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -2,6 +2,8 @@
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2
 where t1.year = t2.int_col and t2.id < 10 and t1.id = 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
@@ -15,6 +17,8 @@ where t1.year = t2.int_col and t2.id < 10 and t1.id = 10
    predicates: t1.id = 10
    runtime filters: RF000 -> t1.year
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
@@ -38,6 +42,8 @@ select straight_join * from functional.alltypestiny t1, functional.alltypesagg t
 where t1.year = t2.int_col and t3.tinyint_col = t2.id and t3.month = t4.id and
   t2.bool_col = true and t4.bigint_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t3.month = t4.id
 |  runtime filters: RF000 <- t4.id
@@ -71,6 +77,8 @@ where t1.year = t2.int_col and t3.tinyint_col = t2.id and t3.month = t4.id and
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2
 where t1.year = t2.int_col and t1.month = t2.bigint_col and t2.id = 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col, t1.month = t2.bigint_col
 |  runtime filters: RF000 <- t2.int_col, RF001 <- t2.bigint_col
@@ -88,6 +96,8 @@ select straight_join * from functional.alltypesagg t1,
   (select * from functional.alltypesnopart t2 where t2.id = 1) v
 where t1.year = v.int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
@@ -107,6 +117,8 @@ select straight_join * from functional.alltypesagg t1,
    where t2.bigint_col < 10) v
 where v.id1 = t1.year
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = id + int_col
 |  runtime filters: RF000 <- id + int_col
@@ -123,6 +135,8 @@ where v.id1 = t1.year
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2
 where t1.year + 1 = t2.id and t2.int_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year + 1 = t2.id
 |  runtime filters: RF000 <- t2.id
@@ -141,6 +155,8 @@ where t1.id = t2.id and t1.year + t2.int_col = t1.month + t2.tinyint_col
 and t1.year = t1.month + t2.int_col and t1.year + t2.smallint_col = t2.tinyint_col
 and t1.int_col = 1 and 1 = t2.bigint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  other predicates: t1.year + t2.smallint_col = t2.tinyint_col, t1.year = t1.month + t2.int_col, t1.year + t2.int_col = t1.month + t2.tinyint_col
@@ -161,6 +177,8 @@ select straight_join * from functional.alltypesagg t1, functional.alltypesnopart
 where t1.year + t1.month = t2.id and t1.int_col + 1 - t1.tinyint_col = t2.smallint_col + 10
 and t1.int_col * 100 = t2.bigint_col / 100 and t2.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year + t1.month = t2.id, t1.int_col * 100 = t2.bigint_col / 100, t1.int_col + 1 - t1.tinyint_col = t2.smallint_col + 10
 |  runtime filters: RF000 <- t2.id, RF001 <- t2.bigint_col / 100, RF002 <- t2.smallint_col + 10
@@ -182,6 +200,8 @@ select straight_join * from
   functional.alltypesnopart t3
 where v.year = t3.int_col and t3.bool_col = true
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year + t2.year = t3.int_col
 |
@@ -207,6 +227,8 @@ select straight_join * from functional.alltypesagg t1,
    having count(int_col) < 10) v
 where v.cnt = t1.year and v.id = t1.month
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = id, t1.year = count(int_col)
 |  runtime filters: RF000 <- id, RF001 <- count(int_col)
@@ -230,6 +252,8 @@ select straight_join * from functional.alltypesagg t1,
    functional.alltypesnopart t3 where t2.int_col = t3.int_col) v
 where v.id = t1.year and t1.month = v.tinyint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = t3.tinyint_col, t1.year = t2.id + t3.id
 |  runtime filters: RF000 <- t3.tinyint_col, RF001 <- t2.id + t3.id
@@ -257,6 +281,8 @@ select straight_join * from functional.alltypesagg t1,
    where t2.id = t3.id and t3.int_col = t4.int_col and t4.tinyint_col = t2.tinyint_col) v
 where t1.year = v.int_col and t1.year = v.id and t1.month = v.tinyint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col, t1.month = t4.tinyint_col
 |  runtime filters: RF000 <- t2.int_col, RF001 <- t4.tinyint_col
@@ -291,6 +317,8 @@ select straight_join * from functional.alltypesagg t1, functional.alltypesnopart
 where t1.year = t2.id and t1.year = t3.int_col and t1.year = t4.tinyint_col and
   t2.bool_col = false and t3.bool_col = true and t4.bigint_col in (1,2)
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t4.tinyint_col
 |  runtime filters: RF000 <- t4.tinyint_col
@@ -327,6 +355,8 @@ select straight_join * from functional.alltypesagg t1, functional.alltypesnopart
 where t1.year = t2.id and t2.int_col = t3.tinyint_col and t3.month = t4.bigint_col
   and t4.smallint_col = t5.smallint_col and t5.id = t1.month
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = t5.id, t4.smallint_col = t5.smallint_col
 |  runtime filters: RF000 <- t5.id, RF001 <- t5.smallint_col
@@ -368,6 +398,8 @@ select straight_join * from functional.alltypesagg t1 left outer join functional
   on t1.year = t2.int_col
 where t2.id = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  other predicates: t2.id = 1
@@ -385,6 +417,8 @@ select straight_join * from functional.alltypesagg t1 left outer join functional
   on t1.year = t2.int_col
 where t2.id = 2 and t1.month = t2.tinyint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  other predicates: t2.id = 2, t1.month = t2.tinyint_col
@@ -406,6 +440,8 @@ select straight_join * from functional.alltypesagg t1 left outer join functional
   on t1.year = t5.smallint_col
 where t2.id = 1 and t3.int_col = 1 and t4.bool_col = true and t5.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: t1.year = t5.smallint_col
 |  other predicates: t2.id = 1, t3.int_col = 1, t4.bool_col = TRUE
@@ -449,6 +485,8 @@ from functional.alltypesagg t1 right outer join functional.alltypesnopart t2
   on t1.year = t2.int_col and t1.month = 1 and t2.int_col = 10
 where t2.id = 10 and t1.month = t2.tinyint_col and t1.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  other join predicates: t2.int_col = 10
@@ -470,6 +508,8 @@ select straight_join * from functional.alltypesagg t1 left semi join functional.
   on t1.month = t3.tinyint_col
 where t3.id = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t1.month = t3.tinyint_col
 |  runtime filters: RF000 <- t3.tinyint_col
@@ -494,6 +534,8 @@ select straight_join * from functional.alltypesagg t1
 where t1.year not in (select id from functional.alltypesnopart where int_col = 10)
 and t1.int_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: t1.year = id
 |
@@ -512,6 +554,8 @@ select straight_join * from
   (select id, int_col from functional.alltypesnopart where tinyint_col < 10) v2
 where v1.year = v2.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: year = id
 |  runtime filters: RF000 <- id
@@ -535,6 +579,8 @@ select straight_join * from
   functional.alltypes t2
 where v1.cnt = t2.id and t2.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: count(*) = t2.id
 |
@@ -559,6 +605,8 @@ select straight_join * from
    functional.alltypesnopart t3
 where v2.year = t3.smallint_col and t3.id = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: v1.year = t3.smallint_col
 |  runtime filters: RF000 <- t3.smallint_col
@@ -593,6 +641,8 @@ join functional.alltypestiny b on v.year = b.year
 join functional.alltypestiny c on v.year = c.year
 where b.int_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: year = c.year
 |  runtime filters: RF000 <- c.year
@@ -631,6 +681,8 @@ select straight_join * from
   functional.alltypesnopart t2
 where v1.year = t2.id and v1.int_col = t2.int_col and t2.smallint_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: int_col = t2.int_col, year = t2.id
 |
@@ -651,6 +703,8 @@ select straight_join * from
   functional.alltypesnopart t3
 where v.year = t3.int_col and t3.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: year = t3.int_col
 |  runtime filters: RF000 <- t3.int_col
@@ -680,6 +734,8 @@ select straight_join count(*) from
   on a.month = b.month
 where b.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -716,6 +772,8 @@ select straight_join count(*) from
   on a.month = b.month
 where b.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -751,6 +809,8 @@ union all
 select straight_join t4.id, t3.year from functional.alltypes t3, functional.alltypesnopart t4
 where t3.month = t4.smallint_col and t4.bool_col = true
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:UNION
 |
 |--06:HASH JOIN [INNER JOIN]
@@ -786,6 +846,8 @@ select straight_join count(*) from functional.alltypes a
   on a.id = b.id
 where (b.id - b.id) < 1 AND (b.int_col - b.int_col) < 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -817,6 +879,8 @@ select straight_join * from
    functional.alltypesnopart t3
 where v1.month = t3.tinyint_col and v1.year = t3.id and t3.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: year = t3.id, month = t3.tinyint_col
 |
@@ -852,6 +916,8 @@ select straight_join * from
    from functional.alltypes) v, functional.alltypestiny v1
 where v.year = v1.int_col and v.year = 2009
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: year = v1.int_col
 |
@@ -887,6 +953,8 @@ select straight_join * from
   ) v3
 where v2.month = v3.intcol1
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = t4.int_col
 |  runtime filters: RF000 <- t4.int_col
@@ -927,6 +995,8 @@ select straight_join 1 from functional.alltypestiny t1 join functional.alltypest
   join functional.alltypestiny t3 on t2.id = t3.id
 where t3.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = t3.id
 |  runtime filters: RF000 <- t3.id
@@ -952,6 +1022,8 @@ select straight_join 1 from functional.alltypestiny t1 join functional.alltypest
   join functional.alltypestiny t3 on t1.id = t3.id
 where t3.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t3.id
 |  runtime filters: RF000 <- t3.id
@@ -979,6 +1051,8 @@ select straight_join 1 from tpch_nested_parquet.customer c,
    on o1.o_orderkey = o2.o_orderkey) v
 where c_custkey = v.o_orderkey
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:SUBPLAN
 |
 |--06:HASH JOIN [INNER JOIN]
@@ -1002,6 +1076,8 @@ from functional.alltypestiny t1 join
   (select * from functional.alltypessmall t2 where false) v on t1.id = v.id
 where v.int_col = 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF000 <- t2.id
@@ -1018,6 +1094,8 @@ select straight_join 1 from
   (select * from functional.alltypestiny where false) v1 join
   (select * from functional.alltypessmall where false) v2 on v1.id = v2.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypestiny.id = functional.alltypessmall.id
 |
@@ -1032,6 +1110,8 @@ select straight_join 1 from functional.alltypestiny t1 join functional.alltypest
   join functional.alltypestiny t3 on t1.id = t3.id
   join functional.alltypestiny t4 on t1.id + t2.id = t4.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id + t2.id = t4.id
 |  runtime filters: RF000 <- t4.id
@@ -1066,6 +1146,8 @@ select straight_join 1 from functional.alltypestiny a1
   inner join functional.alltypestiny a3 ON a3.smallint_col = a1.int_col
   inner join functional.alltypes a4 ON a4.smallint_col = a3.smallint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a3.smallint_col = a4.smallint_col
 |  runtime filters: RF000 <- a4.smallint_col
@@ -1096,6 +1178,8 @@ from functional.alltypestiny t1 left join
   on t2.int_col = t1.month
 where t1.month is not null
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  group by: t1.int_col
 |
@@ -1210,6 +1294,8 @@ from big_six
   inner join big_three
   inner join small_four_2
 ---- PLAN
+PLAN-ROOT SINK
+|
 36:NESTED LOOP JOIN [CROSS JOIN]
 |
 |--28:HASH JOIN [INNER JOIN]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
index 82b8d73..60e9dd9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
@@ -1,9 +1,13 @@
 select * from functional_seq.alltypes t1 limit 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional_seq.alltypes t1]
    partitions=24/24 files=24 size=562.59KB
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional_seq.alltypes t1]
    partitions=24/24 files=24 size=562.59KB
    limit: 5
@@ -11,10 +15,14 @@ select * from functional_seq.alltypes t1 limit 5
 # Query is over the limit of 8 rows to be optimized, will distribute the query
 select * from functional.alltypes t1 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 10
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
@@ -25,10 +33,14 @@ select * from functional.alltypes t1 limit 10
 # Query is optimized, run on coordinator only
 select * from functional.alltypes t1 limit 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 5
@@ -36,11 +48,15 @@ select * from functional.alltypes t1 limit 5
 # If a predicate is applied the optimization is disabled
 select * from functional.alltypes t1 where t1.id < 99 limit 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id < 99
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:EXCHANGE [UNPARTITIONED]
 |  limit: 5
 |
@@ -52,10 +68,14 @@ select * from functional.alltypes t1 where t1.id < 99 limit 5
 # No optimization for hbase tables
 select * from functional_hbase.alltypes t1 where t1.id < 99 limit 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HBASE [functional_hbase.alltypes t1]
    predicates: t1.id < 99
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:EXCHANGE [UNPARTITIONED]
 |  limit: 5
 |
@@ -66,9 +86,13 @@ select * from functional_hbase.alltypes t1 where t1.id < 99 limit 5
 # Applies optimization for small queries in hbase
 select * from functional_hbase.alltypes t1 limit 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HBASE [functional_hbase.alltypes t1]
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 00:SCAN HBASE [functional_hbase.alltypes t1]
    limit: 5
 ====
@@ -122,6 +146,8 @@ select * from functional_hbase.alltypes limit 5
 union all
 select * from functional_hbase.alltypes limit 2
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:UNION
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
@@ -130,6 +156,8 @@ select * from functional_hbase.alltypes limit 2
 01:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 00:UNION
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
@@ -142,6 +170,8 @@ select * from functional_hbase.alltypes limit 5
 union all
 select * from functional_hbase.alltypes limit 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:UNION
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
@@ -150,6 +180,8 @@ select * from functional_hbase.alltypes limit 5
 01:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 00:UNION
 |
 |--04:EXCHANGE [UNPARTITIONED]
@@ -168,6 +200,8 @@ select * from functional_hbase.alltypes limit 5
 select * from
   functional.testtbl a join functional.testtbl b on a.id = b.id
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
@@ -186,6 +220,8 @@ select * from
 select * from
   functional.testtbl a, functional.testtbl b
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |
 02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
@@ -202,6 +238,8 @@ select * from
   functional.alltypestiny a
 where a.id in (select id from functional.alltypestiny limit 5) limit 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |  limit: 5
 |
@@ -237,6 +275,8 @@ select id, bool_col
 from functional.alltypestiny c
 where year=2009 and month=2
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:UNION
 |
 |--03:AGGREGATE [FINALIZE]
@@ -256,6 +296,8 @@ where year=2009 and month=2
 # IMPALA-2527: Tests that the small query optimization is disabled for colleciton types
 select key from functional.allcomplextypes.map_map_col.value limit 5;
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:EXCHANGE [UNPARTITIONED]
 |  limit: 5
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 21776ba..060d470 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -4,6 +4,8 @@ from functional.alltypes
 where id in
   (select id from functional.alltypesagg)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
@@ -21,6 +23,8 @@ from functional.alltypes
 where id not in
   (select id from functional.alltypesagg)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: id = id
 |
@@ -39,6 +43,8 @@ where a.int_col not in
    where g.id = a.id and g.bigint_col < a.bigint_col)
 and a.int_col < 100
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: a.int_col = int_col
 |  other join predicates: a.id = g.id, g.bigint_col < a.bigint_col
@@ -56,6 +62,8 @@ select *
 from functional.alltypes a
 where a.id not in (select id from functional.alltypes b where a.id = b.id)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: a.id = id
 |  other join predicates: a.id = b.id
@@ -73,6 +81,8 @@ where int_col in
   (select int_col from functional.alltypesagg g where a.id = g.id and g.bigint_col < 10)
 and bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -95,6 +105,8 @@ from functional.alltypes t
 where t.int_col + 1 in
   (select int_col + bigint_col from functional.alltypesagg)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.int_col + 1 = int_col + bigint_col
 |  runtime filters: RF000 <- int_col + bigint_col
@@ -114,6 +126,8 @@ where t.id in
 and t.tinyint_col not in (select tinyint_col from functional.alltypestiny)
 and t.bigint_col < 1000
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: t.tinyint_col = tinyint_col
 |
@@ -139,6 +153,8 @@ from functional.alltypesagg a, functional.alltypes t
 where a.id = t.id and a.int_col in
   (select int_col from functional.alltypestiny where bool_col = false)
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -170,6 +186,8 @@ where a.id in
    where s.int_col = t.int_col and a.bool_col = s.bool_col)
 and a.int_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -201,6 +219,8 @@ where a.id in
   (select id from functional.alltypestiny)
 and t.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -231,6 +251,8 @@ from functional.alltypes a left outer join
 on a.int_col = t.int_col
 where a.bool_col = false and t.bigint_col < 100
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -262,6 +284,8 @@ where a.int_col in
    from functional.alltypes t, functional.alltypessmall s, functional.alltypestiny n
    where t.id = s.id and s.bigint_col = n.bigint_col and n.bool_col = false)
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:AGGREGATE [FINALIZE]
 |  output: count(id)
 |
@@ -304,6 +328,8 @@ where t.id in
    (select id, count(*) as cnt from functional.alltypessmall group by id) s
    where s.id = a.id and s.cnt = 10)
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = a.id
 |  runtime filters: RF000 <- a.id
@@ -332,6 +358,8 @@ with t as (select a.* from functional.alltypes a where id in
   (select id from functional.alltypestiny))
 select * from t where t.bool_col = false and t.int_col = 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
@@ -354,6 +382,8 @@ where s.string_col = t.string_col and t.int_col in
   (select int_col from functional.alltypessmall)
 and s.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.int_col = int_col
 |  runtime filters: RF000 <- int_col
@@ -395,6 +425,8 @@ where id in
   (select id from functional.alltypesagg a where t.int_col = a.int_col)
 and t.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id, t.int_col = a.int_col
 |  runtime filters: RF000 <- id, RF001 <- a.int_col
@@ -416,6 +448,8 @@ where id in
    and bool_col = false)
 and bigint_col < 1000
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
@@ -446,6 +480,8 @@ where id in
     (select tinyint_col from functional.alltypestiny s
      where s.bigint_col = a.bigint_col))
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id, t.int_col = a.int_col
 |  runtime filters: RF000 <- id, RF001 <- a.int_col
@@ -472,6 +508,8 @@ where id in
   (select id from functional.alltypesagg a where a.int_col in
     (select int_col from functional.alltypestiny s where a.bigint_col = s.bigint_col))
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
@@ -497,6 +535,8 @@ from functional.alltypes
 where id in
   (select id from functional.alltypes where id < 10)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
@@ -516,6 +556,8 @@ from functional.alltypesagg a inner join functional.alltypes t on t.id = a.id
 where t.int_col < 10 and t.int_col in
   (select int_col from functional.alltypessmall s where s.id = t.id)
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = s.id, t.int_col = int_col
 |  runtime filters: RF000 <- s.id, RF001 <- int_col
@@ -543,6 +585,8 @@ from functional.alltypes t
 where exists
   (select * from functional.alltypesagg a where a.id = t.id)
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -567,6 +611,8 @@ where exists
    group by id, int_col, bool_col)
 and tinyint_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.tinyint_col = b.tinyint_col
 |  runtime filters: RF000 <- b.tinyint_col
@@ -588,6 +634,8 @@ from functional.alltypes t
 where not exists
   (select id from functional.alltypesagg a where t.int_col = a.int_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -610,6 +658,8 @@ where not exists
    group by b.id, b.int_col, b.bigint_col)
 and bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -640,6 +690,8 @@ select *
 from functional.alltypestiny t
 where exists (select * from functional.alltypessmall s where s.id < 5)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:NESTED LOOP JOIN [LEFT SEMI JOIN]
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
@@ -658,6 +710,8 @@ where exists
    from functional.alltypesagg where tinyint_col = 10
    group by id, int_col, bigint_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [RIGHT SEMI JOIN]
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
@@ -676,6 +730,8 @@ select 1
 from functional.alltypestiny t
 where exists (select * from functional.alltypessmall limit 0)
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:EMPTYSET
 ====
 # Uncorrelated NOT EXISTS
@@ -683,6 +739,8 @@ select *
 from functional.alltypestiny t
 where not exists (select * from functional.alltypessmall s where s.id < 5)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:NESTED LOOP JOIN [LEFT ANTI JOIN]
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
@@ -701,6 +759,8 @@ select *
 from w1 t
 where not exists (select 1 from w2)
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:NESTED LOOP JOIN [LEFT ANTI JOIN]
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
@@ -719,6 +779,8 @@ where not exists
    from functional.alltypesagg where tinyint_col = 10
    group by id, int_col, bigint_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [RIGHT ANTI JOIN]
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
@@ -737,6 +799,8 @@ select 1
 from functional.alltypestiny t
 where not exists (select * from functional.alltypessmall limit 0)
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
 ====
@@ -748,6 +812,8 @@ where exists
     (select * from functional.alltypesagg g where g.int_col = t.int_col
      and g.bool_col = false))
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -790,6 +856,8 @@ and g.tinyint_col <
 group by g.int_col
 having count(*) < 100
 ---- PLAN
+PLAN-ROOT SINK
+|
 10:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: g.int_col
@@ -847,6 +915,8 @@ where a.int_col in
   group by int_col)
 and a.bigint_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.int_col = count(int_col)
 |  runtime filters: RF000 <- count(int_col)
@@ -871,6 +941,8 @@ where a.int_col <
   (select max(int_col) from functional.alltypesagg g where g.bool_col = true)
 and a.bigint_col > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: a.int_col < max(int_col)
 |
@@ -891,6 +963,8 @@ from functional.alltypesagg a
 where (select max(id) from functional.alltypes t where t.bool_col = false) > 10
 and a.int_col < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [CROSS JOIN]
 |
 |--02:AGGREGATE [FINALIZE]
@@ -915,6 +989,8 @@ where a.id =
 and a.bool_col = false
 group by a.int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: a.int_col
@@ -947,6 +1023,8 @@ where t.int_col <
 and a.bool_col = false
 group by t.tinyint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t.tinyint_col
@@ -993,6 +1071,8 @@ and a.tinyint_col >
   (select max(tinyint_col) from functional.alltypessmall s where s.id < 10)
 and t.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: a.tinyint_col > max(tinyint_col)
 |
@@ -1041,6 +1121,8 @@ where t.int_col <
       where a.id = g.id
       and a.bool_col = false))
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = g.id
 |  other join predicates: t.int_col < avg(g.int_col) * 2
@@ -1079,6 +1161,8 @@ where a.int_col <
    where s.id = a.id and s.tinyint_col >
      (select count(*) from functional.alltypestiny where bool_col = false))
 ---- PLAN
+PLAN-ROOT SINK
+|
 08:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = s.id
 |  other predicates: a.int_col < zeroifnull(count(*))
@@ -1115,6 +1199,8 @@ from functional.alltypesagg g
 where 100 < (select count(*) from functional.alltypes where bool_col = false and id < 5)
 and bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  group by: id, bool_col
 |
@@ -1138,6 +1224,8 @@ from functional.alltypesagg g
 where 100 > (select count(distinct id) from functional.alltypestiny where int_col < 5)
 and g.bigint_col < 1000 and g.bigint_col = true
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  group by: g.id, g.bool_col, g.tinyint_col, g.smallint_col, g.int_col, g.bigint_col, g.float_col, g.double_col, g.date_string_col, g.string_col, g.timestamp_col, g.year, g.month, g.day
 |
@@ -1163,6 +1251,8 @@ select *
 from functional.alltypestiny t
 where (select max(int_col) from functional.alltypesagg where int_col is null) is null
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [CROSS JOIN]
 |
 |--02:AGGREGATE [FINALIZE]
@@ -1183,6 +1273,8 @@ where (select count(*) from functional.alltypesagg g where t.id = g.id) is null
 and bool_col = false
 group by int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: int_col
@@ -1214,6 +1306,8 @@ where
    where g.id = t.id and g.int_col is null) is null
 and t.bool_col = false
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: g.id = t.id
 |  runtime filters: RF000 <- t.id
@@ -1239,6 +1333,8 @@ where 1 +
   (select count(*) from functional.alltypesagg where bool_col = false) = t.int_col + 2
 and t.bigint_col < 100
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.int_col + 2 = 1 + count(*)
 |  runtime filters: RF000 <- 1 + count(*)
@@ -1262,6 +1358,8 @@ where nullifzero((select min(id) from functional.alltypessmall s where s.bool_co
   is null
 and t.id < 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [CROSS JOIN]
 |
 |--02:AGGREGATE [FINALIZE]
@@ -1286,6 +1384,8 @@ where t.int_col <
    limit 1)
 group by t.bool_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: min(t.id)
 |  group by: t.bool_col
@@ -1313,6 +1413,8 @@ where int_col between
   (select min(int_col) from functional.alltypessmall where bool_col = false) and
   (select max(int_col) from functional.alltypessmall where bool_col = true)
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: int_col <= max(int_col)
 |
@@ -1344,6 +1446,8 @@ where
    from functional.alltypestiny tt1
    where t1.id = tt1.month) < t1.id
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: tt1.month = t1.id
 |  other predicates: zeroifnull(count(tt1.smallint_col)) < t1.id
@@ -1369,6 +1473,8 @@ where
   < 10
 group by int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: int_col
@@ -1398,6 +1504,8 @@ where
 and bool_col = false
 group by int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: int_col
@@ -1425,6 +1533,8 @@ select 1
 from functional.alltypestiny t1
 where (select count(*) from functional.alltypessmall) + t1.int_col = t1.bigint_col - 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: count(*) + t1.int_col = t1.bigint_col - 1
 |
@@ -1443,6 +1553,8 @@ select 1
 from functional.alltypestiny t1 join functional.alltypessmall t2 on t1.id = t2.id
 where (select count(*) from functional.alltypes) + 1 = t1.int_col + t2.int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col + t2.int_col = count(*) + 1
 |
@@ -1470,6 +1582,8 @@ from functional.alltypestiny t1 join functional.alltypessmall t2 on t1.id = t2.i
 where
   (select count(*) from functional.alltypes) + t2.bigint_col = t1.int_col + t2.int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: count(*) + t2.bigint_col = t1.int_col + t2.int_col
 |
@@ -1498,6 +1612,8 @@ where
    from functional.alltypesagg t1 inner join functional.alltypes t2 on t1.id = t2.id
    where t1.id + t2.id = t.int_col) = t.int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: sum(t1.id) = t.int_col
 |
@@ -1528,6 +1644,8 @@ where
    from functional.alltypesagg t1 inner join functional.alltypes t2 on t1.id = t2.id
    where t1.id + t2.id = t.bigint_col) = t.int_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t1.id + t2.id = t.bigint_col, sum(t1.id) = t.int_col
 |
@@ -1560,6 +1678,8 @@ where
    on tt1.int_col = tt2.int_col
    where tt1.id + tt2.id = t1.int_col - t2.int_col) = t1.bigint_col
 ---- PLAN
+PLAN-ROOT SINK
+|
 07:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: sum(tt1.id) = t1.bigint_col, tt1.id + tt2.id = t1.int_col - t2.int_col
 |
@@ -1602,6 +1722,8 @@ and not exists
    from functional.alltypesagg t3
    where t1.id = t3.id)
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:EMPTYSET
 ====
 # Correlated EXISTS and NOT EXISTS subqueries with limit 0 and
@@ -1625,6 +1747,8 @@ and not exists
    from functional.alltypestiny t5
    where t1.id = t5.id and false)
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
 ====
@@ -1647,6 +1771,8 @@ and not exists
    where t4.int_col = t1.tinyint_col
    having count(id) > 200)
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: t1.tinyint_col = t4.int_col
 |
@@ -1680,6 +1806,8 @@ where t1.id is not distinct from
 (select min(id) from functional.alltypes t2
 where t1.int_col is not distinct from t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id IS NOT DISTINCT FROM min(id), t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
@@ -1700,6 +1828,8 @@ where t1.id is distinct from
 (select min(id) from functional.alltypes t2
 where t1.int_col is not distinct from t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  other join predicates: t1.id IS DISTINCT FROM min(id)
@@ -1721,6 +1851,8 @@ where t1.id =
 (select min(id) from functional.alltypes t2
 where t1.int_col is not distinct from t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id = min(id), t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
@@ -1741,6 +1873,8 @@ where t1.id !=
 (select min(id) from functional.alltypes t2
 where t1.int_col is not distinct from t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  other join predicates: t1.id != min(id)
@@ -1762,6 +1896,8 @@ where t1.id is not distinct from
 (select min(id) from functional.alltypes t2
 where t1.int_col = t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id IS NOT DISTINCT FROM min(id), t1.int_col = t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
@@ -1782,6 +1918,8 @@ where t1.id is distinct from
 (select min(id) from functional.alltypes t2
 where t1.int_col = t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col = t2.int_col
 |  other join predicates: t1.id IS DISTINCT FROM min(id)
@@ -1803,6 +1941,8 @@ where t1.id =
 (select min(id) from functional.alltypes t2
 where t1.int_col = t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id = min(id), t1.int_col = t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
@@ -1823,6 +1963,8 @@ where t1.id !=
 (select min(id) from functional.alltypes t2
 where t1.int_col = t2.int_col);
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col = t2.int_col
 |  other join predicates: t1.id != min(id)
@@ -1850,6 +1992,8 @@ select 1 from functional.alltypes t where id in
    a.double_col between round(acos(t.float_col), 2)
                 and cast(t.string_col as int))
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  other join predicates: a.tinyint_col >= t.tinyint_col, t.float_col >= a.float_col, a.smallint_col <= t.int_col, a.tinyint_col <= t.smallint_col, t.float_col <= a.double_col, a.double_col <= CAST(t.string_col AS INT), t.string_col >= a.string_col, a.double_col >= round(acos(t.float_col), 2)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn.test b/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
index e9dcd43..4252ac7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn.test
@@ -3,12 +3,16 @@ from functional.testtbl
 order by name
 limit 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:TOP-N [LIMIT=1]
 |  order by: name ASC
 |
 00:SCAN HDFS [functional.testtbl]
    partitions=1/1 files=0 size=0B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: name ASC
 |  limit: 1
@@ -26,6 +30,8 @@ group by 1
 order by 2 desc
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:TOP-N [LIMIT=10]
 |  order by: count(*) DESC
 |
@@ -37,6 +43,8 @@ limit 10
    partitions=1/1 files=0 size=0B
    predicates: name LIKE 'm%'
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(*) DESC
 |  limit: 10
@@ -65,6 +73,8 @@ group by 1
 order by 2
 limit 4
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:TOP-N [LIMIT=4]
 |  order by: sum(float_col) ASC
 |
@@ -75,6 +85,8 @@ limit 4
 00:SCAN HBASE [functional_hbase.alltypessmall]
    predicates: id < 5
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: sum(float_col) ASC
 |  limit: 4
@@ -101,8 +113,12 @@ group by 1
 order by 2,3 desc
 limit 0
 ---- PLAN
+PLAN-ROOT SINK
+|
 00:EMPTYSET
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 00:EMPTYSET
 ====
 # Test correct identification of the implicit aliasing of int_col in the select
@@ -114,6 +130,8 @@ where t1.id = t2.id and t2.int_col is not null
 order by int_col
 limit 2
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:TOP-N [LIMIT=2]
 |  order by: int_col ASC
 |
@@ -129,6 +147,8 @@ limit 2
    partitions=4/4 files=4 size=6.32KB
    runtime filters: RF000 -> t1.id
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: int_col ASC
 |  limit: 2
@@ -153,12 +173,16 @@ limit 2
 # Test that the top-n is on int_col and not on the id column
 select int_col as id from functional.alltypessmall order by id limit 2
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:TOP-N [LIMIT=2]
 |  order by: int_col ASC
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: int_col ASC
 |  limit: 2
@@ -172,12 +196,16 @@ select int_col as id from functional.alltypessmall order by id limit 2
 # Test that the top-n is on id and not on int_col
 select int_col as id from functional.alltypessmall order by functional.alltypessmall.id limit 2
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:TOP-N [LIMIT=2]
 |  order by: id ASC
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC
 |  limit: 2
@@ -196,6 +224,8 @@ where t1.id = t2.id and t2.int_col is not null
 order by int_col
 limit 10 offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:TOP-N [LIMIT=10 OFFSET=5]
 |  order by: int_col ASC
 |
@@ -222,6 +252,8 @@ NODE 1:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=3/090301.txt 0:1620
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypessmall/year=2009/month=4/090401.txt 0:1621
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: int_col ASC
@@ -251,6 +283,8 @@ select int_col, bigint_col from
    select * from functional.alltypessmall) t
 order by int_col desc limit 10 offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:TOP-N [LIMIT=10 OFFSET=5]
 |  order by: int_col DESC
 |
@@ -262,6 +296,8 @@ order by int_col desc limit 10 offset 5
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: int_col DESC
@@ -285,6 +321,8 @@ select int_col, bigint_col from
    select * from functional.alltypessmall) t
 order by int_col desc limit 10 offset 5
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:TOP-N [LIMIT=10 OFFSET=5]
 |  order by: int_col DESC
 |
@@ -299,6 +337,8 @@ order by int_col desc limit 10 offset 5
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:MERGING-EXCHANGE [UNPARTITIONED]
 |  offset: 5
 |  order by: int_col DESC
@@ -327,6 +367,8 @@ order by int_col desc limit 10 offset 5
 select * from (select * from functional.alltypes limit 10) t
 order by int_col limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:TOP-N [LIMIT=10]
 |  order by: int_col ASC
 |
@@ -334,6 +376,8 @@ order by int_col limit 10
    partitions=24/24 files=24 size=478.45KB
    limit: 10
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:TOP-N [LIMIT=10]
 |  order by: int_col ASC
 |
@@ -351,6 +395,8 @@ select * from
    (select * from functional.alltypessmall) limit 10) t
 order by int_col limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:TOP-N [LIMIT=10]
 |  order by: int_col ASC
 |
@@ -363,6 +409,8 @@ order by int_col limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 03:TOP-N [LIMIT=10]
 |  order by: int_col ASC
 |