You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/12 18:19:11 UTC

[12/26] impala git commit: IMPALA-8021: Add estimated cardinality to EXPLAIN output

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index 9ca806f..a6c32e3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -7,15 +7,18 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=167B cardinality=1
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id < 10
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    predicates: t1.id = 10
    runtime filters: RF000 -> t1.year
+   row-size=95B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -24,17 +27,20 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=167B cardinality=1
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id < 10
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    predicates: t1.id = 10
    runtime filters: RF000 -> t1.year
+   row-size=95B cardinality=1
 ====
 # Four-way join query
 select straight_join * from functional.alltypestiny t1, functional.alltypesagg t2,
@@ -47,31 +53,38 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t3.month = t4.id
 |  runtime filters: RF000 <- t4.id
+|  row-size=345B cardinality=9
 |
 |--03:SCAN HDFS [functional.alltypesnopart t4]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t4.bigint_col < 10
+|     row-size=72B cardinality=0
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = t3.tinyint_col
 |  runtime filters: RF002 <- t3.tinyint_col
+|  row-size=273B cardinality=9
 |
 |--02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t3.month
+|     row-size=89B cardinality=8
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF004 <- t2.int_col
+|  row-size=184B cardinality=92
 |
 |--01:SCAN HDFS [functional.alltypesagg t2]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: t2.bool_col = TRUE
 |     runtime filters: RF002 -> t2.id
+|     row-size=95B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF004 -> t1.year
+   row-size=89B cardinality=8
 ====
 # Two-way join query where multiple runtime filters are generated
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2
@@ -82,14 +95,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col, t1.month = t2.bigint_col
 |  runtime filters: RF000 <- t2.int_col, RF001 <- t2.bigint_col
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 10
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year, RF001 -> t1.month
+   row-size=95B cardinality=11.00K
 ====
 # Two-way join query with an inline view in the build side of the join
 select straight_join * from functional.alltypesagg t1,
@@ -101,14 +117,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 1
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Two-way join query with an inline view in the build side of the join where the
 # right child of the join predicate is an arithmetic expr between two slots
@@ -122,14 +141,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = id + int_col
 |  runtime filters: RF000 <- id + int_col
+|  row-size=111B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.bigint_col < 10
+|     row-size=16B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Two-way join query where the lhs of the join predicate is an arithmetic expr
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2
@@ -140,14 +162,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year + 1 = t2.id
 |  runtime filters: RF000 <- t2.id
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.int_col < 10
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year + 1
+   row-size=95B cardinality=11.00K
 ====
 # Two-way join query with join predicates that are not suitable for hashing
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2
@@ -161,15 +186,18 @@ PLAN-ROOT SINK
 |  hash predicates: t1.id = t2.id
 |  other predicates: t1.year = t1.month + t2.int_col, t2.tinyint_col = t1.year + t2.smallint_col, t1.year + t2.int_col = t1.month + t2.tinyint_col
 |  runtime filters: RF000 <- t2.id
+|  row-size=167B cardinality=11
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.bigint_col = 1
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    predicates: t1.int_col = 1
    runtime filters: RF000 -> t1.id
+   row-size=95B cardinality=11
 ====
 # Two-way join query where the left child of the equi-join predicate
 # is an arithmetic expr between two slots from the same scan tuple
@@ -182,14 +210,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year + t1.month = t2.id, t1.int_col * 100 = t2.bigint_col / 100, t1.int_col + 1 - t1.tinyint_col = t2.smallint_col + 10
 |  runtime filters: RF000 <- t2.id, RF001 <- t2.bigint_col / 100, RF002 <- t2.smallint_col + 10
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.bool_col = FALSE
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year + t1.month, RF001 -> t1.int_col * 100, RF002 -> t1.int_col + 1 - t1.tinyint_col
+   row-size=95B cardinality=11.00K
 ====
 # Three-way join query with an inline view on the probe side of the join where the left
 # child of the equi-join predicate is an arithmetic expr between two slots from
@@ -204,21 +235,26 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year + t2.year = t3.int_col
+|  row-size=88B cardinality=7.81K
 |
 |--03:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.bool_col = TRUE
+|     row-size=72B cardinality=0
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF000 <- t2.id
+|  row-size=16B cardinality=7.81K
 |
 |--01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id
+   row-size=8B cardinality=11.00K
 ====
 # Two-way join query with an inline view in the build side of the join that has an
 # aggregation
@@ -232,18 +268,23 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = id, t1.year = count(int_col)
 |  runtime filters: RF000 <- id, RF001 <- count(int_col)
+|  row-size=107B cardinality=0
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(int_col)
 |  |  group by: id
 |  |  having: count(int_col) < 10
+|  |  row-size=12B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
+|     row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
+   partition predicates: t1.year < 10
    partitions=0/11 files=0 size=0B
    runtime filters: RF000 -> t1.month, RF001 -> t1.year
+   row-size=95B cardinality=0
 ====
 # Two-way join query with an inline view in the build side of the join that has a
 # two-way join (bushy plan)
@@ -257,21 +298,26 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = t3.tinyint_col, t1.year = t2.id + t3.id
 |  runtime filters: RF000 <- t3.tinyint_col, RF001 <- t2.id + t3.id
+|  row-size=112B cardinality=11.00K
 |
 |--03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.int_col = t3.int_col
 |  |  runtime filters: RF004 <- t3.int_col
+|  |  row-size=17B cardinality=0
 |  |
 |  |--02:SCAN HDFS [functional.alltypesnopart t3]
 |  |     partitions=1/1 files=0 size=0B
+|  |     row-size=9B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     runtime filters: RF004 -> t2.int_col
+|     row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.month, RF001 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Four-way join query with an inline view in the build side of the join where the
 # inline view has a three-way cyclic join (bushy plan)
@@ -286,30 +332,37 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col, t1.month = t4.tinyint_col
 |  runtime filters: RF000 <- t2.int_col, RF001 <- t4.tinyint_col
+|  row-size=117B cardinality=11.00K
 |
 |--05:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.tinyint_col = t4.tinyint_col, t3.int_col = t4.int_col
 |  |  runtime filters: RF004 <- t4.tinyint_col, RF005 <- t4.int_col
+|  |  row-size=22B cardinality=0
 |  |
 |  |--03:SCAN HDFS [functional.alltypesnopart t4]
 |  |     partitions=1/1 files=0 size=0B
+|  |     row-size=5B cardinality=0
 |  |
 |  04:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t3.id
 |  |  runtime filters: RF008 <- t3.id
+|  |  row-size=17B cardinality=0
 |  |
 |  |--02:SCAN HDFS [functional.alltypesnopart t3]
 |  |     partitions=1/1 files=0 size=0B
 |  |     runtime filters: RF005 -> t3.int_col
+|  |     row-size=8B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.int_col = t2.id
 |     runtime filters: RF004 -> t2.tinyint_col, RF008 -> t2.id
+|     row-size=9B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year, RF001 -> t1.month
+   row-size=95B cardinality=11.00K
 ====
 # Four-way join query between base tables in a star schema
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2,
@@ -322,32 +375,39 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t4.tinyint_col
 |  runtime filters: RF000 <- t4.tinyint_col
