You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/12 18:19:13 UTC

[14/26] impala git commit: IMPALA-8021: Add estimated cardinality to EXPLAIN output

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
index 1afe61c..85962ce 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
@@ -13,79 +13,101 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=4
 |
 |--08:AGGREGATE [FINALIZE]
 |  |  output: count(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  07:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
 |
 |--06:AGGREGATE [FINALIZE]
 |  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  row-size=8B cardinality=1
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=8B cardinality=unavailable
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  row-size=8B cardinality=1
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=8B cardinality=unavailable
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  row-size=8B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=8B cardinality=unavailable
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=4
 |
 |--16:AGGREGATE [FINALIZE]
 |  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  15:EXCHANGE [UNPARTITIONED]
 |  |
 |  08:AGGREGATE
 |  |  output: count(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  07:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
 |
 |--14:AGGREGATE [FINALIZE]
 |  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  13:EXCHANGE [UNPARTITIONED]
 |  |
 |  06:AGGREGATE
 |  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  row-size=8B cardinality=1
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=8B cardinality=unavailable
 |
 |--12:AGGREGATE [FINALIZE]
 |  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  11:EXCHANGE [UNPARTITIONED]
 |  |
 |  04:AGGREGATE
 |  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  row-size=8B cardinality=1
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=8B cardinality=unavailable
 |
 10:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=1
 |
 09:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  row-size=8B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=8B cardinality=unavailable
 ====
 # Verify that the parquet count(*) optimization is applied even if there is more than
 # one item in the select list.
@@ -95,9 +117,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=8B cardinality=unavailable
 ====
 # Select count(<partition col>) - the optimization should be disabled because it's not a
 # count(<literal>) or count(*) aggregate function.
@@ -107,9 +131,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(year)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=4B cardinality=unavailable
 ====
 # Group by partition columns.
 select month, count(*) from functional_parquet.alltypes group by month, year
@@ -119,9 +145,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |  group by: month, year
+|  row-size=16B cardinality=24
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=16B cardinality=unavailable
 ====
 # The optimization is disabled because tinyint_col is not a partition col.
 select tinyint_col, count(*) from functional_parquet.alltypes group by tinyint_col, year
@@ -131,9 +159,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: tinyint_col, year
+|  row-size=13B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=5B cardinality=unavailable
 ====
 # The optimization is disabled because there are two aggregate functions.
 select avg(year), count(*) from functional_parquet.alltypes
@@ -142,9 +172,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: avg(year), count(*)
+|  row-size=16B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=4B cardinality=unavailable
 ====
 # Optimization is not applied because the inner count(*) is not materialized. The outer
 # count(*) does not reference a base table.
@@ -154,11 +186,14 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 01:AGGREGATE [FINALIZE]
+|  row-size=0B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=0B cardinality=unavailable
 ====
 # The optimization is applied if count(*) is in the having clause.
 select 1 from functional_parquet.alltypes having count(*) > 1
@@ -168,9 +203,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |  having: count(*) > 1
+|  row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=8B cardinality=unavailable
 ====
 # The count(*) optimization is applied in the inline view.
 select count(*), count(a) from (select count(1) as a from functional_parquet.alltypes) t
@@ -179,12 +216,15 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(*), count(count(*))
+|  row-size=16B cardinality=1
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=8B cardinality=unavailable
 ====
 # The count(*) optimization is applied to the inline view even if there is a join.
 select *
@@ -197,17 +237,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: x.id = count(*)
 |  runtime filters: RF000 <- count(*)
+|  row-size=101B cardinality=7.30K
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |  |  group by: year
+|  |  row-size=12B cardinality=2
 |  |
 |  01:SCAN HDFS [functional_parquet.alltypes]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=12B cardinality=unavailable
 |
 00:SCAN HDFS [functional.alltypes x]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> x.id
+   row-size=89B cardinality=7.30K
 ====
 # The count(*) optimization is not applied if there is more than 1 table ref.
 select count(*) from functional_parquet.alltypes a, functional_parquet.alltypes b
@@ -216,14 +260,18 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=0B cardinality=unavailable
 |
 |--01:SCAN HDFS [functional_parquet.alltypes b]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=0B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes a]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=0B cardinality=unavailable
 ====
 # The count(*) optimization is applied if there are predicates on partition columns.
 select count(1) from functional_parquet.alltypes where year < 2010 and month > 8;
@@ -232,9 +280,12 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=4/24 files=4 size=29.75KB
+   partition predicates: year < 2010, month > 8
+   partitions=4/24 files=4 size=31.40KB
+   row-size=8B cardinality=unavailable
 ====
 # tinyint_col is not a partition column so the optimization is disabled.
 select count(1) from functional_parquet.alltypes where year < 2010 and tinyint_col > 8;
@@ -243,10 +294,13 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=12/24 files=12 size=89.05KB
+   partition predicates: year < 2010
+   partitions=12/24 files=12 size=94.74KB
    predicates: tinyint_col > 8
+   row-size=1B cardinality=unavailable
 ====
 # Optimization is applied after constant folding.
 select count(1 + 2 + 3) from functional_parquet.alltypes
@@ -255,9 +309,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=8B cardinality=unavailable
 ====
 # Optimization is not applied to count(null).
 select count(1 + null + 3) from functional_parquet.alltypes
@@ -268,18 +324,23 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=2
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: count(NULL)
+|  |  row-size=8B cardinality=1
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
-|     partitions=24/24 files=24 size=178.13KB
+|     partitions=24/24 files=24 size=189.28KB
+|     row-size=0B cardinality=unavailable
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(NULL + 3)
+|  row-size=8B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=0B cardinality=unavailable
 ====
 # Optimization is not applied when selecting from an empty table.
 select count(*) from functional_parquet.emptytable
@@ -288,9 +349,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional_parquet.emptytable]
    partitions=0/0 files=0 size=0B
+   row-size=0B cardinality=0
 ====
 # Optimization is not applied when all partitions are pruned.
 select count(1) from functional_parquet.alltypes where year = -1
@@ -299,9 +362,12 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
+   partition predicates: year = -1
    partitions=0/24 files=0 size=0B
+   row-size=0B cardinality=0
 ====
 # Optimization is not applied across query blocks, even though it would be correct here.
 select count(*) from (select int_col from functional_parquet.alltypes) t
@@ -310,9 +376,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=0B cardinality=unavailable
 ====
 # Optimization is not applied when there is a distinct agg.
 select count(*), count(distinct 1) from functional_parquet.alltypes
