You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/12 18:19:09 UTC

[10/26] impala git commit: IMPALA-8021: Add estimated cardinality to EXPLAIN output

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 1e59828..5176491 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -9,13 +9,16 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
+|     row-size=4B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=89B cardinality=7.30K
 ====
 # NOT IN predicate rewritten into a null-aware anti join
 select *
@@ -27,12 +30,15 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: id = id
+|  row-size=89B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
+|     row-size=4B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
 ====
 # Correlated NOT IN rewritten into a null-aware anti join
 select *
@@ -48,13 +54,16 @@ PLAN-ROOT SINK
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: a.int_col = int_col
 |  other join predicates: a.id = g.id, g.bigint_col < a.bigint_col
+|  row-size=89B cardinality=730
 |
 |--01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
+|     row-size=16B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.int_col < 100
+   row-size=89B cardinality=730
 ====
 # Correlated NOT IN subquery resulting in the same eq conjunct
 # being used in both the hash and the other join predicate
@@ -67,12 +76,15 @@ PLAN-ROOT SINK
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: a.id = id
 |  other join predicates: a.id = b.id
+|  row-size=89B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
 ====
 # Subquery with predicate in the WHERE clause
 select count(*)
@@ -85,19 +97,23 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.id = g.id, int_col = int_col
 |  runtime filters: RF000 <- g.id, RF001 <- int_col
+|  row-size=9B cardinality=1.10K
 |
 |--01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: g.bigint_col < 10
+|     row-size=16B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: bool_col = FALSE
    runtime filters: RF000 -> a.id, RF001 -> int_col
+   row-size=9B cardinality=3.65K
 ====
 # IMPALA-4325: Preserve parenthesis of expressions when rewriting subqueries
 select *
@@ -113,14 +129,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col = t2.int_col
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=89B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: (t2.int_col IS NOT NULL AND (t2.int_col < 0 OR t2.int_col > 10) OR t2.bigint_col IS NOT NULL AND (t2.bigint_col < 0 OR t2.bigint_col > 10))
+|     row-size=12B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t1.int_col
+   row-size=89B cardinality=7.30K
 ====
 # Complex expression in the IN predicate
 select *
@@ -133,13 +152,16 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.int_col + 1 = int_col + bigint_col
 |  runtime filters: RF000 <- int_col + bigint_col
+|  row-size=89B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
+|     row-size=12B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t.int_col + 1
+   row-size=89B cardinality=7.30K
 ====
 # Multiple subqueries in the WHERE clause
 select *
@@ -153,22 +175,27 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [NULL AWARE LEFT ANTI JOIN]
 |  hash predicates: t.tinyint_col = tinyint_col
+|  row-size=89B cardinality=730
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=1B cardinality=8
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=730
 |
 |--01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: bool_col = FALSE
+|     row-size=5B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    predicates: t.bigint_col < 1000
    runtime filters: RF000 -> t.id
+   row-size=89B cardinality=730
 ====
 # Multiple tables in the FROM clause of the outer query block
 select count(*)
@@ -180,25 +207,31 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.int_col = int_col
 |  runtime filters: RF000 <- int_col
+|  row-size=12B cardinality=16
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: bool_col = FALSE
+|     row-size=5B cardinality=4
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = t.id
 |  runtime filters: RF002 <- t.id
+|  row-size=12B cardinality=7.81K
 |
 |--01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> a.int_col, RF002 -> a.id
+   row-size=8B cardinality=11.00K
 ====
 # Multiple tables in the subquery
 select count(*)
@@ -213,26 +246,32 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.bool_col = s.bool_col, a.id = s.id
 |  runtime filters: RF000 <- s.bool_col, RF001 <- s.id
+|  row-size=9B cardinality=80
 |
 |--03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: s.int_col = t.int_col
 |  |  runtime filters: RF004 <- t.int_col
+|  |  row-size=13B cardinality=80
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny t]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     runtime filters: RF004 -> s.int_col
+|     row-size=9B cardinality=100
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    predicates: a.int_col < 10
    runtime filters: RF000 -> a.bool_col, RF001 -> a.id
+   row-size=9B cardinality=1.10K
 ====
 # Outer join between the tables in the outer query block
 select count(*)
@@ -246,25 +285,31 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.id = id
 |  runtime filters: RF000 <- id
+|  row-size=13B cardinality=33
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 03:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.int_col = t.int_col
 |  other predicates: t.bool_col = FALSE
+|  row-size=13B cardinality=41.95K
 |
 |--01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t.bool_col = FALSE
+|     row-size=5B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> a.id
+   row-size=8B cardinality=11.00K
 ====
 # Subquery in the outer-joined table
 select count(*)
@@ -278,26 +323,32 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.int_col = g.int_col
 |  other predicates: g.bigint_col < 100
+|  row-size=21B cardinality=5.84K
 |
 |--03:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: id = id
 |  |  runtime filters: RF000 <- id
+|  |  row-size=16B cardinality=8
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: g.bigint_col < 100
 |     runtime filters: RF000 -> id
+|     row-size=16B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.bool_col = FALSE
+   row-size=5B cardinality=3.65K
 ====
 # Multiple tables in the FROM clause of the subquery
 select count(distinct id)
@@ -311,37 +362,46 @@ PLAN-ROOT SINK
 |
 08:AGGREGATE [FINALIZE]
 |  output: count(id)
+|  row-size=8B cardinality=1
 |
 07:AGGREGATE
 |  group by: id
+|  row-size=4B cardinality=115
 |
 06:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.int_col = t.int_col
 |  runtime filters: RF000 <- t.int_col
+|  row-size=8B cardinality=115
 |
 |--05:HASH JOIN [INNER JOIN]
 |  |  hash predicates: s.bigint_col = n.bigint_col
 |  |  runtime filters: RF002 <- n.bigint_col
+|  |  row-size=29B cardinality=40
 |  |
 |  |--03:SCAN HDFS [functional.alltypestiny n]
 |  |     partitions=4/4 files=4 size=460B
 |  |     predicates: n.bool_col = FALSE
+|  |     row-size=9B cardinality=4
 |  |
 |  04:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t.id = s.id
 |  |  runtime filters: RF004 <- s.id
+|  |  row-size=20B cardinality=99
 |  |
 |  |--02:SCAN HDFS [functional.alltypessmall s]
 |  |     partitions=4/4 files=4 size=6.32KB
 |  |     runtime filters: RF002 -> s.bigint_col
+|  |     row-size=12B cardinality=100
 |  |
 |  01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF004 -> t.id
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> a.int_col
+   row-size=8B cardinality=11.00K
 ====
 # Subqueries with inline views
 select *
@@ -356,26 +416,32 @@ PLAN-ROOT SINK
 05:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = a.id
 |  runtime filters: RF000 <- a.id
+|  row-size=89B cardinality=8
 |
 |--04:HASH JOIN [INNER JOIN]
 |  |  hash predicates: id = a.id
 |  |  runtime filters: RF002 <- a.id
+|  |  row-size=16B cardinality=8
 |  |
 |  |--01:SCAN HDFS [functional.alltypestiny a]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  03:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  group by: id
 |  |  having: count(*) = 10
+|  |  row-size=12B cardinality=99
 |  |
 |  02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     runtime filters: RF002 -> functional.alltypessmall.id
+|     row-size=4B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t.id
+   row-size=89B cardinality=7.30K
 ====
 with t as (select a.* from functional.alltypes a where id in
   (select id from functional.alltypestiny))
@@ -386,14 +452,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.int_col = 10, a.bool_col = FALSE
    runtime filters: RF000 -> id