+|  row-size=311B cardinality=11.00K
 |
 |--03:SCAN HDFS [functional.alltypesnopart t4]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t4.bigint_col IN (1, 2)
+|     row-size=72B cardinality=0
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t3.int_col
 |  runtime filters: RF002 <- t3.int_col
+|  row-size=239B cardinality=11.00K
 |
 |--02:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.bool_col = TRUE
 |     runtime filters: RF000 -> t3.int_col
+|     row-size=72B cardinality=0
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.id
 |  runtime filters: RF004 <- t2.id
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.bool_col = FALSE
 |     runtime filters: RF000 -> t2.id, RF002 -> t2.id
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year, RF002 -> t1.year, RF004 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Five-way cyclic join query
 select straight_join * from functional.alltypesagg t1, functional.alltypesnopart t2,
@@ -360,37 +420,46 @@ PLAN-ROOT SINK
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = t5.id, t4.smallint_col = t5.smallint_col
 |  runtime filters: RF000 <- t5.id, RF001 <- t5.smallint_col
+|  row-size=400B cardinality=11.00K
 |
 |--04:SCAN HDFS [functional.alltypesnopart t5]
 |     partitions=1/1 files=0 size=0B
+|     row-size=72B cardinality=0
 |
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: t3.month = t4.bigint_col
 |  runtime filters: RF004 <- t4.bigint_col
+|  row-size=328B cardinality=11.00K
 |
 |--03:SCAN HDFS [functional.alltypesnopart t4]
 |     partitions=1/1 files=0 size=0B
 |     runtime filters: RF001 -> t4.smallint_col
+|     row-size=72B cardinality=0
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.int_col = t3.tinyint_col
 |  runtime filters: RF006 <- t3.tinyint_col
+|  row-size=256B cardinality=11.00K
 |
 |--02:SCAN HDFS [functional.alltypessmall t3]
 |     partitions=4/4 files=4 size=6.32KB
 |     runtime filters: RF004 -> t3.month
+|     row-size=89B cardinality=100
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.id
 |  runtime filters: RF008 <- t2.id
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     runtime filters: RF006 -> t2.int_col
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.month, RF008 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Two-way left outer join query; no runtime filters should be generated from the
 # ON-clause equi-join predicate
@@ -403,13 +472,16 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  other predicates: t2.id = 1
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 1
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
+   row-size=95B cardinality=11.00K
 ====
 # Two-way left outer join query where not all equi-join predicates should
 # generate a runtime filter
@@ -423,14 +495,17 @@ PLAN-ROOT SINK
 |  hash predicates: t1.year = t2.int_col
 |  other predicates: t2.id = 2, t1.month = t2.tinyint_col
 |  runtime filters: RF000 <- t2.tinyint_col
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 2
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.month
+   row-size=95B cardinality=11.00K
 ====
 # Multi-way join query with outer joins
 select straight_join * from functional.alltypesagg t1 left outer join functional.alltypesnopart t2
@@ -446,37 +521,46 @@ PLAN-ROOT SINK
 |  hash predicates: t1.year = t5.smallint_col
 |  other predicates: t2.id = 1, t3.int_col = 1, t4.bool_col = TRUE
 |  runtime filters: RF000 <- t5.smallint_col
+|  row-size=383B cardinality=11.00K
 |
 |--04:SCAN HDFS [functional.alltypesnopart t5]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t5.bool_col = FALSE
+|     row-size=72B cardinality=0
 |
 07:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: t1.year = t4.tinyint_col
+|  row-size=311B cardinality=11.00K
 |
 |--03:SCAN HDFS [functional.alltypesnopart t4]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t4.bool_col = TRUE
+|     row-size=72B cardinality=0
 |
 06:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.year = t3.id
+|  row-size=239B cardinality=11.00K
 |
 |--02:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.int_col = 1
 |     runtime filters: RF000 -> t3.id
+|     row-size=72B cardinality=0
 |
 05:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.year = t2.int_col
+|  row-size=167B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 1
 |     runtime filters: RF000 -> t2.int_col
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Two-way right outer join query where a runtime filter can be pushed to the nullable
 # probe side
@@ -492,15 +576,19 @@ PLAN-ROOT SINK
 |  other join predicates: t2.int_col = 10
 |  other predicates: t1.int_col = 1, t1.month = t2.tinyint_col
 |  runtime filters: RF000 <- t2.int_col, RF001 <- t2.tinyint_col
+|  row-size=167B cardinality=0
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 10
+|     row-size=72B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
+   partition predicates: t1.month = 1, t1.year = 10
    partitions=0/11 files=0 size=0B
    predicates: t1.int_col = 1
    runtime filters: RF000 -> t1.year, RF001 -> t1.month
+   row-size=95B cardinality=0
 ====
 # Three-way join query with semi joins
 select straight_join * from functional.alltypesagg t1 left semi join functional.alltypesnopart t2
@@ -513,21 +601,26 @@ PLAN-ROOT SINK
 04:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t1.month = t3.tinyint_col
 |  runtime filters: RF000 <- t3.tinyint_col
+|  row-size=72B cardinality=0
 |
 |--02:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.id = 1
+|     row-size=72B cardinality=0
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF002 <- t2.int_col
+|  row-size=95B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.month, RF002 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Query with a subquery that is converted to a null-aware left anti join
 select straight_join * from functional.alltypesagg t1
@@ -538,14 +631,17 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: t1.year = id
+|  row-size=95B cardinality=1.10K
 |
 |--01:SCAN HDFS [functional.alltypesnopart]
 |     partitions=1/1 files=0 size=0B
 |     predicates: int_col = 10
+|     row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    predicates: t1.int_col < 10
+   row-size=95B cardinality=1.10K
 ====
 # Two-way join query between two inline views where the scan node to apply the filter
 # is below an aggregation node in the probe side of the join
@@ -559,18 +655,22 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: year = id
 |  runtime filters: RF000 <- id
+|  row-size=21B cardinality=1
 |
 |--02:SCAN HDFS [functional.alltypesnopart]
 |     partitions=1/1 files=0 size=0B
 |     predicates: tinyint_col < 10
+|     row-size=9B cardinality=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: year
+|  row-size=12B cardinality=1
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=4B cardinality=11.00K
 ====
 # Two-way join query where the lhs of the join is an inline view with an aggregation;
 # the runtime filter cannot be pushed through the aggregation node
@@ -583,17 +683,21 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: count(*) = t2.id
+|  row-size=101B cardinality=1
 |
 |--02:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t2.int_col = 1
+|     row-size=89B cardinality=730
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: year
+|  row-size=12B cardinality=1
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
+   row-size=4B cardinality=11.00K
 ====
 # Two-way join query with multiple nested inline views in the probe side of the join
 # where the scan node to apply the filter is below multiple aggregation nodes
@@ -610,28 +714,35 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: v1.year = t3.smallint_col
 |  runtime filters: RF000 <- t3.smallint_col
+|  row-size=88B cardinality=1
 |
 |--05:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.id = 1