@@ -321,13 +389,16 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(1), count:merge(*)
+|  row-size=16B cardinality=1
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: 1
+|  row-size=9B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=189.28KB
+   row-size=0B cardinality=unavailable
 ====
 # The optimization is applied here because only the count(*) and a partition column are
 # materialized. Non-materialized agg exprs are ignored.
@@ -343,7 +414,10 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |  group by: year
+|  row-size=12B cardinality=2
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=2/24 files=2 size=15.01KB
+   partition predicates: month = 1
+   partitions=2/24 files=2 size=16.06KB
+   row-size=12B cardinality=unavailable
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
index deda7e9..bc4d740 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
@@ -5,17 +5,21 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: min(month), max(year), ndv(day)
+|  row-size=16B cardinality=1
 |
 00:UNION
    constant-operands=11
+   row-size=12B cardinality=11
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: min(month), max(year), ndv(day)
+|  row-size=16B cardinality=1
 |
 00:UNION
    constant-operands=11
+   row-size=12B cardinality=11
 ====
 # Test with explicit distinct keyword.
 select count(distinct year), ndv(day) from functional.alltypesagg
@@ -24,25 +28,31 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(year), ndv:merge(day)
+|  row-size=16B cardinality=1
 |
 01:AGGREGATE
 |  output: ndv(day)
 |  group by: year
+|  row-size=12B cardinality=1
 |
 00:UNION
    constant-operands=11
+   row-size=8B cardinality=11
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(year), ndv:merge(day)
+|  row-size=16B cardinality=1
 |
 01:AGGREGATE
 |  output: ndv(day)
 |  group by: year
+|  row-size=12B cardinality=1
 |
 00:UNION
    constant-operands=11
+   row-size=8B cardinality=11
 ====
 # Test static partition pruning.
 select min(month), max(day) from functional.alltypesagg where year = 2010 and day = 1;
@@ -51,9 +61,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: min(month), max(day)
+|  row-size=8B cardinality=1
 |
 00:UNION
    constant-operands=1
+   row-size=8B cardinality=1
 ====
 # Test with cases where all partitions are pruned.
 select c1, c2 from
@@ -64,8 +76,10 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: min(year), max(month)
+|  row-size=8B cardinality=0
 |
 00:UNION
+   row-size=8B cardinality=0
 ====
 # Test with group by and having clauses.
 select ndv(month) from functional.alltypesagg group by year having max(day)=10
@@ -76,9 +90,11 @@ PLAN-ROOT SINK
 |  output: ndv(month), max(day)
 |  group by: year
 |  having: max(day) = 10
+|  row-size=16B cardinality=0
 |
 00:UNION
    constant-operands=11
+   row-size=12B cardinality=11
 ====
 # Test with group-by clauses (no aggregate expressions) only.
 select month from functional.alltypes group by month
@@ -87,9 +103,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  group by: month
+|  row-size=4B cardinality=12
 |
 00:UNION
    constant-operands=12
+   row-size=4B cardinality=12
 ====
 # Test with distinct select list.
 select distinct month from functional.alltypes where month % 2 = 0
@@ -98,9 +116,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  group by: month
+|  row-size=4B cardinality=6
 |
 00:UNION
    constant-operands=6
+   row-size=4B cardinality=6
 ====
 # Test with joins on the partition keys.
 select min(a.month)
@@ -111,31 +131,39 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: min(a.month)
+|  row-size=4B cardinality=1
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.year = b.year
+|  row-size=12B cardinality=24
 |
 |--01:UNION
 |     constant-operands=1
+|     row-size=4B cardinality=1
 |
 00:UNION
    constant-operands=24
+   row-size=8B cardinality=24
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: min(a.month)
+|  row-size=4B cardinality=1
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.year = b.year
+|  row-size=12B cardinality=24
 |
 |--04:EXCHANGE [UNPARTITIONED]
 |  |
 |  01:UNION
 |     constant-operands=1
+|     row-size=4B cardinality=1
 |
 00:UNION
    constant-operands=24
+   row-size=8B cardinality=24
 ====
 # Test query which contains both distinct and non-distinct aggregate
 # expressions and make sure the optimization is applied when applicable.
@@ -149,20 +177,25 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: year = year
 |  runtime filters: RF000 <- year
+|  row-size=16B cardinality=4
 |
 |--01:AGGREGATE [FINALIZE]
 |  |  group by: year
+|  |  row-size=4B cardinality=2
 |  |
 |  00:UNION
 |     constant-operands=2
+|     row-size=4B cardinality=2
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(month)
 |  group by: year
+|  row-size=12B cardinality=2
 |
 02:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> functional.alltypes.year
+   row-size=8B cardinality=7.30K
 ====
 # Test queries with tableRefs which cannot be evaluated by metadata.
 select min(a.year), ndv(b.timestamp_col) from
@@ -172,14 +205,18 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: min(a.year), ndv(b.timestamp_col)
+|  row-size=12B cardinality=0
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=20B cardinality=0
 |
 |--01:SCAN HDFS [functional.alltypesnopart b]
 |     partitions=1/1 files=0 size=0B
+|     row-size=16B cardinality=0
 |
 00:UNION
    constant-operands=2
+   row-size=4B cardinality=2
 ====
 # Test that non-partitioning slots which aren't materialized won't block the
 # optimization from being applied.
@@ -191,14 +228,18 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: ndv(a.year + b.year), min(a.month + b.month)
+|  row-size=16B cardinality=1
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=16B cardinality=24
 |
 |--01:UNION
 |     constant-operands=1
+|     row-size=8B cardinality=1
 |
 00:UNION
    constant-operands=24
+   row-size=8B cardinality=24
 ====
 # IMPALA-2948. Unmaterialized slots won't block the optimization (the hash join version).
 select t1.int_col
@@ -212,16 +253,20 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.int_col = min(t2.year)
 |  runtime filters: RF000 <- min(t2.year)
+|  row-size=8B cardinality=8
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(t2.year)
+|  |  row-size=4B cardinality=1
 |  |
 |  01:UNION
 |     constant-operands=1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> t1.int_col
+   row-size=4B cardinality=8
 ====
 # Test with with clauses on the partition keys.
 with c1 as (select distinct month from functional.alltypes),
@@ -232,20 +277,26 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: ndv(month)
+|  row-size=8B cardinality=1
 |
 00:UNION
+|  row-size=4B cardinality=14
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  group by: year
+|  |  row-size=4B cardinality=2
 |  |
 |  03:UNION
 |     constant-operands=2
+|     row-size=4B cardinality=2
 |
 02:AGGREGATE [FINALIZE]
 |  group by: month
+|  row-size=4B cardinality=12
 |
 01:UNION
    constant-operands=12