+   row-size=89B cardinality=516
 ====
 # Subqueries in WITH, FROM and WHERE clauses
 with t as (select a.* from functional.alltypes a
@@ -410,36 +479,45 @@ PLAN-ROOT SINK
 08:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.int_col = int_col
 |  runtime filters: RF000 <- int_col
+|  row-size=109B cardinality=91
 |
 |--06:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: g.string_col = a.string_col
 |  runtime filters: RF002 <- a.string_col
+|  row-size=109B cardinality=91
 |
 |--02:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: id = id
 |  |  runtime filters: RF006 <- id
+|  |  row-size=89B cardinality=8
 |  |
 |  |--01:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  00:SCAN HDFS [functional.alltypes a]
 |     partitions=24/24 files=24 size=478.45KB
 |     runtime filters: RF000 -> a.int_col, RF006 -> id
+|     row-size=89B cardinality=7.30K
 |
 05:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: g.id = id
 |  runtime filters: RF004 <- id
+|  row-size=20B cardinality=5.50K
 |
 |--04:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 03:SCAN HDFS [functional.alltypesagg g]
    partitions=11/11 files=11 size=814.73KB
    predicates: g.bool_col = FALSE
    runtime filters: RF002 -> g.string_col, RF004 -> g.id
+   row-size=20B cardinality=5.50K
 ====
 # Correlated subqueries
 select *
@@ -453,14 +531,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id, t.int_col = a.int_col
 |  runtime filters: RF000 <- id, RF001 <- a.int_col
+|  row-size=89B cardinality=3.65K
 |
 |--01:SCAN HDFS [functional.alltypesagg a]
 |     partitions=11/11 files=11 size=814.73KB
+|     row-size=8B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    predicates: t.bool_col = FALSE
    runtime filters: RF000 -> id, RF001 -> t.int_col
+   row-size=89B cardinality=3.65K
 ====
 # Multiple nesting levels (uncorrelated queries)
 select *
@@ -476,23 +557,28 @@ PLAN-ROOT SINK
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=11
 |
 |--03:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: int_col = int_col
 |  |  runtime filters: RF002 <- int_col
+|  |  row-size=9B cardinality=11
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: bool_col = FALSE
 |     runtime filters: RF002 -> int_col
+|     row-size=9B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    predicates: bigint_col < 1000
    runtime filters: RF000 -> id
+   row-size=89B cardinality=730
 ====
 # Multiple nesting levels (correlated queries)
 select *
@@ -508,21 +594,26 @@ PLAN-ROOT SINK
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id, t.int_col = a.int_col
 |  runtime filters: RF000 <- id, RF001 <- a.int_col
+|  row-size=89B cardinality=22
 |
 |--03:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: a.bigint_col = s.bigint_col, a.tinyint_col = tinyint_col
 |  |  runtime filters: RF004 <- s.bigint_col, RF005 <- tinyint_col
+|  |  row-size=17B cardinality=22
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny s]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=9B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypesagg a]
 |     partitions=11/11 files=11 size=814.73KB
 |     runtime filters: RF004 -> a.bigint_col, RF005 -> a.tinyint_col
+|     row-size=17B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id, RF001 -> t.int_col
+   row-size=89B cardinality=7.30K
 ====
 # Multiple nesting levels (correlated and uncorrelated queries)
 select *
@@ -536,21 +627,26 @@ PLAN-ROOT SINK
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=22
 |
 |--03:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: a.bigint_col = s.bigint_col, a.int_col = int_col
 |  |  runtime filters: RF002 <- s.bigint_col, RF003 <- int_col
+|  |  row-size=16B cardinality=22
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny s]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=12B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypesagg a]
 |     partitions=11/11 files=11 size=814.73KB
 |     runtime filters: RF002 -> a.bigint_col, RF003 -> a.int_col
+|     row-size=16B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=89B cardinality=7.30K
 ====
 # Predicate propagation with uncorrelated subqueries
 select *
@@ -563,15 +659,18 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=730
 |
 |--01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: id < 10
+|     row-size=4B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.id < 10
    runtime filters: RF000 -> id
+   row-size=89B cardinality=730
 ====
 # Predicate propagation with correlated subqueries
 select *
@@ -584,23 +683,28 @@ PLAN-ROOT SINK
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = s.id, t.int_col = int_col
 |  runtime filters: RF000 <- s.id, RF001 <- int_col
+|  row-size=184B cardinality=10
 |
 |--02:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.int_col < 10
+|     row-size=8B cardinality=10
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = t.id
 |  runtime filters: RF004 <- t.id
+|  row-size=184B cardinality=782
 |
 |--01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t.int_col < 10
 |     runtime filters: RF000 -> t.id, RF001 -> t.int_col
+|     row-size=89B cardinality=730
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> a.id, RF004 -> a.id
+   row-size=95B cardinality=11.00K
 ====
 # Correlated EXISTS
 select count(*)
@@ -612,17 +716,21 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: a.id = t.id
 |  runtime filters: RF000 <- t.id
+|  row-size=4B cardinality=7.30K
 |
 |--00:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> a.id
+   row-size=4B cardinality=11.00K
 ====
 # Correlated EXISTS with an analytic function and a group by clause
 select 1
@@ -639,17 +747,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.tinyint_col = b.tinyint_col
 |  runtime filters: RF000 <- b.tinyint_col
+|  row-size=1B cardinality=244
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  group by: id, int_col, bool_col, b.tinyint_col
+|  |  row-size=10B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
+|     row-size=10B cardinality=8
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    predicates: tinyint_col < 10
    runtime filters: RF000 -> a.tinyint_col
+   row-size=1B cardinality=1.10K
 ====
 # Correlated NOT EXISTS
 select count(*)
@@ -661,15 +773,19 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:HASH JOIN [RIGHT ANTI JOIN]
 |  hash predicates: a.int_col = t.int_col
+|  row-size=4B cardinality=7.30K
 |
 |--00:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
+   row-size=4B cardinality=11.00K
 ====
 # Correlated NOT EXISTS with an analytic function and a group by clause
 select count(*)
@@ -685,28 +801,35 @@ PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 05:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: a.int_col = b.int_col
+|  row-size=5B cardinality=5.50K
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  group by: b.id, b.int_col, b.bigint_col
+|  |  row-size=16B cardinality=50
 |  |
 |  03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: c.id = b.id
 |  |  runtime filters: RF000 <- b.id
+|  |  row-size=21B cardinality=50
 |  |
 |  |--01:SCAN HDFS [functional.alltypessmall b]
 |  |     partitions=4/4 files=4 size=6.32KB
+|  |     row-size=16B cardinality=100
 |  |
 |  02:SCAN HDFS [functional.alltypes c]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: c.bool_col = FALSE
 |     runtime filters: RF000 -> c.id
+|     row-size=5B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    predicates: bool_col = FALSE
+   row-size=5B cardinality=5.50K
 ====
 # Uncorrelated EXISTS
 select *
@@ -716,14 +839,17 @@ where exists (select * from functional.alltypessmall s where s.id < 5)
 PLAN-ROOT SINK
 |
 02:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.id < 5
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # Uncorrelated EXISTS with an analytic function and a group by clause
 select 1
@@ -736,17 +862,21 @@ where exists
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  row-size=0B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
+|     row-size=0B cardinality=8
 |
 02:AGGREGATE [FINALIZE]
 |  group by: id, int_col, bigint_col
 |  limit: 1
+|  row-size=16B cardinality=1
 |
 01:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    predicates: tinyint_col = 10
+   row-size=17B cardinality=1.22K
 ====
 # Uncorrelated EXISTS with a LIMIT 0 clause
 select 1
@@ -765,14 +895,17 @@ where not exists (select * from functional.alltypessmall s where s.id < 5)
 PLAN-ROOT SINK
 |
 02:NESTED LOOP JOIN [LEFT ANTI JOIN]
+|  row-size=89B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.id < 5
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # Uncorrelated NOT exists referencing a WITH clause
 with
@@ -785,14 +918,17 @@ where not exists (select 1 from w2)
 PLAN-ROOT SINK
 |
 02:NESTED LOOP JOIN [LEFT ANTI JOIN]