+|     row-size=72B cardinality=0
 |
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: year, t2.int_col
+|  row-size=16B cardinality=1
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: month = t2.int_col
 |  runtime filters: RF002 <- t2.int_col
+|  row-size=12B cardinality=1
 |
 |--02:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
 |
 01:AGGREGATE [FINALIZE]
 |  group by: year, month
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year, RF002 -> t1.month
+   row-size=8B cardinality=11.00K
 ====
 # Four-way join query between an inline view with an aggregation and three base tables
 select straight_join 1 from
@@ -646,33 +757,41 @@ PLAN-ROOT SINK
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: year = c.year
 |  runtime filters: RF000 <- c.year
+|  row-size=28B cardinality=58.40K
 |
 |--04:SCAN HDFS [functional.alltypestiny c]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: year = b.year
 |  runtime filters: RF002 <- b.year
+|  row-size=24B cardinality=14.60K
 |
 |--03:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
 |     predicates: b.int_col < 10
 |     runtime filters: RF000 -> b.year
+|     row-size=8B cardinality=1
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: year = a.year
 |  runtime filters: RF004 <- a.year
+|  row-size=16B cardinality=29.20K
 |
 |--02:SCAN HDFS [functional.alltypestiny a]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> a.year, RF002 -> a.year
+|     row-size=4B cardinality=8
 |
 01:AGGREGATE [FINALIZE]
 |  group by: id, year, month
+|  row-size=12B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> functional.alltypes.year, RF002 -> functional.alltypes.year, RF004 -> functional.alltypes.year
+   row-size=12B cardinality=7.30K
 ====
 # Two-way join query with an inline view in the probe side of the join where the
 # scan node to apply the filter is below a top-n (order by with limit) operator
@@ -685,16 +804,20 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: int_col = t2.int_col, year = t2.id
+|  row-size=80B cardinality=10
 |
 |--02:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.smallint_col = 1
+|     row-size=72B cardinality=0
 |
 01:TOP-N [LIMIT=10]
 |  order by: year ASC
+|  row-size=8B cardinality=10
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=8B cardinality=11.00K
 ====
 # Two-way join query with an inline view in the probe side of the join that has a union
 select straight_join * from
@@ -708,21 +831,26 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: year = t3.int_col
 |  runtime filters: RF000 <- t3.int_col
+|  row-size=76B cardinality=11.01K
 |
 |--03:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.bool_col = FALSE
+|     row-size=72B cardinality=0
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=4B cardinality=11.01K
 |
 |--02:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t2.year
+|     row-size=4B cardinality=8
 |
 01:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=4B cardinality=11.00K
 ====
 # Query with nested UNION ALL operators
 select straight_join count(*) from
@@ -739,29 +867,36 @@ PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: month = b.month
 |  runtime filters: RF000 <- b.month
+|  row-size=12B cardinality=21.90K
 |
 |--04:SCAN HDFS [functional.alltypessmall b]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: b.int_col = 1
+|     row-size=8B cardinality=10
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=4B cardinality=21.90K
 |
 |--03:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF000 -> functional.alltypes.month
+|     row-size=4B cardinality=7.30K
 |
 |--02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF000 -> functional.alltypes.month
+|     row-size=4B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> functional.alltypes.month
+   row-size=4B cardinality=7.30K
 ====
 # Query with nested UNION DISTINCT operators
 select straight_join count(*) from
@@ -778,32 +913,40 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: month = b.month
 |  runtime filters: RF000 <- b.month
+|  row-size=16B cardinality=216
 |
 |--05:SCAN HDFS [functional.alltypessmall b]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: b.int_col = 1
+|     row-size=8B cardinality=10
 |
 04:AGGREGATE [FINALIZE]
 |  group by: month, year
+|  row-size=8B cardinality=216
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=21.90K
 |
 |--03:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF000 -> functional.alltypes.month
+|     row-size=8B cardinality=7.30K
 |
 |--02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF000 -> functional.alltypes.month
+|     row-size=8B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> functional.alltypes.month
+   row-size=8B cardinality=7.30K
 ====
 # UNION ALL query
 select straight_join t2.id, t1.year from functional.alltypesagg t1, functional.alltypesnopart t2
@@ -815,30 +958,37 @@ where t3.month = t4.smallint_col and t4.bool_col = true
 PLAN-ROOT SINK
 |
 00:UNION
+|  row-size=8B cardinality=18.30K
 |
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t3.month = t4.smallint_col
 |  |  runtime filters: RF002 <- t4.smallint_col
+|  |  row-size=15B cardinality=7.30K
 |  |
 |  |--05:SCAN HDFS [functional.alltypesnopart t4]
 |  |     partitions=1/1 files=0 size=0B
 |  |     predicates: t4.bool_col = TRUE
+|  |     row-size=7B cardinality=0
 |  |
 |  04:SCAN HDFS [functional.alltypes t3]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF002 -> t3.month
+|     row-size=8B cardinality=7.30K
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=13B cardinality=11.00K
 |
 |--02:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.bool_col = FALSE
+|     row-size=9B cardinality=0
 |
 01:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=4B cardinality=11.00K
 ====
 # Query with UNION ALL operator on the rhs of a join node
 select straight_join count(*) from functional.alltypes a
@@ -853,25 +1003,31 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id
 |  runtime filters: RF000 <- id
+|  row-size=8B cardinality=730
 |
 |--01:UNION
+|  |  row-size=4B cardinality=1.46K
 |  |
 |  |--03:SCAN HDFS [functional.alltypes]
 |  |     partitions=24/24 files=24 size=478.45KB
 |  |     predicates: (functional.alltypes.id - functional.alltypes.id) < 1, (functional.alltypes.int_col - functional.alltypes.int_col) < 1
+|  |     row-size=8B cardinality=730
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: (functional.alltypes.id - functional.alltypes.id) < 1, (functional.alltypes.int_col - functional.alltypes.int_col) < 1
+|     row-size=8B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: (a.id - a.id) < 1
    runtime filters: RF000 -> a.id
+   row-size=4B cardinality=730
 ====
 # Two-way join query with an inline view in the probe side of the join where the
 # scan node to apply the filter is below an analytic function on the probe side of the join
@@ -886,30 +1042,37 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: year = t3.id, month = t3.tinyint_col
+|  row-size=92B cardinality=11.00K
 |
 |--05:SCAN HDFS [functional.alltypesnopart t3]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t3.bool_col = FALSE
+|     row-size=72B cardinality=0
 |
 04:ANALYTIC
 |  functions: count(id)
 |  partition by: year
 |  order by: month DESC
 |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=20B cardinality=11.00K
 |
 03:SORT
 |  order by: year ASC NULLS FIRST, month DESC
+|  row-size=12B cardinality=11.00K
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.id
 |  runtime filters: RF000 <- t2.id
+|  row-size=12B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.year
+   row-size=8B cardinality=11.00K
 ====
 # Two-way join query with an analytic function on the probe side
 # TODO: Propagate a runtime filter through the analytic function
@@ -923,22 +1086,28 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: year = v1.int_col
+|  row-size=109B cardinality=14.60K
 |
 |--03:SCAN HDFS [functional.alltypestiny v1]
 |     partitions=4/4 files=4 size=460B
 |     predicates: v1.int_col = 2009