+   row-size=4B cardinality=12
 ====
 # If slots other than partition keys are accessed, make sure scan nodes are generated.
 select date_string_col, min(month) from functional.alltypes group by date_string_col
@@ -255,9 +306,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: min(month)
 |  group by: date_string_col
+|  row-size=24B cardinality=736
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=24B cardinality=7.30K
 ====
 # Make sure non-distinct aggregation functions will generate scan nodes.
 select count(month) from functional.alltypes
@@ -266,9 +319,11 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(month)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # Make sure that queries without any aggregation will generate scan nodes.
 select month from functional.alltypes order by year
@@ -277,7 +332,9 @@ PLAN-ROOT SINK
 |
 01:SORT
 |  order by: year ASC
+|  row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/partition-pruning.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/partition-pruning.test b/testdata/workloads/functional-planner/queries/PlannerTest/partition-pruning.test
index 768106c..ef7d8c4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/partition-pruning.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/partition-pruning.test
@@ -9,6 +9,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional.stringpartitionkey]
+   partition predicates: CAST(string_col AS TIMESTAMP) = TIMESTAMP '2009-01-01 00:00:00'
    partitions=1/2 files=1 size=2B
    stored statistics:
      table: rows=1 size=2B

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
index 09a2993..4b1ce24 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
@@ -7,18 +7,22 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.double_col = b.bigint_col
 |  runtime filters: RF000 <- b.bigint_col
+|  row-size=16B cardinality=532.90K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: b.bigint_col DIV 2 = 0
+|     row-size=8B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.double_col
+   row-size=8B cardinality=7.30K
 ====
 # Where clause predicate is turned into Having clause
 select a.cnt, b.int_col
@@ -32,18 +36,22 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: count(id) = b.id
+|  row-size=21B cardinality=10
 |
 |--02:SCAN HDFS [functional.alltypessmall b]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: b.id < 10
+|     row-size=8B cardinality=10
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(id)
 |  group by: int_col, tinyint_col
 |  having: count(id) < 10
+|  row-size=13B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
 ====
 # single-table test case: partitions are pruned due to predicate inference
 select count(*) from functional.alltypes
@@ -53,10 +61,13 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: functional.alltypes.month < 2
    partitions=2/24 files=2 size=40.32KB
    predicates: functional.alltypes.id < 2, functional.alltypes.tinyint_col < 2, id = int_col, int_col < 2, month = id, tinyint_col = int_col
+   row-size=13B cardinality=62
 ====
 # all subquery results get materialized correctly;
 # a.string_col = 'a' needs to be evaluated by the join itself, not the scan
@@ -71,13 +82,16 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: id = id
 |  other join predicates: string_col = 'a'
+|  row-size=38B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: string_col = 'b'
+|     row-size=17B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=21B cardinality=7.30K
 ====
 # variation with Where clause; "b.string_col = 'b'" still needs to be applied
 # by the join node but it's safe to have the 'b' scan apply it as well
@@ -93,14 +107,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: id = id
 |  other predicates: string_col = 'b'
+|  row-size=38B cardinality=730
 |
 |--01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: functional.alltypes.string_col = 'b'
+|     row-size=17B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.string_col = 'a'
+   row-size=21B cardinality=730
 ====
 # Predicates are pushed through cross join to the inline views
 select a.int_col
@@ -112,30 +129,36 @@ where a.string_col = 'a' and b.string_col = 'b'
 PLAN-ROOT SINK
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=30B cardinality=532.90K
 |
 |--01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: functional.alltypes.string_col = 'b'
+|     row-size=13B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.string_col = 'a'
+   row-size=17B cardinality=730
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
 02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=30B cardinality=532.90K
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: functional.alltypes.string_col = 'b'
+|     row-size=13B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.string_col = 'a'
+   row-size=17B cardinality=730
 ====
 # c1 > 0 does not get propagated into inl view due to limit clauses
 select c1, c2, c3
@@ -153,19 +176,24 @@ PLAN-ROOT SINK
 |
 04:TOP-N [LIMIT=3]
 |  order by: c2 ASC, c1 DESC
+|  row-size=9B cardinality=1
 |
 03:SELECT
 |  predicates: int_col > 0
+|  row-size=9B cardinality=1
 |
 02:TOP-N [LIMIT=5]
 |  order by: int_col ASC, tinyint_col ASC
+|  row-size=9B cardinality=5
 |
 01:AGGREGATE [FINALIZE]
 |  output: max(id)
 |  group by: int_col, tinyint_col
