You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2019/08/29 12:44:11 UTC

[impala] 04/07: IMPALA-7604: part 1: tests for agg cardinality

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a5cf105d95e9c15ce8065279f26366e021b5a432
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Aug 23 15:50:13 2019 -0700

    IMPALA-7604: part 1: tests for agg cardinality
    
    Add tests that demonstrate bugs with overflow and
    multiple aggregation classes and provide a baseline
    to demonstrate the effects of the bugfixes.
    
    Testing:
    Added planner tests:
    * Cardinality estimates with multiple groups (both grouping and
      non-grouping aggs).
    * Overflow case from multiplication of grouping expr cardinality.
    * Overflow handling of addition of multiple groups.
    
    Change-Id: I59eaddbc5be253793293af064bb2d28a425564e1
    Reviewed-on: http://gerrit.cloudera.org:8080/14131
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 .../org/apache/impala/planner/PlannerTest.java     |   9 +
 .../queries/PlannerTest/card-agg.test              | 660 +++++++++++++++++++++
 2 files changed, 669 insertions(+)

diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 606dbbe..3d7782a 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -85,6 +85,15 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
+  /**
+   * Runs the card-agg planner test file with cardinality validation enabled.
+   */
+  @Test
+  public void testAggCardinality() {
+    runPlannerTestFile("card-agg",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
   @Test
   public void testPredicatePropagation() {
     runPlannerTestFile("predicate-propagation");
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test
new file mode 100644
index 0000000..5fe9265
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test
@@ -0,0 +1,660 @@
+# Cardinality tests for aggregations.
+#
+# Non-grouping aggregation.
+select count(*), min(c_name) from tpch.customer
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*), min(c_name)
+|  row-size=20B cardinality=1
+|
+00:SCAN HDFS [tpch.customer]
+   HDFS partitions=1/1 files=1 size=23.08MB
+   row-size=30B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*), min:merge(c_name)
+|  row-size=20B cardinality=1
+|
+02:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: count(*), min(c_name)
+|  row-size=20B cardinality=1
+|
+00:SCAN HDFS [tpch.customer]
+   HDFS partitions=1/1 files=1 size=23.08MB
+   row-size=30B cardinality=150.00K
+====
+# Simple grouping aggregation.
+select c_nationkey, count(*), min(c_phone)
+from tpch.customer c
+group by 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*), min(c_phone)
+|  group by: c_nationkey
+|  row-size=22B cardinality=25
+|
+00:SCAN HDFS [tpch.customer c]
+   HDFS partitions=1/1 files=1 size=23.08MB
+   row-size=29B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*), min:merge(c_phone)
+|  group by: c_nationkey
+|  row-size=22B cardinality=25
+|
+02:EXCHANGE [HASH(c_nationkey)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*), min(c_phone)
+|  group by: c_nationkey
+|  row-size=22B cardinality=25
+|
+00:SCAN HDFS [tpch.customer c]
+   HDFS partitions=1/1 files=1 size=23.08MB
+   row-size=29B cardinality=150.00K
+====
+# Grouping aggregation with multiple keys where the product of the grouping key
+# cardinalities is greater than the input cardinality.
+select l_partkey, l_suppkey, count(*)
+from tpch.lineitem
+group by 1, 2
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: l_partkey, l_suppkey
+|  row-size=24B cardinality=6.00M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=16B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: l_partkey, l_suppkey
+|  row-size=24B cardinality=6.00M
+|
+02:EXCHANGE [HASH(l_partkey,l_suppkey)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: l_partkey, l_suppkey
+|  row-size=24B cardinality=6.00M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=16B cardinality=6.00M
+====
+# Grouping aggregation with multiple keys where the product of the grouping key
+# cardinalities is less than the input cardinality.
+select l_partkey, l_linenumber, count(*)
+from tpch.lineitem
+group by 1, 2
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: l_partkey, l_linenumber
+|  row-size=20B cardinality=1.40M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=12B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: l_partkey, l_linenumber
+|  row-size=20B cardinality=1.40M
+|
+02:EXCHANGE [HASH(l_partkey,l_linenumber)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: l_partkey, l_linenumber
+|  row-size=20B cardinality=1.40M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=12B cardinality=6.00M
+====
+# Grouping aggregation with multiple aggregation classes (multiple count distinct).
+# Cardinality for the first aggregation is the sum of cardinalities from the
+# aggregation classes.
+# BUG: some aggregation classes are double counted in the cardinality.
+select count(distinct l_orderkey), count(distinct l_partkey)
+from tpch.lineitem
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(l_orderkey)), aggif(valid_tid() = 4, count(l_partkey))
+|  row-size=16B cardinality=1
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(l_orderkey)
+|  Class 1
+|    output: count(l_partkey)
+|  row-size=16B cardinality=2
+|
+01:AGGREGATE
+|  Class 0
+|    group by: l_orderkey
+|  Class 1
+|    group by: l_partkey
+|  row-size=16B cardinality=3.33M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=16B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(l_orderkey)), aggif(valid_tid() = 4, count(l_partkey))
+|  row-size=16B cardinality=1
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(l_orderkey)
+|  Class 1
+|    output: count:merge(l_partkey)
+|  row-size=16B cardinality=2
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(l_orderkey)
+|  Class 1
+|    output: count(l_partkey)
+|  row-size=16B cardinality=2
+|
+05:AGGREGATE
+|  Class 0
+|    group by: l_orderkey
+|  Class 1
+|    group by: l_partkey
+|  row-size=16B cardinality=3.33M
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(l_orderkey) WHEN 3 THEN murmur_hash(l_partkey) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: l_orderkey
+|  Class 1
+|    group by: l_partkey
+|  row-size=16B cardinality=3.33M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=16B cardinality=6.00M
+====
+# Grouping aggregation with multiple aggregation classes (multiple count distinct).
+# In this case the sum of the output cardinalities of the aggregation classes is
+# greater than the input cardinality.
+# BUG: output cardinality is capped at input cardinality.
+select l_partkey, count(distinct l_orderkey), count(distinct l_comment), count(distinct l_suppkey)
+from tpch.lineitem
+group by l_partkey
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(l_orderkey)), aggif(valid_tid() = 4, count(l_comment)), aggif(valid_tid() = 6, count(l_suppkey))
+|  group by: CASE valid_tid() WHEN 2 THEN l_partkey WHEN 4 THEN l_partkey WHEN 6 THEN l_partkey END
+|  row-size=32B cardinality=200.52K
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(l_orderkey)
+|    group by: l_partkey
+|  Class 1
+|    output: count(l_comment)
+|    group by: l_partkey
+|  Class 2
+|    output: count(l_suppkey)
+|    group by: l_partkey
+|  row-size=48B cardinality=1.40M
+|
+01:AGGREGATE
+|  Class 0
+|    group by: l_partkey, l_orderkey
+|  Class 1
+|    group by: l_partkey, l_comment
+|  Class 2
+|    group by: l_partkey, l_suppkey
+|  row-size=78B cardinality=6.00M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=62B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(l_orderkey)), aggif(valid_tid() = 4, count(l_comment)), aggif(valid_tid() = 6, count(l_suppkey))
+|  group by: CASE valid_tid() WHEN 2 THEN l_partkey WHEN 4 THEN l_partkey WHEN 6 THEN l_partkey END
+|  row-size=32B cardinality=200.52K
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(l_orderkey)
+|    group by: l_partkey
+|  Class 1
+|    output: count:merge(l_comment)
+|    group by: l_partkey
+|  Class 2
+|    output: count:merge(l_suppkey)
+|    group by: l_partkey
+|  row-size=48B cardinality=1.40M
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(l_partkey) WHEN 4 THEN murmur_hash(l_partkey) WHEN 6 THEN murmur_hash(l_partkey) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(l_orderkey)
+|    group by: l_partkey
+|  Class 1
+|    output: count(l_comment)
+|    group by: l_partkey
+|  Class 2
+|    output: count(l_suppkey)
+|    group by: l_partkey
+|  row-size=48B cardinality=1.40M
+|
+05:AGGREGATE
+|  Class 0
+|    group by: l_partkey, l_orderkey
+|  Class 1
+|    group by: l_partkey, l_comment
+|  Class 2
+|    group by: l_partkey, l_suppkey
+|  row-size=78B cardinality=6.00M
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(l_partkey) WHEN 3 THEN murmur_hash(l_partkey) WHEN 5 THEN murmur_hash(l_partkey) END,CASE valid_tid() WHEN 1 THEN murmur_hash(l_orderkey) WHEN 3 THEN murmur_hash(l_comment) WHEN 5 THEN murmur_hash(l_suppkey) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: l_partkey, l_orderkey
+|  Class 1
+|    group by: l_partkey, l_comment
+|  Class 2
+|    group by: l_partkey, l_suppkey
+|  row-size=78B cardinality=6.00M
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   row-size=62B cardinality=6.00M
+====
+# Mixed grouping and non-grouping aggregations.
+# BUG: output cardinality is capped at input cardinality.
+select count(distinct id), count(distinct int_col), count(*)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(id)), aggif(valid_tid() = 4, count(int_col)), aggif(valid_tid() = 5, count(*))
+|  row-size=24B cardinality=1
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(id)
+|  Class 1
+|    output: count(int_col)
+|  Class 2
+|    output: count:merge(*)
+|  row-size=24B cardinality=3
+|
+01:AGGREGATE
+|  Class 0
+|    group by: id
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    output: count(*)
+|  row-size=16B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(id)), aggif(valid_tid() = 4, count(int_col)), aggif(valid_tid() = 5, count(*))
+|  row-size=24B cardinality=1
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(id)
+|  Class 1
+|    output: count:merge(int_col)
+|  Class 2
+|    output: count:merge(*)
+|  row-size=24B cardinality=3
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(id)
+|  Class 1
+|    output: count(int_col)
+|  Class 2
+|    output: count:merge(*)
+|  row-size=24B cardinality=3
+|
+05:AGGREGATE
+|  Class 0
+|    group by: id
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    output: count:merge(*)
+|  row-size=16B cardinality=7.30K
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(id) WHEN 3 THEN murmur_hash(int_col) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: id
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    output: count(*)
+|  row-size=16B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
+====
+# Test overflow handling for single aggregation class.
+# BUG: output cardinality overflows and is set to 0
+select distinct *
+from tpcds.store_sales s1, tpcds.store_sales s2, tpcds.store_sales s3,
+  tpcds.store_sales s4
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  group by: s1.ss_sold_time_sk, s1.ss_item_sk, s1.ss_customer_sk, s1.ss_cdemo_sk, s1.ss_hdemo_sk, s1.ss_addr_sk, s1.ss_store_sk, s1.ss_promo_sk, s1.ss_ticket_number, s1.ss_quantity, s1.ss_wholesale_cost, s1.ss_list_price, s1.ss_sales_price, s1.ss_ext_discount_amt, s1.ss_ext_sales_price, s1.ss_ext_wholesale_cost, s1.ss_ext_list_price, s1.ss_ext_tax, s1.ss_coupon_amt, s1.ss_net_paid, s1.ss_net_paid_inc_tax, s1.ss_net_profit, s1.ss_sold_date_sk, s2.ss_sold_time_sk, s2.ss_item_sk, s2.ss_cus [...]
+|  row-size=400B cardinality=0
+|
+06:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=400B cardinality=9223372.04T
+|
+|--03:SCAN HDFS [tpcds.store_sales s4]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+05:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=300B cardinality=9223372.04T
+|
+|--02:SCAN HDFS [tpcds.store_sales s3]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+04:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=200B cardinality=8.30T
+|
+|--01:SCAN HDFS [tpcds.store_sales s2]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+00:SCAN HDFS [tpcds.store_sales s1]
+   HDFS partitions=1824/1824 files=1824 size=346.60MB
+   row-size=100B cardinality=2.88M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:EXCHANGE [UNPARTITIONED]
+|
+12:AGGREGATE [FINALIZE]
+|  group by: s1.ss_sold_time_sk, s1.ss_item_sk, s1.ss_customer_sk, s1.ss_cdemo_sk, s1.ss_hdemo_sk, s1.ss_addr_sk, s1.ss_store_sk, s1.ss_promo_sk, s1.ss_ticket_number, s1.ss_quantity, s1.ss_wholesale_cost, s1.ss_list_price, s1.ss_sales_price, s1.ss_ext_discount_amt, s1.ss_ext_sales_price, s1.ss_ext_wholesale_cost, s1.ss_ext_list_price, s1.ss_ext_tax, s1.ss_coupon_amt, s1.ss_net_paid, s1.ss_net_paid_inc_tax, s1.ss_net_profit, s1.ss_sold_date_sk, s2.ss_sold_time_sk, s2.ss_item_sk, s2.ss_cus [...]
+|  row-size=400B cardinality=0
+|
+11:EXCHANGE [HASH(s1.ss_sold_time_sk,s1.ss_item_sk,s1.ss_customer_sk,s1.ss_cdemo_sk,s1.ss_hdemo_sk,s1.ss_addr_sk,s1.ss_store_sk,s1.ss_promo_sk,s1.ss_ticket_number,s1.ss_quantity,s1.ss_wholesale_cost,s1.ss_list_price,s1.ss_sales_price,s1.ss_ext_discount_amt,s1.ss_ext_sales_price,s1.ss_ext_wholesale_cost,s1.ss_ext_list_price,s1.ss_ext_tax,s1.ss_coupon_amt,s1.ss_net_paid,s1.ss_net_paid_inc_tax,s1.ss_net_profit,s1.ss_sold_date_sk,s2.ss_sold_time_sk,s2.ss_item_sk,s2.ss_customer_sk,s2.ss_cdemo [...]
+|
+07:AGGREGATE [STREAMING]
+|  group by: s1.ss_sold_time_sk, s1.ss_item_sk, s1.ss_customer_sk, s1.ss_cdemo_sk, s1.ss_hdemo_sk, s1.ss_addr_sk, s1.ss_store_sk, s1.ss_promo_sk, s1.ss_ticket_number, s1.ss_quantity, s1.ss_wholesale_cost, s1.ss_list_price, s1.ss_sales_price, s1.ss_ext_discount_amt, s1.ss_ext_sales_price, s1.ss_ext_wholesale_cost, s1.ss_ext_list_price, s1.ss_ext_tax, s1.ss_coupon_amt, s1.ss_net_paid, s1.ss_net_paid_inc_tax, s1.ss_net_profit, s1.ss_sold_date_sk, s2.ss_sold_time_sk, s2.ss_item_sk, s2.ss_cus [...]
+|  row-size=400B cardinality=0
+|
+06:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=400B cardinality=9223372.04T
+|
+|--10:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpcds.store_sales s4]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+05:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=300B cardinality=9223372.04T
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [tpcds.store_sales s3]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+04:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=200B cardinality=8.30T
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpcds.store_sales s2]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+00:SCAN HDFS [tpcds.store_sales s1]
+   HDFS partitions=1824/1824 files=1824 size=346.60MB
+   row-size=100B cardinality=2.88M
+====
+# Test overflow handling for summing multiple aggregation classes.
+# BUG: output cardinality overflows and is set to 0
+select t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk,
+    t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk,
+    t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity,
+    t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price,
+    t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost,
+    t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid,
+    t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk,
+    t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_customer_sk, t2.ss_cdemo_sk,
+    t2.ss_hdemo_sk, t2.ss_addr_sk, t2.ss_store_sk, t2.ss_promo_sk,
+    t2.ss_ticket_number, t2.ss_quantity, t2.ss_wholesale_cost,
+    t2.ss_list_price, t2.ss_sales_price, t2.ss_ext_discount_amt,
+    t2.ss_ext_sales_price, t2.ss_ext_wholesale_cost, t2.ss_ext_list_price,
+    t2.ss_ext_tax, t2.ss_coupon_amt, t2.ss_net_paid, t2.ss_net_paid_inc_tax,
+    t2.ss_net_profit, t2.ss_sold_date_sk, t3.ss_sold_time_sk, t3.ss_item_sk,
+    t3.ss_customer_sk, t3.ss_cdemo_sk, t3.ss_hdemo_sk, t3.ss_addr_sk,
+    t3.ss_store_sk, t3.ss_promo_sk, t3.ss_ticket_number, t3.ss_quantity,
+    t3.ss_wholesale_cost, t3.ss_list_price, t3.ss_sales_price,
+    t3.ss_ext_discount_amt, t3.ss_ext_sales_price, t3.ss_ext_wholesale_cost,
+    t3.ss_ext_list_price, t3.ss_ext_tax, t3.ss_coupon_amt, t3.ss_net_paid,
+    t3.ss_net_paid_inc_tax, t3.ss_net_profit, t3.ss_sold_date_sk,
+    t4.ss_sold_time_sk, t4.ss_item_sk, t4.ss_customer_sk, t4.ss_cdemo_sk,
+    t4.ss_hdemo_sk, t4.ss_addr_sk, t4.ss_store_sk, t4.ss_promo_sk,
+    t4.ss_ticket_number, t4.ss_quantity, t4.ss_wholesale_cost,
+    t4.ss_list_price, t4.ss_sales_price, t4.ss_ext_discount_amt,
+    t4.ss_ext_sales_price, t4.ss_ext_wholesale_cost, t4.ss_ext_list_price,
+    t4.ss_ext_tax, t4.ss_coupon_amt, t4.ss_net_paid, t4.ss_net_paid_inc_tax,
+    count(distinct t4.ss_net_profit), count(distinct t4.ss_sold_date_sk)
+from tpcds.store_sales t1, tpcds.store_sales t2, tpcds.store_sales t3,
+    tpcds.store_sales t4
+group by t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk,
+    t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk,
+    t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity,
+    t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price,
+    t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost,
+    t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid,
+    t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk,
+    t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_customer_sk, t2.ss_cdemo_sk,
+    t2.ss_hdemo_sk, t2.ss_addr_sk, t2.ss_store_sk, t2.ss_promo_sk,
+    t2.ss_ticket_number, t2.ss_quantity, t2.ss_wholesale_cost,
+    t2.ss_list_price, t2.ss_sales_price, t2.ss_ext_discount_amt,
+    t2.ss_ext_sales_price, t2.ss_ext_wholesale_cost, t2.ss_ext_list_price,
+    t2.ss_ext_tax, t2.ss_coupon_amt, t2.ss_net_paid, t2.ss_net_paid_inc_tax,
+    t2.ss_net_profit, t2.ss_sold_date_sk, t3.ss_sold_time_sk, t3.ss_item_sk,
+    t3.ss_customer_sk, t3.ss_cdemo_sk, t3.ss_hdemo_sk, t3.ss_addr_sk,
+    t3.ss_store_sk, t3.ss_promo_sk, t3.ss_ticket_number, t3.ss_quantity,
+    t3.ss_wholesale_cost, t3.ss_list_price, t3.ss_sales_price,
+    t3.ss_ext_discount_amt, t3.ss_ext_sales_price, t3.ss_ext_wholesale_cost,
+    t3.ss_ext_list_price, t3.ss_ext_tax, t3.ss_coupon_amt, t3.ss_net_paid,
+    t3.ss_net_paid_inc_tax, t3.ss_net_profit, t3.ss_sold_date_sk,
+    t4.ss_sold_time_sk, t4.ss_item_sk, t4.ss_customer_sk, t4.ss_cdemo_sk,
+    t4.ss_hdemo_sk, t4.ss_addr_sk, t4.ss_store_sk, t4.ss_promo_sk,
+    t4.ss_ticket_number, t4.ss_quantity, t4.ss_wholesale_cost,
+    t4.ss_list_price, t4.ss_sales_price, t4.ss_ext_discount_amt,
+    t4.ss_ext_sales_price, t4.ss_ext_wholesale_cost, t4.ss_ext_list_price,
+    t4.ss_ext_tax, t4.ss_coupon_amt, t4.ss_net_paid, t4.ss_net_paid_inc_tax
+---- PLAN
+PLAN-ROOT SINK
+|
+09:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 5, count(t4.ss_net_profit)), aggif(valid_tid() = 7, count(t4.ss_sold_date_sk))
+|  group by: CASE valid_tid() WHEN 5 THEN t1.ss_sold_time_sk WHEN 7 THEN t1.ss_sold_time_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_item_sk WHEN 7 THEN t1.ss_item_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_customer_sk WHEN 7 THEN t1.ss_customer_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_cdemo_sk WHEN 7 THEN t1.ss_cdemo_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_hdemo_sk WHEN 7 THEN t1.ss_hdemo_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_addr_sk WHEN 7 THEN t1.ss_addr_sk END, CASE valid_ti [...]
+|  row-size=408B cardinality=0
+|
+08:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(t4.ss_net_profit)
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  Class 1
+|    output: count(t4.ss_sold_date_sk)
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  row-size=800B cardinality=0
+|
+07:AGGREGATE
+|  Class 0
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  Class 1
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  row-size=792B cardinality=0
+|
+06:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=400B cardinality=9223372.04T
+|
+|--03:SCAN HDFS [tpcds.store_sales t4]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+05:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=300B cardinality=9223372.04T
+|
+|--02:SCAN HDFS [tpcds.store_sales t3]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+04:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=200B cardinality=8.30T
+|
+|--01:SCAN HDFS [tpcds.store_sales t2]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+00:SCAN HDFS [tpcds.store_sales t1]
+   HDFS partitions=1824/1824 files=1824 size=346.60MB
+   row-size=100B cardinality=2.88M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+17:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 5, count(t4.ss_net_profit)), aggif(valid_tid() = 7, count(t4.ss_sold_date_sk))
+|  group by: CASE valid_tid() WHEN 5 THEN t1.ss_sold_time_sk WHEN 7 THEN t1.ss_sold_time_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_item_sk WHEN 7 THEN t1.ss_item_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_customer_sk WHEN 7 THEN t1.ss_customer_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_cdemo_sk WHEN 7 THEN t1.ss_cdemo_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_hdemo_sk WHEN 7 THEN t1.ss_hdemo_sk END, CASE valid_tid() WHEN 5 THEN t1.ss_addr_sk WHEN 7 THEN t1.ss_addr_sk END, CASE valid_ti [...]
+|  row-size=408B cardinality=0
+|
+16:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(t4.ss_net_profit)
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  Class 1
+|    output: count:merge(t4.ss_sold_date_sk)
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  row-size=800B cardinality=0
+|
+15:EXCHANGE [HASH(CASE valid_tid() WHEN 5 THEN murmur_hash(t1.ss_sold_time_sk) WHEN 7 THEN murmur_hash(t1.ss_sold_time_sk) END,CASE valid_tid() WHEN 5 THEN murmur_hash(t1.ss_item_sk) WHEN 7 THEN murmur_hash(t1.ss_item_sk) END,CASE valid_tid() WHEN 5 THEN murmur_hash(t1.ss_customer_sk) WHEN 7 THEN murmur_hash(t1.ss_customer_sk) END,CASE valid_tid() WHEN 5 THEN murmur_hash(t1.ss_cdemo_sk) WHEN 7 THEN murmur_hash(t1.ss_cdemo_sk) END,CASE valid_tid() WHEN 5 THEN murmur_hash(t1.ss_hdemo_sk) W [...]
+|
+08:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(t4.ss_net_profit)
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  Class 1
+|    output: count(t4.ss_sold_date_sk)
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  row-size=800B cardinality=0
+|
+14:AGGREGATE
+|  Class 0
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  Class 1
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  row-size=792B cardinality=0
+|
+13:EXCHANGE [HASH(CASE valid_tid() WHEN 4 THEN murmur_hash(t1.ss_sold_time_sk) WHEN 6 THEN murmur_hash(t1.ss_sold_time_sk) END,CASE valid_tid() WHEN 4 THEN murmur_hash(t1.ss_item_sk) WHEN 6 THEN murmur_hash(t1.ss_item_sk) END,CASE valid_tid() WHEN 4 THEN murmur_hash(t1.ss_customer_sk) WHEN 6 THEN murmur_hash(t1.ss_customer_sk) END,CASE valid_tid() WHEN 4 THEN murmur_hash(t1.ss_cdemo_sk) WHEN 6 THEN murmur_hash(t1.ss_cdemo_sk) END,CASE valid_tid() WHEN 4 THEN murmur_hash(t1.ss_hdemo_sk) W [...]
+|
+07:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  Class 1
+|    group by: t1.ss_sold_time_sk, t1.ss_item_sk, t1.ss_customer_sk, t1.ss_cdemo_sk, t1.ss_hdemo_sk, t1.ss_addr_sk, t1.ss_store_sk, t1.ss_promo_sk, t1.ss_ticket_number, t1.ss_quantity, t1.ss_wholesale_cost, t1.ss_list_price, t1.ss_sales_price, t1.ss_ext_discount_amt, t1.ss_ext_sales_price, t1.ss_ext_wholesale_cost, t1.ss_ext_list_price, t1.ss_ext_tax, t1.ss_coupon_amt, t1.ss_net_paid, t1.ss_net_paid_inc_tax, t1.ss_net_profit, t1.ss_sold_date_sk, t2.ss_sold_time_sk, t2.ss_item_sk, t2.ss_c [...]
+|  row-size=792B cardinality=0
+|
+06:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=400B cardinality=9223372.04T
+|
+|--12:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [tpcds.store_sales t4]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+05:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=300B cardinality=9223372.04T
+|
+|--11:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [tpcds.store_sales t3]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+04:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=200B cardinality=8.30T
+|
+|--10:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpcds.store_sales t2]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     row-size=100B cardinality=2.88M
+|
+00:SCAN HDFS [tpcds.store_sales t1]
+   HDFS partitions=1824/1824 files=1824 size=346.60MB
+   row-size=100B cardinality=2.88M
+====