+|     row-size=89B cardinality=4
 |
 02:ANALYTIC
 |  functions: sum(int_col)
 |  partition by: year
 |  order by: id ASC
 |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=20B cardinality=3.65K
 |
 01:SORT
 |  order by: year ASC NULLS FIRST, id ASC
+|  row-size=12B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: functional.alltypes.year = 2009
    partitions=12/24 files=12 size=238.68KB
+   row-size=12B cardinality=3.65K
 ====
 # Multi-way join query with a bushy plan
 select straight_join * from
@@ -961,36 +1130,45 @@ PLAN-ROOT SINK
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.month = t4.int_col
 |  runtime filters: RF000 <- t4.int_col
+|  row-size=121B cardinality=11.00K
 |
 |--07:HASH JOIN [LEFT OUTER JOIN]
 |  |  hash predicates: t4.smallint_col = t5.smallint_col
 |  |  other predicates: t5.bool_col = TRUE
+|  |  row-size=17B cardinality=0
 |  |
 |  |--06:SCAN HDFS [functional.alltypesnopart t5]
 |  |     partitions=1/1 files=0 size=0B
 |  |     predicates: t5.bool_col = TRUE
+|  |     row-size=11B cardinality=0
 |  |
 |  05:SCAN HDFS [functional.alltypesnopart t4]
 |     partitions=1/1 files=0 size=0B
+|     row-size=6B cardinality=0
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.year = t2.id + t3.id + 1
 |  runtime filters: RF002 <- t2.id + t3.id + 1
+|  row-size=104B cardinality=11.00K
 |
 |--03:HASH JOIN [LEFT OUTER JOIN]
 |  |  hash predicates: t2.id = t3.id
 |  |  other predicates: t3.bool_col = FALSE
+|  |  row-size=9B cardinality=0
 |  |
 |  |--02:SCAN HDFS [functional.alltypesnopart t3]
 |  |     partitions=1/1 files=0 size=0B
 |  |     predicates: t3.bool_col = FALSE
+|  |     row-size=5B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypesnopart t2]
 |     partitions=1/1 files=0 size=0B
+|     row-size=4B cardinality=0
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.month, RF002 -> t1.year
+   row-size=95B cardinality=11.00K
 ====
 # Multi-way join query where the slots of all the join predicates belong to the same
 # equivalence class
@@ -1003,22 +1181,27 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = t3.id
 |  runtime filters: RF000 <- t3.id
+|  row-size=16B cardinality=4
 |
 |--02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
 |     predicates: t3.int_col = 1
+|     row-size=8B cardinality=4
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF002 <- t2.id
+|  row-size=8B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t2.id
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> t1.id, RF002 -> t1.id
+   row-size=4B cardinality=8
 ====
 # Equivalent query to the one above; the same runtime filters should be generated
 select straight_join 1 from functional.alltypestiny t1 join functional.alltypestiny t2 on t1.id = t2.id
@@ -1030,22 +1213,27 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t3.id
 |  runtime filters: RF000 <- t3.id
+|  row-size=16B cardinality=4
 |
 |--02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
 |     predicates: t3.int_col = 1
+|     row-size=8B cardinality=4
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF002 <- t2.id
+|  row-size=8B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t2.id
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> t1.id, RF002 -> t1.id
+   row-size=4B cardinality=8
 ====
 # Check that runtime filters are not generated in subplans
 select straight_join 1 from tpch_nested_parquet.customer c,
@@ -1057,21 +1245,28 @@ where c_custkey = v.o_orderkey
 PLAN-ROOT SINK
 |
 01:SUBPLAN
+|  row-size=48B cardinality=150.00K
 |
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: c_custkey = o1.o_orderkey
+|  |  row-size=48B cardinality=1
 |  |
 |  |--05:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: o1.o_orderkey = o2.o_orderkey
+|  |  |  row-size=16B cardinality=10
 |  |  |
 |  |  |--04:UNNEST [c.c_orders o2]
+|  |  |     row-size=0B cardinality=10
 |  |  |
 |  |  03:UNNEST [c.c_orders o1]
+|  |     row-size=0B cardinality=10
 |  |
 |  02:SINGULAR ROW SRC
+|     row-size=32B cardinality=1
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=577.87MB
+   partitions=1/1 files=4 size=288.99MB
+   row-size=32B cardinality=150.00K
 ====
 # Two-way join query where the build side is optimized into an empty set
 select straight_join 1
@@ -1084,12 +1279,14 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF000 <- t2.id
+|  row-size=8B cardinality=0
 |
 |--01:EMPTYSET
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> t1.id
+   row-size=4B cardinality=8
 ====
 # Two-way join query where both the build side and probe side are optimized
 # into empty sets
@@ -1101,6 +1298,7 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypestiny.id = functional.alltypessmall.id
+|  row-size=8B cardinality=0
 |
 |--01:EMPTYSET
 |
@@ -1118,29 +1316,36 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id + t2.id = t4.id
 |  runtime filters: RF000 <- t4.id
+|  row-size=16B cardinality=8
 |
 |--03:SCAN HDFS [functional.alltypestiny t4]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t3.id
 |  runtime filters: RF002 <- t3.id
+|  row-size=12B cardinality=8
 |
 |--02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t3.id + t3.id
+|     row-size=4B cardinality=8
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF004 <- t2.id
+|  row-size=8B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t2.id + t2.id, RF002 -> t2.id
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> t1.id + t1.id, RF002 -> t1.id, RF004 -> t1.id
+   row-size=4B cardinality=8
 ====
 # IMPALA-3074: Generated runtime filter has multiple candidate target nodes not all of
 # which are valid due to type mismatch between the associated source and target
@@ -1154,21 +1359,26 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a3.smallint_col = a4.smallint_col
 |  runtime filters: RF000 <- a4.smallint_col
+|  row-size=8B cardinality=23.36K
 |
 |--02:SCAN HDFS [functional.alltypes a4]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=2B cardinality=7.30K
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a1.int_col = a3.smallint_col
 |  runtime filters: RF002 <- a3.smallint_col
+|  row-size=6B cardinality=32
 |
 |--01:SCAN HDFS [functional.alltypestiny a3]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> a3.smallint_col
+|     row-size=2B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny a1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF002 -> a1.int_col
+   row-size=4B cardinality=8
 ====
 # IMPALA-3574: Runtime filter generated from a target expr that contains a TupleIsNull
 # predicate.
@@ -1185,25 +1395,32 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  group by: t1.int_col
+|  row-size=4B cardinality=2
 |
 04:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: if(TupleIsNull(), NULL, coalesce(int_col, 384)) = t1.month
 |  runtime filters: RF000 <- t1.month
+|  row-size=12B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t1]
+|     partition predicates: t1.month IS NOT NULL
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: int_col = int_col
 |  runtime filters: RF002 <- int_col
+|  row-size=4B cardinality=115
 |
 |--01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF000 -> coalesce(functional.alltypes.int_col, 384)
+|     row-size=4B cardinality=7.30K
 |
 02:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> coalesce(int_col, 384), RF002 -> int_col