+|  row-size=9B cardinality=100
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=9B cardinality=100
 ====
 # same for with clause variant
 with t as (select int_col c1, tinyint_col c2, max(id) c3
@@ -183,19 +211,24 @@ PLAN-ROOT SINK
 |
 04:TOP-N [LIMIT=3]
 |  order by: c2 ASC, c1 DESC
+|  row-size=9B cardinality=1
 |
 03:SELECT
 |  predicates: int_col > 0
+|  row-size=9B cardinality=1
 |
 02:TOP-N [LIMIT=5]
 |  order by: int_col ASC, tinyint_col ASC
+|  row-size=9B cardinality=5
 |
 01:AGGREGATE [FINALIZE]
 |  output: max(id)
 |  group by: int_col, tinyint_col
+|  row-size=9B cardinality=100
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=9B cardinality=100
 ====
 # basic propagation between equivalence classes, with partition pruning
 select straight_join a.year, a.month, b.year, b.month
@@ -212,24 +245,32 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: b.id = c.id, b.month = c.month, b.year = c.year, b.smallint_col = c.int_col
 |  runtime filters: RF000 <- c.id, RF001 <- c.month, RF002 <- c.year, RF003 <- c.int_col
+|  row-size=43B cardinality=1
 |
 |--02:SCAN HDFS [functional.alltypestiny c]
+|     partition predicates: c.year = 2009, c.month + 2 <= 4
 |     partitions=2/4 files=2 size=230B
 |     predicates: c.id = 17, CAST(sin(c.int_col) AS BOOLEAN) = TRUE
+|     row-size=16B cardinality=1
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id, a.month = b.month, a.year = b.year, a.tinyint_col = b.smallint_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.month, RF010 <- b.year, RF011 <- b.smallint_col
+|  row-size=27B cardinality=1
 |
 |--01:SCAN HDFS [functional.alltypessmall b]
+|     partition predicates: b.year = 2009, b.month + 2 <= 4
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: b.id = 17, CAST(sin(b.smallint_col) AS BOOLEAN) = TRUE
 |     runtime filters: RF000 -> b.id, RF001 -> b.month, RF002 -> b.year, RF003 -> b.smallint_col
+|     row-size=14B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month + 2 <= 4
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id = 17, CAST(sin(a.tinyint_col) AS BOOLEAN) = TRUE
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col, RF008 -> a.id, RF009 -> a.month, RF010 -> a.year, RF011 -> a.tinyint_col
+   row-size=13B cardinality=1
 ---- SCANRANGELOCATIONS
 NODE 0:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=1/090101.txt 0:20433
@@ -248,30 +289,38 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: b.id = c.id, b.month = c.month, b.year = c.year, b.smallint_col = c.int_col
 |  runtime filters: RF000 <- c.id, RF001 <- c.month, RF002 <- c.year, RF003 <- c.int_col
+|  row-size=43B cardinality=1
 |
 |--07:EXCHANGE [HASH(c.id,c.month,c.year,c.int_col)]
 |  |
 |  02:SCAN HDFS [functional.alltypestiny c]
+|     partition predicates: c.year = 2009, c.month + 2 <= 4
 |     partitions=2/4 files=2 size=230B
 |     predicates: c.id = 17, CAST(sin(c.int_col) AS BOOLEAN) = TRUE
+|     row-size=16B cardinality=1
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.id = b.id, a.month = b.month, a.year = b.year, a.tinyint_col = b.smallint_col
 |  runtime filters: RF008 <- b.id, RF009 <- b.month, RF010 <- b.year, RF011 <- b.smallint_col
+|  row-size=27B cardinality=1
 |
 |--06:EXCHANGE [HASH(b.id,b.month,b.year,b.smallint_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall b]
+|     partition predicates: b.year = 2009, b.month + 2 <= 4
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: b.id = 17, CAST(sin(b.smallint_col) AS BOOLEAN) = TRUE
 |     runtime filters: RF000 -> b.id, RF001 -> b.month, RF002 -> b.year, RF003 -> b.smallint_col
+|     row-size=14B cardinality=1
 |
 05:EXCHANGE [HASH(a.id,a.month,a.year,a.tinyint_col)]
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month + 2 <= 4
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id = 17, CAST(sin(a.tinyint_col) AS BOOLEAN) = TRUE
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col, RF008 -> a.id, RF009 -> a.month, RF010 -> a.year, RF011 -> a.tinyint_col
+   row-size=13B cardinality=1
 ====
 # basic propagation between equivalence classes, with partition pruning;
 # variation with inline views
@@ -289,24 +338,32 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypessmall.id = functional.alltypestiny.id, functional.alltypessmall.month = functional.alltypestiny.month, functional.alltypessmall.year = functional.alltypestiny.year, functional.alltypessmall.smallint_col = functional.alltypestiny.int_col
 |  runtime filters: RF000 <- functional.alltypestiny.id, RF001 <- functional.alltypestiny.month, RF002 <- functional.alltypestiny.year, RF003 <- functional.alltypestiny.int_col
+|  row-size=43B cardinality=1
 |
 |--02:SCAN HDFS [functional.alltypestiny]
+|     partition predicates: functional.alltypestiny.year = 2009, functional.alltypestiny.month + 2 <= 4
 |     partitions=2/4 files=2 size=230B
 |     predicates: functional.alltypestiny.id = 17, CAST(sin(functional.alltypestiny.int_col) AS BOOLEAN) = TRUE
+|     row-size=16B cardinality=1
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypes.id = functional.alltypessmall.id, functional.alltypes.month = functional.alltypessmall.month, functional.alltypes.year = functional.alltypessmall.year, functional.alltypes.tinyint_col = functional.alltypessmall.smallint_col
 |  runtime filters: RF008 <- functional.alltypessmall.id, RF009 <- functional.alltypessmall.month, RF010 <- functional.alltypessmall.year, RF011 <- functional.alltypessmall.smallint_col
+|  row-size=27B cardinality=1
 |
 |--01:SCAN HDFS [functional.alltypessmall]
+|     partition predicates: functional.alltypessmall.year = 2009, functional.alltypessmall.month + 2 <= 4
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.id = 17, CAST(sin(functional.alltypessmall.smallint_col) AS BOOLEAN) = TRUE
 |     runtime filters: RF000 -> functional.alltypessmall.id, RF001 -> functional.alltypessmall.month, RF002 -> functional.alltypessmall.year, RF003 -> functional.alltypessmall.smallint_col
+|     row-size=14B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: functional.alltypes.year = 2009, functional.alltypes.month + 2 <= 4
    partitions=2/24 files=2 size=38.07KB
    predicates: functional.alltypes.id = 17, CAST(sin(functional.alltypes.tinyint_col) AS BOOLEAN) = TRUE
    runtime filters: RF000 -> functional.alltypes.id, RF001 -> functional.alltypes.month, RF002 -> functional.alltypes.year, RF003 -> functional.alltypes.tinyint_col, RF008 -> functional.alltypes.id, RF009 -> functional.alltypes.month, RF010 -> functional.alltypes.year, RF011 -> functional.alltypes.tinyint_col
+   row-size=13B cardinality=1
 ---- SCANRANGELOCATIONS
 NODE 0:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=1/090101.txt 0:20433
@@ -325,30 +382,38 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: functional.alltypessmall.id = functional.alltypestiny.id, functional.alltypessmall.month = functional.alltypestiny.month, functional.alltypessmall.year = functional.alltypestiny.year, functional.alltypessmall.smallint_col = functional.alltypestiny.int_col
 |  runtime filters: RF000 <- functional.alltypestiny.id, RF001 <- functional.alltypestiny.month, RF002 <- functional.alltypestiny.year, RF003 <- functional.alltypestiny.int_col
+|  row-size=43B cardinality=1
 |
 |--07:EXCHANGE [HASH(functional.alltypestiny.id,functional.alltypestiny.month,functional.alltypestiny.year,functional.alltypestiny.int_col)]
 |  |
 |  02:SCAN HDFS [functional.alltypestiny]
+|     partition predicates: functional.alltypestiny.year = 2009, functional.alltypestiny.month + 2 <= 4
 |     partitions=2/4 files=2 size=230B
 |     predicates: functional.alltypestiny.id = 17, CAST(sin(functional.alltypestiny.int_col) AS BOOLEAN) = TRUE
+|     row-size=16B cardinality=1
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: functional.alltypes.id = functional.alltypessmall.id, functional.alltypes.month = functional.alltypessmall.month, functional.alltypes.year = functional.alltypessmall.year, functional.alltypes.tinyint_col = functional.alltypessmall.smallint_col
 |  runtime filters: RF008 <- functional.alltypessmall.id, RF009 <- functional.alltypessmall.month, RF010 <- functional.alltypessmall.year, RF011 <- functional.alltypessmall.smallint_col
+|  row-size=27B cardinality=1
 |
 |--06:EXCHANGE [HASH(functional.alltypessmall.id,functional.alltypessmall.month,functional.alltypessmall.year,functional.alltypessmall.smallint_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
+|     partition predicates: functional.alltypessmall.year = 2009, functional.alltypessmall.month + 2 <= 4
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.id = 17, CAST(sin(functional.alltypessmall.smallint_col) AS BOOLEAN) = TRUE
 |     runtime filters: RF000 -> functional.alltypessmall.id, RF001 -> functional.alltypessmall.month, RF002 -> functional.alltypessmall.year, RF003 -> functional.alltypessmall.smallint_col
+|     row-size=14B cardinality=1
 |
 05:EXCHANGE [HASH(functional.alltypes.id,functional.alltypes.month,functional.alltypes.year,functional.alltypes.tinyint_col)]
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: functional.alltypes.year = 2009, functional.alltypes.month + 2 <= 4
    partitions=2/24 files=2 size=38.07KB
    predicates: functional.alltypes.id = 17, CAST(sin(functional.alltypes.tinyint_col) AS BOOLEAN) = TRUE
    runtime filters: RF000 -> functional.alltypes.id, RF001 -> functional.alltypes.month, RF002 -> functional.alltypes.year, RF003 -> functional.alltypes.tinyint_col, RF008 -> functional.alltypes.id, RF009 -> functional.alltypes.month, RF010 -> functional.alltypes.year, RF011 -> functional.alltypes.tinyint_col
+   row-size=13B cardinality=1
 ====
 # propagation between outer-joined tables only goes in one direction:
 # - predicates on a.year and a.tinyint_col are propagated to b
@@ -371,14 +436,19 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = b.id, a.month = b.month, a.tinyint_col = b.tinyint_col, a.year = b.year
 |  other predicates: b.int_col IS NULL, b.id = 17
+|  row-size=30B cardinality=115
 |
 |--01:SCAN HDFS [functional.alltypessmall b]
+|     partition predicates: b.month + 1 = 2, b.year = 2009
 |     partitions=1/4 files=1 size=1.57KB
 |     predicates: b.id = 17, b.tinyint_col = 7
+|     row-size=17B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009
    partitions=12/24 files=12 size=238.68KB
    predicates: a.id IS NULL, a.tinyint_col = 7
+   row-size=13B cardinality=115
 ---- SCANRANGELOCATIONS
 NODE 0:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=1/090101.txt 0:20433
@@ -403,16 +473,21 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id, a.month = b.month, a.tinyint_col = b.tinyint_col, a.year = b.year
 |  other predicates: b.int_col IS NULL, b.id = 17
+|  row-size=30B cardinality=115
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall b]
+|     partition predicates: b.month + 1 = 2, b.year = 2009
 |     partitions=1/4 files=1 size=1.57KB
 |     predicates: b.id = 17, b.tinyint_col = 7
+|     row-size=17B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009
    partitions=12/24 files=12 size=238.68KB
    predicates: a.id IS NULL, a.tinyint_col = 7
+   row-size=13B cardinality=115
 ====
 # propagation between outer-joined tables only goes in one direction:
 # - predicates on b.year, b.tinyint_col are propagated to a
@@ -436,15 +511,20 @@ PLAN-ROOT SINK
 |  hash predicates: a.id = b.id, a.month = b.month, a.tinyint_col = b.tinyint_col, a.year = b.year
 |  other predicates: a.int_col IS NULL, a.id = 17
 |  runtime filters: RF000 <- b.id, RF001 <- b.month, RF002 <- b.tinyint_col, RF003 <- b.year
+|  row-size=30B cardinality=115
 |
 |--01:SCAN HDFS [functional.alltypes b]
+|     partition predicates: b.year = 2009
 |     partitions=12/24 files=12 size=238.68KB
 |     predicates: b.id IS NULL, b.tinyint_col = 7
+|     row-size=13B cardinality=115
 |
 00:SCAN HDFS [functional.alltypessmall a]
+   partition predicates: a.month + 1 = 2, a.year = 2009
    partitions=1/4 files=1 size=1.57KB
    predicates: a.id = 17, a.tinyint_col = 7
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.tinyint_col, RF003 -> a.year
+   row-size=17B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -454,19 +534,24 @@ PLAN-ROOT SINK
 |  hash predicates: a.id = b.id, a.month = b.month, a.tinyint_col = b.tinyint_col, a.year = b.year
 |  other predicates: a.int_col IS NULL, a.id = 17
 |  runtime filters: RF000 <- b.id, RF001 <- b.month, RF002 <- b.tinyint_col, RF003 <- b.year
+|  row-size=30B cardinality=115
 |
 |--04:EXCHANGE [HASH(b.id,b.month,b.tinyint_col,b.year)]
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
+|     partition predicates: b.year = 2009
 |     partitions=12/24 files=12 size=238.68KB
 |     predicates: b.id IS NULL, b.tinyint_col = 7
+|     row-size=13B cardinality=115
 |
 03:EXCHANGE [HASH(a.id,a.month,a.tinyint_col,a.year)]
 |
 00:SCAN HDFS [functional.alltypessmall a]
+   partition predicates: a.month + 1 = 2, a.year = 2009
    partitions=1/4 files=1 size=1.57KB
    predicates: a.id = 17, a.tinyint_col = 7
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.tinyint_col, RF003 -> a.year
+   row-size=17B cardinality=1
 ====
 # propagation into inline view with aggregation:
 # - predicates from enclosing scope applied to grouping exprs; with partition pruning
@@ -487,20 +572,26 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
 |  |  having: count(*) + 1 = 17
+|  |  row-size=24B cardinality=5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
+|     partition predicates: functional.alltypessmall.year = 2009, functional.alltypessmall.month <= 2
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.int_col != 5, id > 11
+|     row-size=16B cardinality=5
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ---- SCANRANGELOCATIONS
 NODE 0:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=1/090101.txt 0:20433
@@ -516,6 +607,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--06:EXCHANGE [BROADCAST]
 |  |
@@ -523,21 +615,27 @@ PLAN-ROOT SINK
 |  |  output: count:merge(*)
 |  |  group by: year, month, id, int_col
 |  |  having: count(*) + 1 = 17
+|  |  row-size=24B cardinality=5
 |  |
 |  04:EXCHANGE [HASH(year,month,id,int_col)]
 |  |
 |  02:AGGREGATE [STREAMING]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
+|  |  row-size=24B cardinality=5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
+|     partition predicates: functional.alltypessmall.year = 2009, functional.alltypessmall.month <= 2
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.int_col != 5, id > 11
+|     row-size=16B cardinality=5
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ====
 # Same as above but with cross join
 select straight_join a.id, b.id
@@ -561,20 +659,26 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
 |  |  having: count(*) + 1 = 17
+|  |  row-size=24B cardinality=5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
+|     partition predicates: functional.alltypessmall.year = 2009, functional.alltypessmall.month <= 2
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.int_col != 5, id > 11
+|     row-size=16B cardinality=5
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -583,6 +687,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--06:EXCHANGE [BROADCAST]
 |  |
@@ -590,21 +695,27 @@ PLAN-ROOT SINK
 |  |  output: count:merge(*)
 |  |  group by: year, month, id, int_col
 |  |  having: count(*) + 1 = 17
+|  |  row-size=24B cardinality=5
 |  |
 |  04:EXCHANGE [HASH(year,month,id,int_col)]
 |  |
 |  02:AGGREGATE [STREAMING]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
+|  |  row-size=24B cardinality=5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
+|     partition predicates: functional.alltypessmall.year = 2009, functional.alltypessmall.month <= 2
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.int_col != 5, id > 11
+|     row-size=16B cardinality=5
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ====
 # no propagation into select block with limit;
 # propagation out of that block is okay;
@@ -626,23 +737,29 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--03:SELECT
 |  |  predicates: count(*) + 1 = 17
+|  |  row-size=24B cardinality=1
 |  |
 |  02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
 |  |  limit: 5
+|  |  row-size=24B cardinality=5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: id > 11
+|     row-size=16B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ---- SCANRANGELOCATIONS
 NODE 0:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=1/090101.txt 0:20433
@@ -660,11 +777,13 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  03:SELECT
 |  |  predicates: count(*) + 1 = 17
+|  |  row-size=24B cardinality=1
 |  |
 |  07:EXCHANGE [UNPARTITIONED]
 |  |  limit: 5
@@ -673,21 +792,26 @@ PLAN-ROOT SINK
 |  |  output: count:merge(*)
 |  |  group by: year, month, id, int_col
 |  |  limit: 5
+|  |  row-size=24B cardinality=5
 |  |
 |  05:EXCHANGE [HASH(year,month,id,int_col)]
 |  |
 |  02:AGGREGATE [STREAMING]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
+|  |  row-size=24B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: id > 11
+|     row-size=16B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ====
 # Similar to the above, converts the cross join to a hash join
 select straight_join a.id, b.id
@@ -712,23 +836,29 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--03:SELECT
 |  |  predicates: count(*) + 1 = 17
+|  |  row-size=24B cardinality=1
 |  |
 |  02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
 |  |  limit: 5
+|  |  row-size=24B cardinality=5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: id > 11
+|     row-size=16B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -737,11 +867,13 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = id, a.month = month, a.year = year, a.tinyint_col = int_col
 |  runtime filters: RF000 <- id, RF001 <- month, RF002 <- year, RF003 <- int_col
+|  row-size=37B cardinality=1
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  03:SELECT
 |  |  predicates: count(*) + 1 = 17
+|  |  row-size=24B cardinality=1
 |  |
 |  07:EXCHANGE [UNPARTITIONED]
 |  |  limit: 5
@@ -750,21 +882,26 @@ PLAN-ROOT SINK
 |  |  output: count:merge(*)
 |  |  group by: year, month, id, int_col
 |  |  limit: 5
+|  |  row-size=24B cardinality=5
 |  |
 |  05:EXCHANGE [HASH(year,month,id,int_col)]
 |  |
 |  02:AGGREGATE [STREAMING]
 |  |  output: count(*)
 |  |  group by: year, month, id, int_col
+|  |  row-size=24B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: id > 11
+|     row-size=16B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009, a.month <= 2
    partitions=2/24 files=2 size=38.07KB
    predicates: a.id > 11, a.tinyint_col != 5
    runtime filters: RF000 -> a.id, RF001 -> a.month, RF002 -> a.year, RF003 -> a.tinyint_col
+   row-size=13B cardinality=59
 ====
 # propagation of z.month=1 to alltypesagg is prevented
 select straight_join x.int_col, z.int_col
@@ -779,17 +916,22 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: functional.alltypes.id = functional.alltypesagg.id
 |  runtime filters: RF000 <- functional.alltypesagg.id
+|  row-size=20B cardinality=10
 |
 |--02:SELECT
 |  |  predicates: functional.alltypesagg.month = 1
+|  |  row-size=12B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
 |     limit: 10
+|     row-size=12B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: functional.alltypes.year = 2009
    partitions=12/24 files=12 size=238.68KB
    runtime filters: RF000 -> functional.alltypes.id
+   row-size=8B cardinality=3.65K
 ====
 # extra join predicate "x.id + x.b_id = 17" results in referenced slots being
 # materialized
@@ -813,25 +955,33 @@ PLAN-ROOT SINK
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = functional.alltypesagg.id
 |  runtime filters: RF000 <- functional.alltypesagg.id
+|  row-size=36B cardinality=50
 |
 |--04:SELECT
 |  |  predicates: functional.alltypesagg.month = 1
+|  |  row-size=12B cardinality=10
 |  |
 |  03:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
 |     limit: 10
+|     row-size=12B cardinality=10
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = b.int_col, a.year = b.year
 |  other predicates: a.id + b.id = 17
 |  runtime filters: RF002 <- b.int_col, RF003 <- b.year
+|  row-size=24B cardinality=36.50K
 |
 |--01:SCAN HDFS [functional.alltypessmall b]
+|     partition predicates: b.year = 2009
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=12B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes a]
+   partition predicates: a.year = 2009
    partitions=12/24 files=12 size=238.68KB
    runtime filters: RF000 -> a.id, RF002 -> a.int_col, RF003 -> a.year
+   row-size=12B cardinality=3.65K
 ====
 # correct placement of predicates in the presence of aggregation in an inline view
 select straight_join a.id, b.id
@@ -844,17 +994,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = id
 |  other predicates: int_col = 17, isnull(id, 0) = 0
+|  row-size=12B cardinality=730
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  group by: id, int_col
+|  |  row-size=8B cardinality=730
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: functional.alltypes.int_col = 17
+|     row-size=8B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.id IS NULL
+   row-size=4B cardinality=730
 ====
 select straight_join a.id, b.id
 from functional.alltypes a left outer join
@@ -868,27 +1022,34 @@ PLAN-ROOT SINK
 06:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = id
 |  other predicates: id IS NULL, int_col = 17
+|  row-size=12B cardinality=730
 |
 |--01:UNION
 |  |  pass-through-operands: all
+|  |  row-size=8B cardinality=740
 |  |
 |  |--05:AGGREGATE [FINALIZE]
 |  |  |  group by: id, int_col
+|  |  |  row-size=8B cardinality=10
 |  |  |
 |  |  04:SCAN HDFS [functional.alltypessmall]
 |  |     partitions=4/4 files=4 size=6.32KB
 |  |     predicates: functional.alltypessmall.int_col = 17
+|  |     row-size=8B cardinality=10
 |  |
 |  03:AGGREGATE [FINALIZE]
 |  |  group by: id, int_col
+|  |  row-size=8B cardinality=730
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: functional.alltypes.int_col = 17
+|     row-size=8B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: isnull(a.id, 0) = 0
+   row-size=4B cardinality=730
 ====
 select a.id, b.id
 from
@@ -902,18 +1063,22 @@ PLAN-ROOT SINK
 |  hash predicates: id = a.id
 |  other predicates: int_col = 17, isnull(id, 0) = 0
 |  runtime filters: RF000 <- a.id
+|  row-size=12B cardinality=730
 |
 |--02:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: a.id IS NULL
+|     row-size=4B cardinality=730
 |
 01:AGGREGATE [FINALIZE]
 |  group by: id, int_col
+|  row-size=8B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.int_col = 17
    runtime filters: RF000 -> functional.alltypes.id
+   row-size=8B cardinality=730
 ====
 select straight_join a.id, b.id
 from
@@ -928,29 +1093,36 @@ PLAN-ROOT SINK
 |  hash predicates: id = a.id
 |  other predicates: id IS NULL, int_col = 17
 |  runtime filters: RF000 <- a.id
+|  row-size=12B cardinality=740
 |
 |--05:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: isnull(a.id, 0) = 0
+|     row-size=4B cardinality=730
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=740
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  group by: id, int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  03:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: functional.alltypessmall.int_col = 17
 |     runtime filters: RF000 -> functional.alltypessmall.id
+|     row-size=8B cardinality=10
 |
 02:AGGREGATE [FINALIZE]
 |  group by: id, int_col
+|  row-size=8B cardinality=730
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.int_col = 17
    runtime filters: RF000 -> functional.alltypes.id
+   row-size=8B cardinality=730
 ====
 # predicate inside outer-joined inline view must be assigned in scan
 select straight_join a.string_col from functional.alltypes a
@@ -961,13 +1133,16 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [FULL OUTER JOIN]
 |  hash predicates: a.id = functional.alltypessmall.id
+|  row-size=21B cardinality=7.31K
 |
 |--01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: id > 0
+|     row-size=4B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
 ====
 # TODO: Remove the following limitation in our predicate propagation:
 # It is safe to propagate 'y.id is null' to the scan of y, but we prevent
@@ -986,21 +1161,26 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = x.id
+|  row-size=25B cardinality=7.30K
 |
 |--03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: x.id = y.id
 |  |  runtime filters: RF000 <- y.id
+|  |  row-size=8B cardinality=1
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny y]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypessmall x]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: x.id IS NULL
 |     runtime filters: RF000 -> x.id