+|  row-size=89B cardinality=8
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.id < 0
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # Uncorrelated NOT EXISTS with an analytic function and a group by clause
 select 1
@@ -805,17 +941,21 @@ where not exists
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [RIGHT ANTI JOIN]
+|  row-size=0B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
+|     row-size=0B cardinality=8
 |
 02:AGGREGATE [FINALIZE]
 |  group by: id, int_col, bigint_col
 |  limit: 1
+|  row-size=16B cardinality=1
 |
 01:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    predicates: tinyint_col = 10
+   row-size=17B cardinality=1.22K
 ====
 # Uncorrelated NOT EXISTS with a LIMIT 0 clause
 select 1
@@ -826,6 +966,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
+   row-size=0B cardinality=8
 ====
 # Multiple nesting levels
 select count(*)
@@ -839,26 +980,32 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.id = t.id
 |  runtime filters: RF000 <- t.id
+|  row-size=4B cardinality=8
 |
 |--03:HASH JOIN [RIGHT SEMI JOIN]
 |  |  hash predicates: g.int_col = t.int_col
 |  |  runtime filters: RF002 <- t.int_col
+|  |  row-size=8B cardinality=8
 |  |
 |  |--01:SCAN HDFS [functional.alltypestiny t]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=8B cardinality=8
 |  |
 |  02:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: g.bool_col = FALSE
 |     runtime filters: RF002 -> g.int_col
+|     row-size=5B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.id
+   row-size=4B cardinality=7.30K
 ====
 # Multiple subquery predicates
 select g.int_col, count(*)
@@ -885,50 +1032,61 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  group by: g.int_col
 |  having: count(*) < 100
+|  row-size=12B cardinality=4
 |
 09:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: t.id = g.id
 |  other predicates: g.tinyint_col < zeroifnull(count(*))
 |  runtime filters: RF000 <- g.id
+|  row-size=33B cardinality=4
 |
 |--08:HASH JOIN [RIGHT SEMI JOIN]
 |  |  hash predicates: bigint_col = g.bigint_col, s.id = g.id
 |  |  runtime filters: RF002 <- g.bigint_col, RF003 <- g.id
+|  |  row-size=21B cardinality=4
 |  |
 |  |--07:HASH JOIN [LEFT SEMI JOIN]
 |  |  |  hash predicates: g.id = t.id
 |  |  |  runtime filters: RF006 <- t.id
+|  |  |  row-size=21B cardinality=4
 |  |  |
 |  |  |--02:SCAN HDFS [functional.alltypestiny t]
 |  |  |     partitions=4/4 files=4 size=460B
 |  |  |     predicates: t.bool_col = FALSE
+|  |  |     row-size=5B cardinality=4
 |  |  |
 |  |  06:HASH JOIN [RIGHT OUTER JOIN]
 |  |  |  hash predicates: a.id = g.id
 |  |  |  runtime filters: RF008 <- g.id
+|  |  |  row-size=21B cardinality=1.10K
 |  |  |
 |  |  |--00:SCAN HDFS [functional.alltypesagg g]
 |  |  |     partitions=11/11 files=11 size=814.73KB
 |  |  |     predicates: g.int_col < 100
 |  |  |     runtime filters: RF006 -> g.id
+|  |  |     row-size=17B cardinality=1.10K
 |  |  |
 |  |  01:SCAN HDFS [functional.alltypes a]
 |  |     partitions=24/24 files=24 size=478.45KB
 |  |     runtime filters: RF006 -> a.id, RF008 -> a.id
+|  |     row-size=4B cardinality=7.30K
 |  |
 |  03:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.int_col > 10
 |     runtime filters: RF002 -> bigint_col, RF003 -> s.id
+|     row-size=16B cardinality=10
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t.id
+|  row-size=12B cardinality=3.65K
 |
 04:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    predicates: t.bool_col = TRUE
    runtime filters: RF000 -> t.id
+   row-size=5B cardinality=3.65K
 ====
 # Subqueries with aggregation
 select *
@@ -943,19 +1101,23 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.int_col = count(int_col)
 |  runtime filters: RF000 <- count(int_col)
+|  row-size=89B cardinality=730
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(int_col)
 |  |  group by: int_col
+|  |  row-size=12B cardinality=957
 |  |
 |  01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: g.bool_col
+|     row-size=5B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.bigint_col < 10
    runtime filters: RF000 -> a.int_col
+   row-size=89B cardinality=730
 ====
 # Uncorrelated aggregation subquery
 select *
@@ -968,17 +1130,21 @@ PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: a.int_col < max(int_col)
+|  row-size=93B cardinality=730
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: max(int_col)
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: g.bool_col = TRUE
+|     row-size=5B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    predicates: a.bigint_col > 10
+   row-size=89B cardinality=730
 ====
 # Aggregation subquery with constant comparison expr
 select *
@@ -989,18 +1155,22 @@ and a.int_col < 10
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=99B cardinality=0
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: max(id)
 |  |  having: max(id) > 10
+|  |  row-size=4B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t.bool_col = FALSE
+|     row-size=5B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    predicates: a.int_col < 10
+   row-size=95B cardinality=1.10K
 ====
 # Correlated aggregation subquery
 select a.int_col, count(*)
@@ -1017,23 +1187,28 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: a.int_col
+|  row-size=12B cardinality=10
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.id = min(id), a.int_col = t.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t.int_col
+|  row-size=9B cardinality=10
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t.tinyint_col < 10
+|     row-size=9B cardinality=730
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    predicates: a.bool_col = FALSE
    runtime filters: RF000 -> a.id, RF001 -> a.int_col
+   row-size=9B cardinality=5.50K
 ====
 # Aggregation subquery with multiple tables
 select t.tinyint_col, count(*)
@@ -1051,37 +1226,46 @@ PLAN-ROOT SINK
 08:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t.tinyint_col
+|  row-size=9B cardinality=10
 |
 07:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.bigint_col = s.bigint_col
 |  other join predicates: t.int_col < min(s.int_col)
 |  runtime filters: RF000 <- s.bigint_col
+|  row-size=22B cardinality=7.30K
 |
 |--05:AGGREGATE [FINALIZE]
 |  |  output: min(s.int_col)
 |  |  group by: s.bigint_col
+|  |  row-size=12B cardinality=10
 |  |
 |  04:HASH JOIN [LEFT OUTER JOIN]
 |  |  hash predicates: s.id = p.id
+|  |  row-size=21B cardinality=50
 |  |
 |  |--03:SCAN HDFS [functional.alltypestiny p]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=4B cardinality=8
 |  |
 |  02:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.bool_col = FALSE
+|     row-size=17B cardinality=50
 |
 06:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t.id = a.id
 |  other predicates: a.bool_col = FALSE
+|  row-size=22B cardinality=7.30K
 |
 |--01:SCAN HDFS [functional.alltypesagg a]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: a.bool_col = FALSE
+|     row-size=5B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t.bigint_col
+   row-size=17B cardinality=7.30K
 ====
 # Multiple aggregation subqueries
 select *
@@ -1098,39 +1282,48 @@ PLAN-ROOT SINK
 |
 08:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: a.tinyint_col > max(tinyint_col)
+|  row-size=185B cardinality=781
 |
 |--05:AGGREGATE [FINALIZE]
 |  |  output: max(tinyint_col)
+|  |  row-size=1B cardinality=1
 |  |
 |  04:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.id < 10
+|     row-size=5B cardinality=10
 |
 07:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.bigint_col = g.bigint_col
 |  other join predicates: a.int_col < min(int_col)
 |  runtime filters: RF000 <- g.bigint_col
+|  row-size=184B cardinality=781
 |
 |--03:AGGREGATE [FINALIZE]
 |  |  output: min(int_col)
 |  |  group by: g.bigint_col