+   row-size=4B cardinality=11.00K
 ====
 # IMPALA-4076: Test pruning the least selective runtime filters to obey
 # MAX_NUM_RUNTIME_FILTERS in the presence of zero-cardinality plan nodes. This query was
@@ -1300,110 +1517,146 @@ from big_six
 PLAN-ROOT SINK
 |
 36:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=377B cardinality=0
 |
 |--28:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.bigint_col = b.bigint_col, a.bool_col = b.bool_col, a.double_col = b.double_col, a.float_col = b.float_col, a.id = b.id, a.int_col = b.int_col, a.smallint_col = b.smallint_col, a.tinyint_col = b.tinyint_col
+|  |  row-size=64B cardinality=8
 |  |
 |  |--27:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=32B cardinality=8
 |  |
 |  26:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=32B cardinality=7.30K
 |
 35:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=313B cardinality=0
 |
 |--25:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.bool_col = b.bool_col, a.id = b.id, a.tinyint_col = b.tinyint_col
+|  |  row-size=12B cardinality=7.30K
 |  |
 |  |--24:SCAN HDFS [functional.alltypes b]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=6B cardinality=7.30K
 |  |
 |  23:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=6B cardinality=7.30K
 |
 34:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=301B cardinality=0
 |
 |--22:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.bigint_col = b.bigint_col, a.bool_col = b.bool_col, a.id = b.id, a.int_col = b.int_col, a.smallint_col = b.smallint_col, a.tinyint_col = b.tinyint_col
+|  |  row-size=40B cardinality=8
 |  |
 |  |--21:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=20B cardinality=8
 |  |
 |  20:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=20B cardinality=7.30K
 |
 33:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=261B cardinality=0
 |
 |--19:HASH JOIN [INNER JOIN]
 |  |  hash predicates: b.id = x.id
+|  |  row-size=9B cardinality=0
 |  |
 |  |--18:SCAN HDFS [functional.alltypestiny x]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  17:HASH JOIN [INNER JOIN]
 |  |  hash predicates: id = b.id
+|  |  row-size=5B cardinality=0
 |  |
 |  |--16:SCAN HDFS [functional.alltypes b]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=4B cardinality=7.30K
 |  |
 |  15:EMPTYSET
 |
 32:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=252B cardinality=24.90T
 |
 |--14:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.id = b.id
+|  |  row-size=8B cardinality=7.30K
 |  |
 |  |--13:SCAN HDFS [functional.alltypes b]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=4B cardinality=7.30K
 |  |
 |  12:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 31:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=244B cardinality=3.41G
 |
 |--11:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.bigint_col = b.bigint_col, a.bool_col = b.bool_col, a.double_col = b.double_col, a.float_col = b.float_col, a.id = b.id, a.int_col = b.int_col, a.smallint_col = b.smallint_col, a.tinyint_col = b.tinyint_col
 |  |  runtime filters: RF032 <- b.bigint_col, RF033 <- b.bool_col, RF034 <- b.double_col, RF035 <- b.float_col, RF036 <- b.id, RF037 <- b.int_col, RF038 <- b.smallint_col, RF039 <- b.tinyint_col
+|  |  row-size=64B cardinality=8
 |  |
 |  |--10:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=32B cardinality=8
 |  |
 |  09:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF032 -> a.bigint_col, RF033 -> a.bool_col, RF034 -> a.double_col, RF035 -> a.float_col, RF036 -> a.id, RF037 -> a.int_col, RF038 -> a.smallint_col, RF039 -> a.tinyint_col
+|     row-size=32B cardinality=7.30K
 |
 30:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=180B cardinality=426.32M
 |
 |--08:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.bool_col = b.bool_col, a.double_col = b.double_col, a.id = b.id, a.smallint_col = b.smallint_col, a.timestamp_col = b.timestamp_col, a.tinyint_col = b.tinyint_col, a.string_col = b.string_col, a.date_string_col = b.date_string_col
+|  |  row-size=130B cardinality=7.30K
 |  |
 |  |--07:SCAN HDFS [functional.alltypes b]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=65B cardinality=7.30K
 |  |
 |  06:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=65B cardinality=7.30K
 |
 29:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=50B cardinality=58.40K
 |
 |--05:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.bool_col = b.bool_col, a.id = b.id
 |  |  runtime filters: RF012 <- b.bool_col, RF013 <- b.id
+|  |  row-size=10B cardinality=8
 |  |
 |  |--04:SCAN HDFS [functional.alltypestiny b]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=5B cardinality=8
 |  |
 |  03:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF012 -> a.bool_col, RF013 -> a.id
+|     row-size=5B cardinality=7.30K
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bigint_col = b.bigint_col, a.bool_col = b.bool_col, a.id = b.id, a.int_col = b.int_col, a.smallint_col = b.smallint_col, a.tinyint_col = b.tinyint_col
+|  row-size=40B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=20B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   row-size=20B cardinality=7.30K
 ====
 # IMPALA-4490: Only generate runtime filters for hash join nodes, even if there is an
 # otherwise suitable equality predicate.
@@ -1419,20 +1672,25 @@ PLAN-ROOT SINK
 04:NESTED LOOP JOIN [LEFT OUTER JOIN]
 |  join predicates: b.id IS DISTINCT FROM c.id
 |  predicates: c.int_col = b.int_col + b.bigint_col
+|  row-size=28B cardinality=7.30K
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: b.id = a.id
 |  runtime filters: RF000 <- a.id
+|  row-size=20B cardinality=7.30K
 |
 |--00:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypes b]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.id
+   row-size=16B cardinality=7.30K
 ====
 # IMPALA-5597: Runtime filter should be generated and assigned successfully when the
 # source expr and target expr have different decimal types.
@@ -1440,21 +1698,24 @@ select *
 from tpch_parquet.lineitem
 left join tpch_parquet.part on if(l_orderkey % 2 = 0, NULL, l_partkey) = p_partkey
 where l_orderkey = 965 and l_extendedprice * l_tax = p_retailprice;
----- Plan
+---- PLAN
 PLAN-ROOT SINK
 |
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: p_partkey = if(l_orderkey % 2 = 0, NULL, l_partkey)
 |  other predicates: p_retailprice = l_extendedprice * l_tax
 |  runtime filters: RF000 <- if(l_orderkey % 2 = 0, NULL, l_partkey), RF001 <- l_extendedprice * l_tax
+|  row-size=419B cardinality=4
 |
 |--00:SCAN HDFS [tpch_parquet.lineitem]
-|     partitions=1/1 files=3 size=193.71MB
+|     partitions=1/1 files=3 size=193.60MB
 |     predicates: l_orderkey = 965
+|     row-size=231B cardinality=4
 |
 01:SCAN HDFS [tpch_parquet.part]
    partitions=1/1 files=1 size=6.23MB
    runtime filters: RF000 -> p_partkey, RF001 -> p_retailprice
+   row-size=188B cardinality=200.00K
 ====
 # IMPALA-6286: Runtime filter must not be assigned at scan 01 because that could
 # alter the query results due to the coalesce() in the join condition of join 04.