+|     row-size=4B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
 ====
 # Test proper assignment of Having-clause predicates (IMPALA-820):
 # - Predicates only referencing the group-by exprs are assigned in the scan node.
@@ -1016,10 +1196,12 @@ PLAN-ROOT SINK
 |  output: count(bigint_col)
 |  group by: bool_col, int_col
 |  having: count(bigint_col) > 0
+|  row-size=13B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.int_col > 0, functional.alltypes.bool_col = FALSE
+   row-size=13B cardinality=516
 ====
 # basic propagation of multi-slot, single-tuple predicates
 select straight_join 1 from
@@ -1044,24 +1226,29 @@ PLAN-ROOT SINK
 |  hash predicates: t2.bigint_col = functional.alltypestiny.bigint_col, t2.id = functional.alltypestiny.id, t2.smallint_col = functional.alltypestiny.int_col
 |  other predicates: t2.id + functional.alltypestiny.int_col > 40
 |  runtime filters: RF000 <- functional.alltypestiny.bigint_col, RF001 <- functional.alltypestiny.id, RF002 <- functional.alltypestiny.int_col
+|  row-size=35B cardinality=1
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: functional.alltypestiny.id + functional.alltypestiny.bigint_col > 20, functional.alltypestiny.id + functional.alltypestiny.int_col > 10, functional.alltypestiny.id + functional.alltypestiny.int_col + functional.alltypestiny.bigint_col > 30
+|     row-size=16B cardinality=1
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id, t1.tinyint_col = t2.smallint_col
 |  runtime filters: RF006 <- t2.id, RF007 <- t2.smallint_col