+|  |  row-size=12B cardinality=2
 |  |
 |  02:SCAN HDFS [functional.alltypestiny g]
 |     partitions=4/4 files=4 size=460B
 |     predicates: g.bool_col = FALSE
+|     row-size=13B cardinality=4
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = t.id
 |  runtime filters: RF002 <- t.id
+|  row-size=184B cardinality=3.91K
 |
 |--01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t.bool_col = FALSE
 |     runtime filters: RF000 -> t.bigint_col
+|     row-size=89B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF002 -> a.id
+   row-size=95B cardinality=11.00K
 ====
 # Multiple nesting levels with aggregation subqueries
 select *
@@ -1150,29 +1343,36 @@ PLAN-ROOT SINK
 |  hash predicates: t.id = g.id
 |  other join predicates: t.int_col < avg(g.int_col) * 2
 |  runtime filters: RF000 <- g.id
+|  row-size=89B cardinality=7.30K
 |
 |--05:AGGREGATE [FINALIZE]
 |  |  output: avg(g.int_col)
 |  |  group by: g.id
+|  |  row-size=12B cardinality=10.28K
 |  |
 |  04:HASH JOIN [LEFT OUTER JOIN]
 |  |  hash predicates: g.id = a.id
 |  |  other predicates: g.bigint_col < zeroifnull(count(*))
+|  |  row-size=28B cardinality=11.00K
 |  |
 |  |--03:AGGREGATE [FINALIZE]
 |  |  |  output: count(*)
 |  |  |  group by: a.id
+|  |  |  row-size=12B cardinality=4
 |  |  |
 |  |  02:SCAN HDFS [functional.alltypestiny a]
 |  |     partitions=4/4 files=4 size=460B
 |  |     predicates: a.bool_col = FALSE
+|  |     row-size=5B cardinality=4
 |  |
 |  01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
+|     row-size=16B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t.id
+   row-size=89B cardinality=7.30K
 ====
 # Multiple nesting of aggregate subquery predicates with count
 select *
@@ -1189,32 +1389,41 @@ PLAN-ROOT SINK
 08:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = s.id
 |  other predicates: a.int_col < zeroifnull(count(*))
+|  row-size=196B cardinality=11.00K
 |
 |--06:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  group by: s.id
+|  |  row-size=12B cardinality=99
 |  |
 |  05:NESTED LOOP JOIN [INNER JOIN]
 |  |  predicates: s.tinyint_col > count(*)
+|  |  row-size=13B cardinality=100
 |  |
 |  |--04:AGGREGATE [FINALIZE]
 |  |  |  output: count(*)
+|  |  |  row-size=8B cardinality=1
 |  |  |
 |  |  03:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
 |  |     predicates: bool_col = FALSE
+|  |     row-size=1B cardinality=4
 |  |
 |  02:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=5B cardinality=100
 |
 07:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: a.id = t.id
+|  row-size=184B cardinality=11.00K
 |
 |--01:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
+   row-size=95B cardinality=11.00K
 ====
 # Distinct in the outer select block
 select distinct id, bool_col
@@ -1226,20 +1435,25 @@ PLAN-ROOT SINK
 |
 04:AGGREGATE [FINALIZE]
 |  group by: id, bool_col
+|  row-size=5B cardinality=0
 |
 03:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=13B cardinality=0
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
 |  |  having: count(*) > 100
+|  |  row-size=8B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: id < 5, bool_col = FALSE
+|     row-size=5B cardinality=516
 |
 00:SCAN HDFS [functional.alltypesagg g]
    partitions=11/11 files=11 size=814.73KB
    predicates: bool_col = FALSE
+   row-size=5B cardinality=5.50K
 ====
 # Distinct with an unqualified star in the outer select block
 select distinct *
@@ -1251,23 +1465,29 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  group by: g.id, g.bool_col, g.tinyint_col, g.smallint_col, g.int_col, g.bigint_col, g.float_col, g.double_col, g.date_string_col, g.string_col, g.timestamp_col, g.year, g.month, g.day
+|  row-size=95B cardinality=0
 |
 04:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=103B cardinality=0
 |
 |--03:AGGREGATE [FINALIZE]
 |  |  output: count(id)
 |  |  having: count(id) < 100
+|  |  row-size=8B cardinality=0
 |  |
 |  02:AGGREGATE
 |  |  group by: id
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: int_col < 5
+|     row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypesagg g]
    partitions=11/11 files=11 size=814.73KB
    predicates: g.bigint_col = 1
+   row-size=95B cardinality=11
 ====
 # Aggregate subquery in an IS NULL predicate
 select *
@@ -1277,17 +1497,21 @@ where (select max(int_col) from functional.alltypesagg where int_col is null) is
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=93B cardinality=0
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: max(int_col)
 |  |  having: max(int_col) IS NULL
+|  |  row-size=4B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: int_col IS NULL
+|     row-size=4B cardinality=20
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # Correlated aggregate subquery with a count in an IS NULL predicate
 select int_col, count(*)
@@ -1301,24 +1525,29 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: int_col
+|  row-size=12B cardinality=2
 |
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: g.id = t.id
 |  other predicates: zeroifnull(count(*)) IS NULL
 |  runtime filters: RF000 <- t.id
+|  row-size=21B cardinality=4
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
 |     predicates: bool_col = FALSE
+|     row-size=9B cardinality=4
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: g.id
 |  having: zeroifnull(count(*)) IS NULL
+|  row-size=12B cardinality=1.03K
 |
 01:SCAN HDFS [functional.alltypesagg g]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> g.id
+   row-size=4B cardinality=11.00K
 ====
 # Correlated aggregate subquery in an IS NULL predicate
 select *
@@ -1334,20 +1563,24 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = g.id
 |  runtime filters: RF000 <- g.id
+|  row-size=89B cardinality=4
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: max(int_col)
 |  |  group by: g.id
 |  |  having: max(int_col) IS NULL
+|  |  row-size=8B cardinality=20
 |  |
 |  01:SCAN HDFS [functional.alltypesagg g]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: g.int_col IS NULL
+|     row-size=8B cardinality=20
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
    predicates: t.bool_col = FALSE
    runtime filters: RF000 -> t.id
+   row-size=89B cardinality=4
 ====
 # Complex expr with a scalar subquery
 select *
@@ -1361,18 +1594,22 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.int_col + 2 = 1 + count(*)
 |  runtime filters: RF000 <- 1 + count(*)
+|  row-size=89B cardinality=1
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypesagg]
 |     partitions=11/11 files=11 size=814.73KB
 |     predicates: bool_col = FALSE
+|     row-size=1B cardinality=5.50K
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
    predicates: t.bigint_col < 100
    runtime filters: RF000 -> t.int_col + 2
+   row-size=89B cardinality=1
 ====
 # Scalar subquery in a function
 select *
@@ -1384,18 +1621,22 @@ and t.id < 10
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=93B cardinality=0
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  having: nullifzero(min(id)) IS NULL
+|  |  row-size=4B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: s.bool_col = FALSE
+|     row-size=5B cardinality=50
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
    predicates: t.id < 10
+   row-size=89B cardinality=1
 ====
 # Correlated aggregate subquery with a LIMIT clause that is removed during the rewrite
 select min(t.id)
@@ -1412,22 +1653,27 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: min(t.id)
 |  group by: t.bool_col
+|  row-size=5B cardinality=2
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t.id = s.id
 |  other join predicates: t.int_col < sum(s.int_col)
 |  runtime filters: RF000 <- s.id
+|  row-size=9B cardinality=99
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: sum(s.int_col)
 |  |  group by: s.id
+|  |  row-size=12B cardinality=99
 |  |
 |  01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=8B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t.id
+   row-size=9B cardinality=7.30K
 ====
 # Between predicate with subqueries
 select *
@@ -1440,26 +1686,33 @@ PLAN-ROOT SINK
 |
 06:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: int_col <= max(int_col)