@@ -1467,18 +1728,23 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|  row-size=12B cardinality=32
 |
 |--02:UNION
 |     constant-operands=1
+|     row-size=1B cardinality=1
 |
 03:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.int_col = t2.int_col
+|  row-size=12B cardinality=32
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
 ====
 # IMPALA-6286: Same as above but with an inline view.
 select /* +straight_join */ 1 from functional.alltypestiny t1
@@ -1490,18 +1756,23 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|  row-size=12B cardinality=32
 |
 |--02:UNION
 |     constant-operands=1
+|     row-size=1B cardinality=1
 |
 03:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.int_col = t2.int_col
+|  row-size=12B cardinality=32
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
 ====
 # IMPALA-6286: The runtime filter produced by inner join 05 can safely be assigned
 # at scan 01. It would also be safe to produce a runtime filter at join 06 and assign
@@ -1517,24 +1788,31 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|  row-size=16B cardinality=32
 |
 |--03:UNION
 |     constant-operands=1
+|     row-size=1B cardinality=1
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = t3.id
 |  runtime filters: RF000 <- t3.id
+|  row-size=16B cardinality=32
 |
 |--02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.int_col = t2.int_col
+|  row-size=12B cardinality=32
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
 |     runtime filters: RF000 -> t2.id
+|     row-size=8B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
index 5188d25..64c654b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
@@ -10,74 +10,91 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
 |  runtime filters: RF000 <- d.bool_col, RF001 <- d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col, RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF004 -> a.int_col, RF005 -> a.month, RF008 -> a.id, RF009 -> a.date_string_col
+   row-size=37B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 13:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=1
 |
 12:EXCHANGE [UNPARTITIONED]
 |
 07:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
 |  runtime filters: RF000 <- d.bool_col, RF001 <- d.year
+|  row-size=74B cardinality=16.21G
 |
 |--11:EXCHANGE [HASH(d.bool_col,d.year)]
 |  |
 |  03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 10:EXCHANGE [HASH(a.bool_col,a.year)]
 |
 05:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col, RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--09:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF004 -> a.int_col, RF005 -> a.month, RF008 -> a.id, RF009 -> a.date_string_col
+   row-size=37B cardinality=7.30K
 ====
 # Keep only MAX_NUM_RUNTIME_FILTERS most selective filters, remove the rest.
 # In this query RF000 (<- d.bool_col) and RF001 (<- d.year) are the least selective
@@ -95,43 +112,52 @@ PLAN-ROOT SINK
 |
 13:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=1
 |
 12:EXCHANGE [UNPARTITIONED]
 |
 07:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--11:EXCHANGE [HASH(d.bool_col,d.year)]
 |  |
 |  03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 10:EXCHANGE [HASH(a.bool_col,a.year)]
 |
 05:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col, RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--09:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF008 -> a.id, RF009 -> a.date_string_col, RF004 -> a.int_col, RF005 -> a.month
+   row-size=37B cardinality=7.30K
 ====
 # DISABLE_ROW_RUNTIME_FILTERING is set: only partition column filters are applied.
 select /* +straight_join */ count(*) from functional.alltypes a
@@ -147,30 +173,38 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
 |  runtime filters: RF001 <- d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF001 -> a.year, RF005 -> a.month
+   row-size=37B cardinality=7.30K
 ====
 # DISABLE_ROW_RUNTIME_FILTERING is set and MAX_NUM_RUNTIME_FILTERS is set to 2: only the 2
 # partition column filters are applied
@@ -188,30 +222,38 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
 |  runtime filters: RF001 <- d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF001 -> a.year, RF005 -> a.month
+   row-size=37B cardinality=7.30K
 ====
 # RUNTIME_FILTER_MODE is set to LOCAL: only local filters are applied
 select /* +straight_join */ count(*) from functional.alltypes a
@@ -227,73 +269,90 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
 |  runtime filters: RF000 <- d.bool_col, RF001 <- d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col, RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF004 -> a.int_col, RF005 -> a.month, RF008 -> a.id, RF009 -> a.date_string_col
+   row-size=37B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 13:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=1
 |
 12:EXCHANGE [UNPARTITIONED]
 |
 07:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--11:EXCHANGE [HASH(d.bool_col,d.year)]
 |  |
 |  03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 10:EXCHANGE [HASH(a.bool_col,a.year)]
 |
 05:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col, RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--09:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF004 -> a.int_col, RF005 -> a.month, RF008 -> a.id, RF009 -> a.date_string_col
+   row-size=37B cardinality=7.30K
 ====
 # RUNTIME_FILTER_MODE is set to LOCAL and MAX_NUM_RUNTIME_FILTERS is set to 3: only 3
 # local filters are kept, which means that both local and non-local filters are removed
@@ -312,72 +371,89 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF008 -> a.id, RF009 -> a.date_string_col, RF004 -> a.int_col
+   row-size=37B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 13:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=1
 |
 12:EXCHANGE [UNPARTITIONED]
 |
 07:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--11:EXCHANGE [HASH(d.bool_col,d.year)]
 |  |
 |  03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 10:EXCHANGE [HASH(a.bool_col,a.year)]
 |
 05:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF004 <- c.int_col
+|  row-size=69B cardinality=4.44M
 |
 |--09:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF008 -> a.id, RF009 -> a.date_string_col, RF004 -> a.int_col
+   row-size=37B cardinality=7.30K
 ====
 # DISABLE_ROW_RUNTIME_FILTERING is set and RUNTIME_FILTER_MODE is set to LOCAL: only local
 # partition column filters are applied
@@ -395,71 +471,88 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
 |  runtime filters: RF001 <- d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF001 -> a.year, RF005 -> a.month
+   row-size=37B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 13:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=1
 |
 12:EXCHANGE [UNPARTITIONED]
 |
 07:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--11:EXCHANGE [HASH(d.bool_col,d.year)]
 |  |
 |  03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 10:EXCHANGE [HASH(a.bool_col,a.year)]
 |
 05:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
 |  runtime filters: RF005 <- c.month
+|  row-size=69B cardinality=4.44M
 |
 |--09:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF005 -> a.month
+   row-size=37B cardinality=7.30K
 ====
 # RUNTIME_FILTER_MODE is OFF: no filters are applied
 select /* +straight_join */ count(*) from functional.alltypes a
@@ -475,27 +568,35 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   row-size=37B cardinality=7.30K
 ====
 # MAX_NUM_RUNTIME_FILTERS is 0: no filters are applied
 select /* +straight_join */ count(*) from functional.alltypes a
@@ -511,27 +612,35 @@ PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  row-size=74B cardinality=16.21G
 |
 |--03:SCAN HDFS [functional.alltypes d]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=5B cardinality=7.30K
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  row-size=69B cardinality=4.44M
 |
 |--02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  row-size=61B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=24B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   row-size=37B cardinality=7.30K
 ====
 # DISABLE_ROW_RUNTIME_FILTERING completely disables filters for Kudu.
 select /* +straight_join */ count(*) from functional_kudu.alltypes a
@@ -543,11 +652,15 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
+|  row-size=8B cardinality=7.30K
 |
 |--01:SCAN KUDU [functional_kudu.alltypes b]