+|  row-size=19B cardinality=1
 |
 |--01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: t2.id + t2.bigint_col > 20, t2.id + t2.smallint_col > 10, t2.id + t2.smallint_col + t2.bigint_col > 30
 |     runtime filters: RF000 -> t2.bigint_col, RF001 -> t2.id, RF002 -> t2.smallint_col
+|     row-size=14B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id + t1.tinyint_col > 10
    runtime filters: RF001 -> t1.id, RF002 -> t1.tinyint_col, RF006 -> t1.id, RF007 -> t1.tinyint_col
+   row-size=5B cardinality=730
 ====
 # basic propagation of multi-slot, single-tuple predicates with aggregates
 select straight_join 1 from
@@ -1086,30 +1273,37 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: max(smallint_col) = max(smallint_col), min(int_col) = min(int_col)
 |  runtime filters: RF000 <- max(smallint_col)
+|  row-size=33B cardinality=730
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: max(smallint_col), min(int_col)
 |  |  having: max(smallint_col) + min(int_col) > 30
+|  |  row-size=6B cardinality=0
 |  |
 |  03:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=6B cardinality=8
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = count(tinyint_col), t1.tinyint_col = max(smallint_col)
 |  runtime filters: RF002 <- count(tinyint_col), RF003 <- max(smallint_col)