+|  row-size=97B cardinality=8
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: max(int_col)
+|  |  row-size=4B cardinality=1
 |  |
 |  03:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: bool_col = TRUE
+|     row-size=5B cardinality=50
 |
 05:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: int_col >= min(int_col)
+|  row-size=93B cardinality=8
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(int_col)
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: bool_col = FALSE
+|     row-size=5B cardinality=50
 |
 00:SCAN HDFS [functional.alltypestiny t]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # Aggregate subquery with count (subquery op slotRef)
 select t1.id
@@ -1475,17 +1728,21 @@ PLAN-ROOT SINK
 |  hash predicates: tt1.month = t1.id
 |  other predicates: t1.id > zeroifnull(count(tt1.smallint_col))
 |  runtime filters: RF000 <- t1.id
+|  row-size=16B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t1]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(tt1.smallint_col)
 |  group by: tt1.month
+|  row-size=12B cardinality=4
 |
 01:SCAN HDFS [functional.alltypestiny tt1]
    partitions=4/4 files=4 size=460B
    runtime filters: RF000 -> tt1.month
+   row-size=6B cardinality=8
 ====
 # Correlated aggregate subquery with count in a function participating in
 # a complex arithmetic expr
@@ -1501,22 +1758,27 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: int_col
+|  row-size=12B cardinality=2
 |
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: s.id = t.id
 |  other predicates: 1 + log(abs(zeroifnull(count(int_col))), 2) < 10
 |  runtime filters: RF000 <- t.id
+|  row-size=20B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: s.id
+|  row-size=12B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypes s]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> s.id
+   row-size=8B cardinality=7.30K
 ====
 # Correlated scalar subquery with an aggregate function that returns a
 # non-numeric type on empty input
@@ -1532,23 +1794,28 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: int_col
+|  row-size=12B cardinality=2
 |
 03:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: s.id = t.id
 |  other predicates: t.string_col = ifnull(sample(int_col), '')
 |  runtime filters: RF000 <- t.id
+|  row-size=38B cardinality=4
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
 |     predicates: bool_col = FALSE
+|     row-size=22B cardinality=4
 |
 02:AGGREGATE [FINALIZE]
 |  output: sample(int_col)
 |  group by: s.id
+|  row-size=16B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypes s]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> s.id
+   row-size=8B cardinality=7.30K
 ====
 # Uncorrelated scalar subquery where columns from the outer appear in both sides
 # of the binary predicate