+|     row-size=4B cardinality=7.30K
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
+   row-size=4B cardinality=7.30K
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test b/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
index 74f09c3..babc89d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/shuffle-by-distinct-exprs.test
@@ -7,22 +7,27 @@ PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
+|  row-size=8B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: count(int_col)
+|  row-size=8B cardinality=1
 |
 04:AGGREGATE
 |  group by: int_col
+|  row-size=4B cardinality=10
 |
 03:EXCHANGE [HASH(int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: int_col
+|  row-size=4B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 select count(distinct int_col) from functional.alltypes;
 ---- QUERYOPTIONS
@@ -34,22 +39,27 @@ PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
+|  row-size=8B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: count(int_col)
+|  row-size=8B cardinality=1
 |
 04:AGGREGATE
 |  group by: int_col
+|  row-size=4B cardinality=10
 |
 03:EXCHANGE [HASH(int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: int_col
+|  row-size=4B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # Distinct agg with a grouping expr
 select count(distinct int_col) from functional.alltypes group by year;
@@ -63,17 +73,21 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: year
+|  row-size=12B cardinality=2
 |
 04:AGGREGATE
 |  group by: year, int_col
+|  row-size=8B cardinality=20
 |
 03:EXCHANGE [HASH(year)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: year, int_col
+|  row-size=8B cardinality=20
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
 ====
 select count(distinct int_col) from functional.alltypes group by year;
 ---- QUERYOPTIONS
@@ -87,23 +101,28 @@ PLAN-ROOT SINK
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
 |  group by: year
+|  row-size=12B cardinality=2
 |
 05:EXCHANGE [HASH(year)]
 |
 02:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: year
+|  row-size=12B cardinality=2
 |
 04:AGGREGATE
 |  group by: year, int_col
+|  row-size=8B cardinality=20
 |
 03:EXCHANGE [HASH(year,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: year, int_col
+|  row-size=8B cardinality=20
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
 ====
 # Distinct agg without a grouping expr and with a compatible child partition
 select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
@@ -115,29 +134,35 @@ PLAN-ROOT SINK
 |
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(a.int_col)
+|  row-size=8B cardinality=1
 |
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(a.int_col)
+|  row-size=8B cardinality=1
 |
 03:AGGREGATE
 |  group by: a.int_col
+|  row-size=4B cardinality=10
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.int_col = b.int_col
 |  runtime filters: RF000 <- b.int_col
+|  row-size=8B cardinality=5.33M
 |
 |--06:EXCHANGE [HASH(b.int_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 05:EXCHANGE [HASH(a.int_col)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.int_col
+   row-size=4B cardinality=7.30K
 ====
 select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
     functional.alltypes b on a.int_col = b.int_col;
@@ -150,29 +175,35 @@ PLAN-ROOT SINK
 |
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(a.int_col)
+|  row-size=8B cardinality=1
 |
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(a.int_col)
+|  row-size=8B cardinality=1
 |
 03:AGGREGATE
 |  group by: a.int_col
+|  row-size=4B cardinality=10
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.int_col = b.int_col
 |  runtime filters: RF000 <- b.int_col
+|  row-size=8B cardinality=5.33M
 |
 |--06:EXCHANGE [HASH(b.int_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 05:EXCHANGE [HASH(a.int_col)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.int_col
+   row-size=4B cardinality=7.30K
 ====
 # Distinct agg with a grouping expr and a compatible child partition
 select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
@@ -187,24 +218,29 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(a.int_col)
 |  group by: a.year
+|  row-size=12B cardinality=2
 |
 03:AGGREGATE
 |  group by: a.year, a.int_col
+|  row-size=8B cardinality=20
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.year = b.year
 |  runtime filters: RF000 <- b.year
+|  row-size=12B cardinality=26.64M
 |
 |--06:EXCHANGE [HASH(b.year)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 05:EXCHANGE [HASH(a.year)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.year
+   row-size=8B cardinality=7.30K
 ====
 select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
     functional.alltypes b on a.year = b.year group by a.year;
@@ -219,35 +255,42 @@ PLAN-ROOT SINK
 10:AGGREGATE [FINALIZE]
 |  output: count:merge(a.int_col)
 |  group by: a.year
+|  row-size=12B cardinality=2
 |
 09:EXCHANGE [HASH(a.year)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(a.int_col)
 |  group by: a.year
+|  row-size=12B cardinality=2
 |
 08:AGGREGATE
 |  group by: a.year, a.int_col
+|  row-size=8B cardinality=20
 |
 07:EXCHANGE [HASH(a.year,a.int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: a.year, a.int_col
+|  row-size=8B cardinality=20
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.year = b.year
 |  runtime filters: RF000 <- b.year
+|  row-size=12B cardinality=26.64M
 |
 |--06:EXCHANGE [HASH(b.year)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 05:EXCHANGE [HASH(a.year)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.year
+   row-size=8B cardinality=7.30K
 ====
 # The input is partitioned by distinct exprs + grouping exprs
 select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
@@ -264,30 +307,36 @@ PLAN-ROOT SINK
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(a.int_col)
 |  group by: a.year
+|  row-size=12B cardinality=2
 |
 07:EXCHANGE [HASH(a.year)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(a.int_col)
 |  group by: a.year
+|  row-size=12B cardinality=2
 |
 03:AGGREGATE
 |  group by: a.year, a.int_col
+|  row-size=8B cardinality=20
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.int_col = b.int_col, a.year = b.year
 |  runtime filters: RF000 <- b.int_col, RF001 <- b.year
+|  row-size=16B cardinality=5.33M
 |
 |--06:EXCHANGE [HASH(b.int_col,b.year)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 05:EXCHANGE [HASH(a.int_col,a.year)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.int_col, RF001 -> a.year
+   row-size=8B cardinality=7.30K
 ====
 select count(distinct a.int_col) from functional.alltypes a inner join [shuffle]
    functional.alltypes b on a.year = b.year and a.int_col = b.int_col group by a.year;
@@ -303,27 +352,33 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(a.int_col)
 |  group by: a.year
+|  row-size=12B cardinality=2
 |
 08:AGGREGATE
 |  group by: a.year, a.int_col
+|  row-size=8B cardinality=20
 |
 07:EXCHANGE [HASH(a.year)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: a.year, a.int_col
+|  row-size=8B cardinality=20
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.int_col = b.int_col, a.year = b.year
 |  runtime filters: RF000 <- b.int_col, RF001 <- b.year
+|  row-size=16B cardinality=5.33M
 |
 |--06:EXCHANGE [HASH(b.int_col,b.year)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 05:EXCHANGE [HASH(a.int_col,a.year)]
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.int_col, RF001 -> a.year
+   row-size=8B cardinality=7.30K
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
index 2c91472..e60e032 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
@@ -3,14 +3,16 @@ select * from functional_seq.alltypes t1 limit 5
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_seq.alltypes t1]
-   partitions=24/24 files=24 size=562.59KB
+   partitions=24/24 files=24 size=557.47KB
    limit: 5
+   row-size=80B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_seq.alltypes t1]
-   partitions=24/24 files=24 size=562.59KB
+   partitions=24/24 files=24 size=557.47KB
    limit: 5
+   row-size=80B cardinality=5
 ====
 # Query is over the limit of 8 rows to be optimized, will distribute the query
 select * from functional.alltypes t1 limit 10
@@ -20,6 +22,7 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 10
+   row-size=89B cardinality=10
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -29,6 +32,7 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 10
+   row-size=89B cardinality=10
 ====
 # Query is optimized, run on coordinator only
 select * from functional.alltypes t1 limit 5
@@ -38,12 +42,14 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 5
+   row-size=89B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    limit: 5
+   row-size=89B cardinality=5
 ====
 # If a predicate is applied the optimization is disabled
 select * from functional.alltypes t1 where t1.id < 99 limit 5
@@ -54,6 +60,7 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id < 99
    limit: 5
+   row-size=89B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -64,6 +71,7 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id < 99
    limit: 5
+   row-size=89B cardinality=5
 ====
 # No optimization for hbase tables
 select * from functional_hbase.alltypes t1 where t1.id < 99 limit 5
@@ -73,6 +81,7 @@ PLAN-ROOT SINK
 00:SCAN HBASE [functional_hbase.alltypes t1]
    predicates: t1.id < 99
    limit: 5
+   row-size=80B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -82,6 +91,7 @@ PLAN-ROOT SINK
 00:SCAN HBASE [functional_hbase.alltypes t1]
    predicates: t1.id < 99
    limit: 5
+   row-size=80B cardinality=5
 ====
 # Applies optimization for small queries in hbase
 select * from functional_hbase.alltypes t1 limit 5
@@ -90,11 +100,13 @@ PLAN-ROOT SINK
 |
 00:SCAN HBASE [functional_hbase.alltypes t1]
    limit: 5
+   row-size=80B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 00:SCAN HBASE [functional_hbase.alltypes t1]
    limit: 5
+   row-size=80B cardinality=5
 ====
 insert into
   functional_hbase.alltypes
@@ -105,11 +117,13 @@ WRITE TO HBASE table=functional_hbase.alltypes
 |
 00:UNION
    constant-operands=1
+   row-size=57B cardinality=1
 ---- DISTRIBUTEDPLAN
 WRITE TO HBASE table=functional_hbase.alltypes
 |
 00:UNION
    constant-operands=1
+   row-size=57B cardinality=1
 ====
 create table tm as select * from functional_hbase.alltypes limit 5
 ---- PLAN
@@ -118,12 +132,14 @@ WRITE TO HDFS [default.tm, OVERWRITE=false]
 |
 00:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
+   row-size=80B cardinality=5
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [default.tm, OVERWRITE=false]
 |  partitions=1
 |
 00:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
+   row-size=80B cardinality=5
 ====
 create table tm as select * from functional_hbase.alltypes limit 50
 ---- PLAN
@@ -132,6 +148,7 @@ WRITE TO HDFS [default.tm, OVERWRITE=false]
 |
 00:SCAN HBASE [functional_hbase.alltypes]
    limit: 50
+   row-size=80B cardinality=50
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [default.tm, OVERWRITE=false]
 |  partitions=1
@@ -141,6 +158,7 @@ WRITE TO HDFS [default.tm, OVERWRITE=false]
 |
 00:SCAN HBASE [functional_hbase.alltypes]
    limit: 50
+   row-size=80B cardinality=50
 ====
 select * from functional_hbase.alltypes limit 5
 union all
@@ -150,23 +168,29 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=80B cardinality=7
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 2
+|     row-size=80B cardinality=2
 |
 01:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
+   row-size=80B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=80B cardinality=7
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 2
+|     row-size=80B cardinality=2
 |
 01:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
+   row-size=80B cardinality=5
 ====
 select * from functional_hbase.alltypes limit 5
 union all
@@ -176,29 +200,35 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=80B cardinality=10
 |
 |--02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 5
+|     row-size=80B cardinality=5
 |
 01:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
+   row-size=80B cardinality=5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=80B cardinality=10
 |
 |--04:EXCHANGE [UNPARTITIONED]
 |  |  limit: 5
 |  |
 |  02:SCAN HBASE [functional_hbase.alltypes]
 |     limit: 5
+|     row-size=80B cardinality=5
 |
 03:EXCHANGE [UNPARTITIONED]
 |  limit: 5
 |
 01:SCAN HBASE [functional_hbase.alltypes]
    limit: 5
+   row-size=80B cardinality=5
 ====
 # Two scans cannot run in the same fragment. IMPALA-561
 select * from
@@ -211,15 +241,18 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id
 |  runtime filters: RF000 <- b.id
+|  row-size=48B cardinality=0
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.testtbl b]
 |     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
 |
 00:SCAN HDFS [functional.testtbl a]
    partitions=1/1 files=0 size=0B
    runtime filters: RF000 -> a.id
+   row-size=24B cardinality=0
 ====
 select * from
   functional.testtbl a, functional.testtbl b
@@ -229,14 +262,17 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |
 02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=48B cardinality=0
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.testtbl b]
 |     partitions=1/1 files=0 size=0B
+|     row-size=24B cardinality=0
 |
 00:SCAN HDFS [functional.testtbl a]
    partitions=1/1 files=0 size=0B
+   row-size=24B cardinality=0
 ====
 select * from
   functional.alltypestiny a
@@ -251,6 +287,7 @@ PLAN-ROOT SINK
 |  hash predicates: a.id = id
 |  runtime filters: RF000 <- id
 |  limit: 5
+|  row-size=89B cardinality=5
 |
 |--04:EXCHANGE [BROADCAST]
 |  |
@@ -260,10 +297,12 @@ PLAN-ROOT SINK
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     limit: 5
+|     row-size=4B cardinality=5
 |
 00:SCAN HDFS [functional.alltypestiny a]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> a.id
+   row-size=89B cardinality=8
 ====
 # Test correct single-node planning for mixed union distinct/all (IMPALA-1553).
 select
@@ -283,20 +322,29 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: 03
+|  row-size=5B cardinality=6
 |
 |--05:SCAN HDFS [functional.alltypestiny c]
+|     partition predicates: year = 2009, month = 2
 |     partitions=1/4 files=1 size=115B
+|     row-size=5B cardinality=2
 |
 03:AGGREGATE [FINALIZE]
 |  group by: id, bool_col
+|  row-size=5B cardinality=4
 |
 00:UNION
+|  row-size=5B cardinality=4
 |
 |--02:SCAN HDFS [functional.alltypestiny b]
+|     partition predicates: year = 2009, month = 1
 |     partitions=1/4 files=1 size=115B
+|     row-size=5B cardinality=2
 |
 01:SCAN HDFS [functional.alltypestiny a]
+   partition predicates: year = 2009, month = 1
    partitions=1/4 files=1 size=115B
+   row-size=5B cardinality=2
 ====
 # IMPALA-2527: Tests that the small query optimization is disabled for collection types
 select key from functional.allcomplextypes.map_map_col.value limit 5;
@@ -309,4 +357,5 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.allcomplextypes.map_map_col.value]
    partitions=0/0 files=0 size=0B
    limit: 5
+   row-size=12B cardinality=0
 ====