+|  row-size=27B cardinality=730
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(tinyint_col), max(smallint_col), min(int_col)
 |  |  group by: bigint_col
 |  |  having: count(tinyint_col) + max(smallint_col) > 10, count(tinyint_col) + max(smallint_col) > 20, max(smallint_col) + min(int_col) > 30
+|  |  row-size=22B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=15B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id + t1.tinyint_col > 10, t1.id + t1.tinyint_col > 20
    runtime filters: RF000 -> t1.tinyint_col, RF002 -> t1.id, RF003 -> t1.tinyint_col
+   row-size=5B cardinality=730
 ====
 # assignment of multi-slot, single-tuple predicates with outer-joined tuple (IMPALA-824)
 select straight_join 1
@@ -1135,22 +1329,27 @@ PLAN-ROOT SINK
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t2.id = functional.alltypestiny.id, t2.int_col = functional.alltypestiny.int_col
 |  other predicates: functional.alltypestiny.tinyint_col + functional.alltypestiny.smallint_col + functional.alltypestiny.int_col > 10, ifnull(functional.alltypestiny.tinyint_col + functional.alltypestiny.bigint_col, 1) = 1
+|  row-size=46B cardinality=730
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: functional.alltypestiny.id * functional.alltypestiny.int_col < 100, functional.alltypestiny.tinyint_col + functional.alltypestiny.smallint_col + functional.alltypestiny.int_col > 10
+|     row-size=19B cardinality=1
 |
 03:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t1.id = t2.id, t1.int_col = t2.int_col
 |  other predicates: t2.tinyint_col = t2.smallint_col, ifnull(t2.tinyint_col + t2.bigint_col, 1) = 1