@@ -1560,15 +1827,19 @@ PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: count(*) + t1.int_col = t1.bigint_col - 1
+|  row-size=20B cardinality=8
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: count(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=0B cardinality=100
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
+   row-size=12B cardinality=8
 ====
 # Uncorrelated scalar subquery in complex binary predicate that contains columns
 # from two tables of the outer
@@ -1580,23 +1851,29 @@ PLAN-ROOT SINK
 |
 05:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.int_col + t2.int_col = count(*) + 1
+|  row-size=16B cardinality=9
 |
 |--03:AGGREGATE [FINALIZE]
 |  |  output: count(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = t1.id
 |  runtime filters: RF000 <- t1.id
+|  row-size=16B cardinality=9
 |
 |--00:SCAN HDFS [functional.alltypestiny t1]
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 01:SCAN HDFS [functional.alltypessmall t2]
    partitions=4/4 files=4 size=6.32KB
    runtime filters: RF000 -> t2.id
+   row-size=8B cardinality=100
 ====
 # Uncorrelated scalar subquery in complex binary predicate that contains columns
 # from two tables of the outer that appear in both sides of the predicate
@@ -1609,23 +1886,29 @@ PLAN-ROOT SINK
 |
 05:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: count(*) + t2.bigint_col = t1.int_col + t2.int_col
+|  row-size=32B cardinality=9
 |
 |--03:AGGREGATE [FINALIZE]
 |  |  output: count(*)
+|  |  row-size=8B cardinality=1
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = t1.id
 |  runtime filters: RF000 <- t1.id
+|  row-size=24B cardinality=9
 |
 |--00:SCAN HDFS [functional.alltypestiny t1]
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 01:SCAN HDFS [functional.alltypessmall t2]
    partitions=4/4 files=4 size=6.32KB
    runtime filters: RF000 -> t2.id
+   row-size=16B cardinality=100
 ====
 # Correlated scalar subquery with complex correlated predicate (IMPALA-1335)
 select 1
@@ -1639,25 +1922,31 @@ PLAN-ROOT SINK
 |
 05:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t1.id + t2.id = t.int_col
+|  row-size=4B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 04:AGGREGATE [FINALIZE]
 |  output: sum(t1.id)
 |  group by: t1.id + t2.id
 |  having: sum(t1.id) = t1.id + t2.id
+|  row-size=16B cardinality=1.03K
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF002 <- t2.id
+|  row-size=8B cardinality=7.81K
 |
 |--02:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF002 -> t1.id
+   row-size=4B cardinality=11.00K
 ====
 # Correlated scalar subquery with complex correlared predicate (IMPALA-1335)
 select 1
@@ -1671,24 +1960,30 @@ PLAN-ROOT SINK
 |
 05:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t1.id + t2.id = t.bigint_col, sum(t1.id) = t.int_col
+|  row-size=12B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
+|     row-size=12B cardinality=8
 |
 04:AGGREGATE [FINALIZE]
 |  output: sum(t1.id)
 |  group by: t1.id + t2.id
+|  row-size=16B cardinality=7.81K
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF004 <- t2.id
+|  row-size=8B cardinality=7.81K
 |
 |--02:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF004 -> t1.id
+   row-size=4B cardinality=11.00K
 ====
 # Outer query block with multiple tables and a correlated scalar subquery with
 # complex correlated predicate that references multiple subquery tables and multiple
@@ -1705,32 +2000,40 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: sum(tt1.id) = t1.bigint_col, tt1.id + tt2.id = t1.int_col - t2.int_col
+|  row-size=24B cardinality=9
 |
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t1.id
 |  |  runtime filters: RF004 <- t1.id
+|  |  row-size=24B cardinality=9
 |  |
 |  |--00:SCAN HDFS [functional.alltypestiny t1]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=16B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
 |     runtime filters: RF004 -> t2.id
+|     row-size=8B cardinality=100
 |
 05:AGGREGATE [FINALIZE]
 |  output: sum(tt1.id)
 |  group by: tt1.id + tt2.id
+|  row-size=16B cardinality=10.28K
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: tt1.int_col = tt2.int_col
 |  runtime filters: RF002 <- tt2.int_col
+|  row-size=16B cardinality=83.91K
 |
 |--03:SCAN HDFS [functional.alltypes tt2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 02:SCAN HDFS [functional.alltypesagg tt1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF002 -> tt1.int_col
+   row-size=8B cardinality=11.00K
 ====
 # IMPALA-1550/IMPALA-4423: Correlated EXISTS and NOT EXISTS subqueries with aggregates
 # that can be evaluated at query compile time. All predicates evaluate to FALSE.
@@ -1778,6 +2081,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypestiny t1]
    partitions=4/4 files=4 size=460B
+   row-size=0B cardinality=8
 ====
 # Correlated EXISTS and NOT EXISTS subqueries with limit 0 and
 # aggregates. Some predicates evaluate to TRUE while others need to
@@ -1802,30 +2106,37 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: t1.tinyint_col = t4.int_col
+|  row-size=5B cardinality=1
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: count(id)
 |  |  group by: t4.int_col
 |  |  having: count(id) > 200
+|  |  row-size=12B cardinality=0
 |  |
 |  03:SCAN HDFS [functional.alltypestiny t4]
 |     partitions=4/4 files=4 size=460B
+|     row-size=8B cardinality=8
 |
 05:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t3.id = t1.id
 |  runtime filters: RF000 <- t1.id
+|  row-size=5B cardinality=1
 |
 |--00:SCAN HDFS [functional.alltypestiny t1]
 |     partitions=4/4 files=4 size=460B
 |     predicates: t1.id > 100
+|     row-size=5B cardinality=1
 |
 02:AGGREGATE [FINALIZE]
 |  group by: int_col, t3.id
+|  row-size=8B cardinality=1.10K
 |
 01:SCAN HDFS [functional.alltypesagg t3]
    partitions=11/11 files=11 size=814.73KB
    predicates: t3.id > 100
    runtime filters: RF000 -> t3.id
+   row-size=8B cardinality=1.10K
 ====
 # Tests for <=> (aka IS NOT DISTINCT FROM) and IS DISTINCT FROM
 select * from functional.alltypesagg t1
@@ -1838,17 +2149,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id IS NOT DISTINCT FROM min(id), t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
+|  row-size=95B cardinality=11
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id, RF001 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id is distinct from
@@ -1861,17 +2176,21 @@ PLAN-ROOT SINK
 |  hash predicates: t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  other join predicates: t1.id IS DISTINCT FROM min(id)
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=95B cardinality=115
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id =
@@ -1883,17 +2202,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id = min(id), t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
+|  row-size=95B cardinality=11
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id, RF001 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id !=
@@ -1906,17 +2229,21 @@ PLAN-ROOT SINK
 |  hash predicates: t1.int_col IS NOT DISTINCT FROM t2.int_col
 |  other join predicates: t1.id != min(id)
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=95B cardinality=115
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id is not distinct from
@@ -1928,17 +2255,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id IS NOT DISTINCT FROM min(id), t1.int_col = t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
+|  row-size=95B cardinality=11
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id, RF001 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id is distinct from
@@ -1951,17 +2282,21 @@ PLAN-ROOT SINK
 |  hash predicates: t1.int_col = t2.int_col
 |  other join predicates: t1.id IS DISTINCT FROM min(id)
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=95B cardinality=115
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id =
@@ -1973,17 +2308,21 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: t1.id = min(id), t1.int_col = t2.int_col
 |  runtime filters: RF000 <- min(id), RF001 <- t2.int_col
+|  row-size=95B cardinality=11
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id, RF001 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 select * from functional.alltypesagg t1
 where t1.id !=
@@ -1996,17 +2335,21 @@ PLAN-ROOT SINK
 |  hash predicates: t1.int_col = t2.int_col
 |  other join predicates: t1.id != min(id)
 |  runtime filters: RF000 <- t2.int_col
+|  row-size=95B cardinality=115
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: min(id)
 |  |  group by: t2.int_col
+|  |  row-size=8B cardinality=10
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.int_col
+   row-size=95B cardinality=11.00K
 ====
 # IMPALA-3861: Test that IN subqueries with correlated BETWEEN predicates work.
 select 1 from functional.alltypes t where id in
@@ -2025,15 +2368,18 @@ PLAN-ROOT SINK
 |  hash predicates: id = id
 |  other join predicates: a.tinyint_col >= t.tinyint_col, t.float_col >= a.float_col, a.smallint_col <= t.int_col, a.tinyint_col <= t.smallint_col, t.float_col <= a.double_col, a.double_col <= CAST(t.string_col AS INT), t.string_col >= a.string_col, a.double_col >= round(acos(t.float_col), 2)
 |  runtime filters: RF000 <- id
+|  row-size=56B cardinality=730
 |
 |--00:SCAN HDFS [functional.alltypes t]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: t.bigint_col <= 20, t.string_col <= t.date_string_col
+|     row-size=56B cardinality=730
 |
 01:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
    predicates: a.int_col >= 20, a.smallint_col >= 10
    runtime filters: RF000 -> id
+   row-size=38B cardinality=1.10K
 ====
 # IMPALA-4423: Correlated EXISTS and NOT EXISTS subqueries with aggregates. Both
 # subqueries can be evaluated at query compile time. The first one evaluates to
@@ -2065,22 +2411,30 @@ PLAN-ROOT SINK
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=89B cardinality=12
 |
 |--05:SCAN HDFS [functional.alltypestiny]
+|     partition predicates: year = 2009, month = 2
 |     partitions=1/4 files=1 size=115B
+|     row-size=89B cardinality=2
 |
 |--04:SCAN HDFS [functional.alltypestiny]
+|     partition predicates: year = 2009, month = 1
 |     partitions=1/4 files=1 size=115B
+|     row-size=89B cardinality=2
 |
 03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=8
 |
 |--02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: int_col < 10
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 01:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # IMPALA-4303: Test subquery rewriting with nested unions.
 select * from functional.alltypestiny
@@ -2094,25 +2448,34 @@ PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  row-size=89B cardinality=12
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=89B cardinality=12
 |
 |--05:SCAN HDFS [functional.alltypestiny]
+|     partition predicates: year = 2009, month = 2
 |     partitions=1/4 files=1 size=115B
+|     row-size=89B cardinality=2
 |
 |--04:SCAN HDFS [functional.alltypestiny]
+|     partition predicates: year = 2009, month = 1
 |     partitions=1/4 files=1 size=115B
+|     row-size=89B cardinality=2
 |
 03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=8
 |
 |--02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: int_col < 10
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 01:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
 ====
 # Constant on LHS of IN, uncorrelated subquery
 select * from functional.alltypessmall where
@@ -2121,13 +2484,16 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 02:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=100
 |
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: 1 = functional.alltypestiny.int_col
+|     row-size=4B cardinality=4
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of NOT IN, uncorrelated subquery
 select * from functional.alltypessmall where
@@ -2136,14 +2502,17 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 02:NESTED LOOP JOIN [LEFT ANTI JOIN]
+|  row-size=89B cardinality=100
 |
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: 1 IS NULL OR functional.alltypestiny.int_col IS NULL OR functional.alltypestiny.int_col = 1
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of IN, correlated subquery
 select * from functional.alltypessmall a where
@@ -2154,14 +2523,17 @@ PLAN-ROOT SINK
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.id = b.id
 |  runtime filters: RF000 <- b.id
+|  row-size=89B cardinality=4
 |
 |--01:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
 |     predicates: 1 = b.int_col
+|     row-size=8B cardinality=4
 |
 00:SCAN HDFS [functional.alltypessmall a]
    partitions=4/4 files=4 size=6.32KB
    runtime filters: RF000 -> a.id
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of IN, subquery with group by
 select * from functional.alltypessmall where
@@ -2170,16 +2542,20 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=100
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  group by: int_col
+|  |  row-size=4B cardinality=2
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: 1 = functional.alltypestiny.int_col
+|     row-size=4B cardinality=4
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of NOT IN, subquery with group by
 select * from functional.alltypessmall where
@@ -2188,17 +2564,21 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [LEFT ANTI JOIN]
+|  row-size=89B cardinality=100
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  group by: int_col
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: 1 IS NULL OR functional.alltypestiny.int_col IS NULL OR functional.alltypestiny.int_col = 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of IN, subquery with aggregate
 select * from functional.alltypessmall where
@@ -2207,16 +2587,20 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=100
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: max(int_col)
 |  |  having: 1 = max(int_col)
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of NOT IN, subquery with aggregate
 select * from functional.alltypessmall where
@@ -2225,16 +2609,20 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=93B cardinality=0
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  output: max(int_col)
 |  |  having: 1 != max(int_col)
+|  |  row-size=4B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of IN, subquery with limit
 select * from functional.alltypessmall where
@@ -2243,16 +2631,20 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=100
 |
 |--02:SELECT
 |  |  predicates: 1 = int_col
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of NOT IN, subquery with limit
 select * from functional.alltypessmall where
@@ -2261,16 +2653,20 @@ select * from functional.alltypessmall where
 PLAN-ROOT SINK
 |
 03:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=93B cardinality=0
 |
 |--02:SELECT
 |  |  predicates: 1 != int_col
+|  |  row-size=4B cardinality=0
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     limit: 1
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
+   row-size=89B cardinality=100
 ====
 # Constant on LHS of IN for nested subqueries (no correlation)
 select * from functional.alltypes t where 1 in
@@ -2280,19 +2676,24 @@ select * from functional.alltypes t where 1 in
 PLAN-ROOT SINK
 |
 04:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=7.30K
 |
 |--03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  |  row-size=4B cardinality=unavailable
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
 |  |     predicates: 1 = functional.alltypestiny.int_col
+|  |     row-size=4B cardinality=4
 |  |
 |  01:SCAN HDFS [functional.tinyinttable]
 |     partitions=1/1 files=1 size=20B
 |     predicates: 1 = functional.tinyinttable.int_col
+|     row-size=4B cardinality=unavailable
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
 ====
 # Constant on LHS of IN for nested subqueries (correlation)
 select * from functional.alltypes t where 1 in
@@ -2302,21 +2703,26 @@ select * from functional.alltypes t where 1 in
 PLAN-ROOT SINK
 |
 04:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=7.30K
 |
 |--03:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: bigint_col = bigint_col, t.id = id
 |  |  runtime filters: RF000 <- bigint_col, RF001 <- id
+|  |  row-size=16B cardinality=2
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
+|  |     row-size=12B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: 1 = t.int_col
 |     runtime filters: RF000 -> bigint_col, RF001 -> t.id
+|     row-size=16B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes t]
    partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
 ====
 # EXISTS subquery containing ORDER BY, LIMIT, and OFFSET (IMPALA-6934)
 select count(*) from functional.alltypestiny t where exists
@@ -2326,18 +2732,23 @@ PLAN-ROOT SINK
 |
 04:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 03:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  row-size=0B cardinality=8
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
+|     row-size=0B cardinality=8
 |
 02:TOP-N [LIMIT=1 OFFSET=6]
 |  order by: id ASC
+|  row-size=4B cardinality=1
 |
 01:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
    predicates: id < 5
+   row-size=4B cardinality=1
 ====
 # Subquery in binary predicate that needs cardinality check at runtime
 select bigint_col from functional.alltypes where id =
@@ -2350,19 +2761,23 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=12B cardinality=1
 |
 |--02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: id = 1
 |     limit: 2
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.id = 1
    runtime filters: RF000 -> id
+   row-size=12B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -2371,11 +2786,13 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
 |  hash predicates: id = id
 |  runtime filters: RF000 <- id
+|  row-size=12B cardinality=1
 |
 |--06:EXCHANGE [HASH(id)]
 |  |
 |  02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  04:EXCHANGE [UNPARTITIONED]
 |  |  limit: 2
@@ -2384,6 +2801,7 @@ PLAN-ROOT SINK
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: id = 1
 |     limit: 2
+|     row-size=4B cardinality=1
 |
 05:EXCHANGE [HASH(id)]
 |
@@ -2391,6 +2809,7 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.id = 1
    runtime filters: RF000 -> id
+   row-size=12B cardinality=1
 ====
 # Subquery in arithmetic expression that needs cardinality check at runtime
 select bigint_col from functional.alltypes where id =
@@ -2403,18 +2822,22 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = 3 * id
 |  runtime filters: RF000 <- 3 * id
+|  row-size=12B cardinality=7.30K
 |
 |--02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: id = 1
 |     limit: 2
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=12B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -2423,11 +2846,13 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: id = 3 * id
 |  runtime filters: RF000 <- 3 * id
+|  row-size=12B cardinality=7.30K
 |
 |--05:EXCHANGE [BROADCAST]
 |  |
 |  02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  04:EXCHANGE [UNPARTITIONED]
 |  |  limit: 2
@@ -2436,10 +2861,12 @@ PLAN-ROOT SINK
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: id = 1
 |     limit: 2
+|     row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=12B cardinality=7.30K
 ====
 # Subquery that contains union and needs cardinality check at runtime
 select * from functional.alltypes where id =
@@ -2452,26 +2879,33 @@ PLAN-ROOT SINK
 06:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = i
 |  runtime filters: RF000 <- i
+|  row-size=89B cardinality=1
 |
 |--05:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=8B cardinality=1
 |  |
 |  04:AGGREGATE [FINALIZE]
 |  |  group by: i
 |  |  limit: 2
+|  |  row-size=8B cardinality=2
 |  |
 |  01:UNION
 |  |  pass-through-operands: 02
+|  |  row-size=8B cardinality=14.60K
 |  |
 |  |--03:SCAN HDFS [functional.alltypes]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=2B cardinality=7.30K
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -2480,11 +2914,13 @@ PLAN-ROOT SINK
 06:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: id = i
 |  runtime filters: RF000 <- i
+|  row-size=89B cardinality=1
 |
 |--10:EXCHANGE [BROADCAST]
 |  |
 |  05:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=8B cardinality=1
 |  |
 |  09:EXCHANGE [UNPARTITIONED]
 |  |  limit: 2
@@ -2492,24 +2928,30 @@ PLAN-ROOT SINK
 |  08:AGGREGATE [FINALIZE]
 |  |  group by: i
 |  |  limit: 2
+|  |  row-size=8B cardinality=2
 |  |
 |  07:EXCHANGE [HASH(i)]
 |  |
 |  04:AGGREGATE [STREAMING]
 |  |  group by: i
+|  |  row-size=8B cardinality=20
 |  |
 |  01:UNION
 |  |  pass-through-operands: 02
+|  |  row-size=8B cardinality=14.60K
 |  |
 |  |--03:SCAN HDFS [functional.alltypes]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=2B cardinality=7.30K
 |  |
 |  02:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=8B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=89B cardinality=7.30K
 ====
 # Subquery that contains join and GROUP BY and needs cardinality check at runtime
 select * from functional.alltypes where id =
@@ -2521,29 +2963,37 @@ PLAN-ROOT SINK
 06:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = max(allt.smallint_col)
 |  runtime filters: RF000 <- max(allt.smallint_col)
+|  row-size=89B cardinality=1
 |
 |--05:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=6B cardinality=1
 |  |
 |  04:AGGREGATE [FINALIZE]
 |  |  output: max(allt.smallint_col)
 |  |  group by: ata.month
 |  |  limit: 2
+|  |  row-size=6B cardinality=1
 |  |
 |  03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: ata.id = allt.id
 |  |  runtime filters: RF002 <- allt.id
+|  |  row-size=14B cardinality=7.81K
 |  |
 |  |--01:SCAN HDFS [functional.alltypes allt]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=6B cardinality=7.30K
 |  |
 |  02:SCAN HDFS [functional.alltypesagg ata]
+|     partition predicates: ata.month = 1
 |     partitions=11/11 files=11 size=814.73KB
 |     runtime filters: RF002 -> ata.id
+|     row-size=8B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -2552,11 +3002,13 @@ PLAN-ROOT SINK
 06:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: id = max(allt.smallint_col)
 |  runtime filters: RF000 <- max(allt.smallint_col)
+|  row-size=89B cardinality=1
 |
 |--12:EXCHANGE [BROADCAST]
 |  |
 |  05:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=6B cardinality=1
 |  |
 |  11:EXCHANGE [UNPARTITIONED]
 |  |  limit: 2
@@ -2565,31 +3017,38 @@ PLAN-ROOT SINK
 |  |  output: max:merge(allt.smallint_col)
 |  |  group by: ata.month
 |  |  limit: 2
+|  |  row-size=6B cardinality=1
 |  |
 |  09:EXCHANGE [HASH(ata.month)]
 |  |
 |  04:AGGREGATE [STREAMING]
 |  |  output: max(allt.smallint_col)
 |  |  group by: ata.month
+|  |  row-size=6B cardinality=1
 |  |
 |  03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: ata.id = allt.id
 |  |  runtime filters: RF002 <- allt.id
+|  |  row-size=14B cardinality=7.81K
 |  |
 |  |--08:EXCHANGE [HASH(allt.id)]
 |  |  |
 |  |  01:SCAN HDFS [functional.alltypes allt]
 |  |     partitions=24/24 files=24 size=478.45KB
+|  |     row-size=6B cardinality=7.30K
 |  |
 |  07:EXCHANGE [HASH(ata.id)]
 |  |
 |  02:SCAN HDFS [functional.alltypesagg ata]
+|     partition predicates: ata.month = 1
 |     partitions=11/11 files=11 size=814.73KB
 |     runtime filters: RF002 -> ata.id
+|     row-size=8B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> id
+   row-size=89B cardinality=7.30K
 ====
 # IS NULL predicate must not be pushed down to the scan node of the inline view.
 select count(1) from functional.alltypes
@@ -2599,41 +3058,52 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 04:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=4B cardinality=0
 |
 |--03:SELECT
 |  |  predicates: int_col IS NULL
+|  |  row-size=4B cardinality=0
 |  |
 |  02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     limit: 2
+|     row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 09:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=0
 |
 08:EXCHANGE [UNPARTITIONED]
 |
 05:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 04:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=4B cardinality=0
 |
 |--07:EXCHANGE [BROADCAST]
 |  |
 |  03:SELECT
 |  |  predicates: int_col IS NULL
+|  |  row-size=4B cardinality=0
 |  |
 |  02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  06:EXCHANGE [UNPARTITIONED]
 |  |  limit: 2
@@ -2641,9 +3111,11 @@ PLAN-ROOT SINK
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     limit: 2
+|     row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ====
 # Binary predicate with constant must not be pushed down
 # to the scan node of the inline view.
@@ -2654,41 +3126,52 @@ PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 04:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=4B cardinality=0
 |
 |--03:SELECT
 |  |  predicates: int_col > 10
+|  |  row-size=4B cardinality=0
 |  |
 |  02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     limit: 2
+|     row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 09:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=0
 |
 08:EXCHANGE [UNPARTITIONED]
 |
 05:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 04:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=4B cardinality=0
 |
 |--07:EXCHANGE [BROADCAST]
 |  |
 |  03:SELECT
 |  |  predicates: int_col > 10
+|  |  row-size=4B cardinality=0
 |  |
 |  02:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  06:EXCHANGE [UNPARTITIONED]
 |  |  limit: 2
@@ -2696,9 +3179,11 @@ PLAN-ROOT SINK
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
 |     limit: 2
+|     row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ====
 # Runtime scalar subquery with offset.
 select count(*) from functional.alltypes
@@ -2709,43 +3194,55 @@ PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 05:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=4B cardinality=0
 |
 |--04:SELECT
 |  |  predicates: id = 7
+|  |  row-size=4B cardinality=0
 |  |
 |  03:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  02:TOP-N [LIMIT=2 OFFSET=7]
 |  |  order by: id ASC
+|  |  row-size=4B cardinality=2
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 10:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
+|  row-size=8B cardinality=0
 |
 09:EXCHANGE [UNPARTITIONED]
 |
 06:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=0
 |
 05:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=4B cardinality=0
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  04:SELECT
 |  |  predicates: id = 7
+|  |  row-size=4B cardinality=0
 |  |
 |  03:CARDINALITY CHECK
 |  |  limit: 1
+|  |  row-size=4B cardinality=1
 |  |
 |  07:MERGING-EXCHANGE [UNPARTITIONED]
 |  |  offset: 7
@@ -2754,12 +3251,15 @@ PLAN-ROOT SINK
 |  |
 |  02:TOP-N [LIMIT=9]
 |  |  order by: id ASC
+|  |  row-size=4B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ====
 # IMPALA-7108: Select from an inline view that returns a single row.
 select * from functional.alltypes
@@ -2771,16 +3271,20 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: int_col = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=730
 |
 |--02:TOP-N [LIMIT=1]
 |  |  order by: id ASC
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> int_col
+   row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -2789,6 +3293,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: int_col = id
 |  runtime filters: RF000 <- id
+|  row-size=89B cardinality=730
 |
 |--05:EXCHANGE [BROADCAST]
 |  |
@@ -2798,11 +3303,14 @@ PLAN-ROOT SINK
 |  |
 |  02:TOP-N [LIMIT=1]
 |  |  order by: id ASC
+|  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
 |     partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> int_col
+   row-size=89B cardinality=7.30K
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 6d434e5..9569998 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -33,7 +33,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=621
    mem-estimate=80.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=89B cardinality=3650
+   tuple-ids=0 row-size=89B cardinality=3.65K
    in pipelines: 00(GETNEXT)
 ====
 # Sampling and scan predicates. Scan predicates are evaluated after sampling and
@@ -68,6 +68,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: year = CAST(2009 AS INT)
    partitions=6/24 files=6 size=119.70KB
    stored statistics:
      table: rows=7300 size=478.45KB
@@ -75,7 +76,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=1241
    mem-estimate=48.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=89B cardinality=1825
+   tuple-ids=0 row-size=89B cardinality=1.82K
    in pipelines: 00(GETNEXT)
 ====
 # Edge case: sample 0%, no files should be selected
@@ -126,6 +127,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional.alltypes]
+   partition predicates: year = CAST(2010 AS INT)
    partitions=1/24 files=1 size=20.36KB
    stored statistics:
      table: rows=7300 size=478.45KB
@@ -152,7 +154,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=89B cardinality=7300
+   tuple-ids=0 row-size=89B cardinality=7.30K
    in pipelines: 00(GETNEXT)
 ====
 # Table that has no stats.
@@ -164,7 +166,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=3/24 files=3 size=24.18KB
+   partitions=3/24 files=3 size=23.96KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -211,7 +213,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=4B cardinality=7300
+   tuple-ids=0 row-size=4B cardinality=7.30K
    in pipelines: 00(GETNEXT)
 ====
 # Sampling in WITH-clause view.

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
index db3e9a5..e02e669 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
@@ -6,9 +6,11 @@ PLAN-ROOT SINK
 |
 01:TOP-N [LIMIT=1]
 |  order by: int_col ASC
+|  row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -18,9 +20,11 @@ PLAN-ROOT SINK
 |
 01:TOP-N [LIMIT=1]
 |  order by: int_col ASC
+|  row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # returns 2 ints, with a total size of 8 bytes, which exceeds the limit of 6 and thus triggers a SORT
 select int_col from functional.alltypes order by 1 limit 2
@@ -29,9 +33,11 @@ PLAN-ROOT SINK
 |
 01:SORT [LIMIT=2]
 |  order by: int_col ASC
+|  row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -41,9 +47,11 @@ PLAN-ROOT SINK
 |
 01:SORT [LIMIT=2]
 |  order by: int_col ASC
+|  row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # test that offset is taken into account; the query only returns a single row but needs to sort two rows
 # sorting two ints requires 8 bytes of memory, which exceeds the threshold of 6
@@ -53,9 +61,11 @@ PLAN-ROOT SINK
 |
 01:SORT [LIMIT=1 OFFSET=1]
 |  order by: int_col ASC
+|  row-size=4B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -66,7 +76,9 @@ PLAN-ROOT SINK
 |
 01:SORT [LIMIT=2]
 |  order by: int_col ASC
+|  row-size=4B cardinality=2
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
index 9ad000e..f28ee9a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
@@ -5,9 +5,11 @@ PLAN-ROOT SINK
 |
 01:TOP-N [LIMIT=7]
 |  order by: id ASC
+|  row-size=4B cardinality=7
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=4B cardinality=8
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -17,7 +19,9 @@ PLAN-ROOT SINK
 |
 01:TOP-N [LIMIT=7]
 |  order by: id ASC
+|  row-size=4B cardinality=7
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
-====
\ No newline at end of file
+   row-size=4B cardinality=8
+====