+|  row-size=27B cardinality=730
 |
 |--01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: t2.tinyint_col = t2.smallint_col, t2.id * t2.int_col < 100
+|     row-size=19B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id * t1.int_col < 100
+   row-size=8B cardinality=730
 ====
 # TODO: Fix this limitation of our getBindingPredicates() implementation:
 # We use the first multi-slot mapping and not necessarily the best,
@@ -1165,14 +1364,18 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.month, t1.year = t2.year
 |  runtime filters: RF000 <- t2.month, RF001 <- t2.year
+|  row-size=178B cardinality=100
 |
 |--01:SCAN HDFS [functional.alltypessmall t2]
+|     partition predicates: t2.year + t2.month > 10
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=89B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id = t1.month, t1.year + t1.id > 10
    runtime filters: RF000 -> t1.id, RF001 -> t1.year
+   row-size=89B cardinality=730
 ====
 # TODO: Fix this limitation of our getBindingPredicates() implementation:
 # We use the first multi-slot mapping and not all non-redundant mappings, i.e.,
@@ -1190,15 +1393,18 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id, t1.tinyint_col = t2.tinyint_col
 |  runtime filters: RF000 <- t2.id, RF001 <- t2.tinyint_col
+|  row-size=178B cardinality=1
 |
 |--01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: t2.id = t2.smallint_col, t2.tinyint_col = t2.int_col, t2.id + t2.tinyint_col > 10
+|     row-size=89B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id + t1.tinyint_col > 10
    runtime filters: RF000 -> t1.id, RF001 -> t1.tinyint_col
+   row-size=89B cardinality=730
 ====
 # TODO: Fix this limitation of our predicate propagation implementation:
 # Multi-slot predicates are not propagated onto an agg node if the slot mapping
@@ -1217,18 +1423,22 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.int_col = min(int_col), t1.id = bigint_col
 |  runtime filters: RF000 <- min(int_col), RF001 <- bigint_col
+|  row-size=105B cardinality=10
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(int_col), max(int_col)
 |  |  group by: bigint_col
+|  |  row-size=16B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=12B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    predicates: t1.id + t1.int_col > 10
    runtime filters: RF000 -> t1.int_col, RF001 -> t1.id
+   row-size=89B cardinality=730
 ====
 # Anti-joins have a uni-directional value transfer (IMPALA-1249).
 select * from
@@ -1243,14 +1453,17 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: a.id = id
+|  row-size=89B cardinality=730
 |
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: functional.alltypestiny.id < 10, id > -20
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.id < 10
+   row-size=89B cardinality=730
 ====
 # Anti-joins have a uni-directional value transfer (IMPALA-1249).
 select * from
@@ -1264,14 +1477,17 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [RIGHT ANTI JOIN]
 |  hash predicates: id = b.id
+|  row-size=89B cardinality=1
 |
 |--01:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
 |     predicates: b.id < 10
+|     row-size=89B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.id < 10, id > -20
+   row-size=4B cardinality=730
 ====
 # Test proper predicate assignment with predicate propagation when the
 # generated predicate is bound by an outer joined tuple (IMPALA-2018)
@@ -1289,18 +1505,22 @@ PLAN-ROOT SINK
 |  output: sum(a.tinyint_col)
 |  group by: b.int_col
 |  having: j.int_col = 10
+|  row-size=12B cardinality=1
 |
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: b.id = a.id
 |  runtime filters: RF000 <- a.id
+|  row-size=13B cardinality=9
 |
 |--00:SCAN HDFS [functional.alltypestiny a]
 |     partitions=4/4 files=4 size=460B
+|     row-size=5B cardinality=8
 |
 01:SCAN HDFS [functional.alltypesagg b]
    partitions=11/11 files=11 size=814.73KB
    predicates: b.int_col = 10
    runtime filters: RF000 -> b.id
+   row-size=8B cardinality=11
 ====
 # Test proper predicate assignment with predicate propagation when the
 # generated predicate is bound by an outer joined tuple (IMPALA-2018)
@@ -1317,14 +1537,17 @@ PLAN-ROOT SINK
 |  hash predicates: b.id = a.id
 |  other predicates: b.int_col = 10
 |  runtime filters: RF000 <- a.id
+|  row-size=13B cardinality=9
 |
 |--00:SCAN HDFS [functional.alltypestiny a]
 |     partitions=4/4 files=4 size=460B
+|     row-size=5B cardinality=8
 |
 01:SCAN HDFS [functional.alltypesagg b]
    partitions=11/11 files=11 size=814.73KB
    predicates: b.int_col = 10
    runtime filters: RF000 -> b.id
+   row-size=8B cardinality=11
 ====
 # Tests propagation of cardinality estimation of SCAN HDFS node with small
 # initial cardinality and low selectivity (IMPALA-2165). If any of the
@@ -1339,21 +1562,27 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: n.n_regionkey = r_regionkey
+|  row-size=31B cardinality=15.00K
 |
 |--03:SCAN HDFS [tpch_parquet.region r]
-|     partitions=1/1 files=1 size=1.01KB
+|     partitions=1/1 files=1 size=1.34KB
 |     predicates: r.r_regionkey = 1
+|     row-size=2B cardinality=1
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=29B cardinality=15.00K
 |
 |--01:SCAN HDFS [tpch_parquet.nation n]
-|     partitions=1/1 files=1 size=2.38KB
+|     partitions=1/1 files=1 size=2.75KB
 |     predicates: n_regionkey = 1, n_name = 'BRAZIL'
+|     row-size=21B cardinality=1
 |
 00:SCAN HDFS [tpch_parquet.customer c]
-   partitions=1/1 files=1 size=12.27MB
+   partitions=1/1 files=1 size=12.31MB
    predicates: c_custkey % 2 = 0
+   row-size=8B cardinality=15.00K
 ====