Posted to commits@impala.apache.org by st...@apache.org on 2023/08/28 23:22:11 UTC

[impala] branch master updated: IMPALA-12395: Override scan cardinality for optimized count star

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c8fc997e IMPALA-12395: Override scan cardinality for optimized count star
0c8fc997e is described below

commit 0c8fc997ef7df09b675180a7baa1482852d60b11
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue Aug 22 17:51:28 2023 -0700

    IMPALA-12395: Override scan cardinality for optimized count star
    
    The cardinality estimate in HdfsScanNode.java for count queries does not
    account for the fact that the count optimization only scans metadata and
    not the actual columns. An optimized count star scan returns only one
    row per Parquet row group.
    
    This patch overrides the scan cardinality with the total number of
    files, which is the closest available estimate of the number of row
    groups. A similar override already exists in IcebergScanNode.java.
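    
    A rough, self-contained sketch of how the new estimate is derived (a toy
    illustration only: the class and the map contents below are hypothetical,
    while sumValues(), totalFilesPerFs_, and countStarSlot_ belong to the
    real change shown in the HdfsScanNode.java diff below):
    
        import java.util.HashMap;
        import java.util.Map;
    
        /** Toy model of the cardinality override for optimized count(*). */
        public class CountStarCardinalitySketch {
          /** Total file count across filesystems, mirroring
           *  sumValues(totalFilesPerFs_) in HdfsScanNode.java. */
          static long sumValues(Map<String, Long> filesPerFs) {
            long total = 0;
            for (long v : filesPerFs.values()) total += v;
            return total;
          }
    
          public static void main(String[] args) {
            Map<String, Long> totalFilesPerFs = new HashMap<>();
            // The updated plan shows cardinality=3 for tpch_parquet.lineitem,
            // implying the table is stored as 3 Parquet files.
            totalFilesPerFs.put("HDFS", 3L);
            // Optimized count(*) emits roughly one row per row group, so the
            // file count replaces the 6.00M row estimate used previously.
            System.out.println(sumValues(totalFilesPerFs));  // prints 3
          }
        }
    
    The effect is visible in resource-requirements.test below, where the scan
    cardinality for "select count(*) from tpch_parquet.lineitem" changes from
    6.00M to 3.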
    
    Testing:
    - Add count query test cases in test_query_cpu_count_divisor_default
    - Pass core tests
    
    Change-Id: Id5ce967657208057d50bd80adadac29ebb51cbc5
    Reviewed-on: http://gerrit.cloudera.org:8080/20406
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/HdfsScanNode.java    |  7 ++++
 .../queries/PlannerTest/resource-requirements.test |  9 +++--
 tests/custom_cluster/test_executor_groups.py       | 47 +++++++++++++++++++---
 tests/custom_cluster/test_query_retries.py         | 33 ++++++++-------
 4 files changed, 73 insertions(+), 23 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 2e1a87d88..41a230741 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1558,6 +1558,13 @@ public class HdfsScanNode extends ScanNode {
           numRangesAdjusted :
           Math.min(inputCardinality_, numRangesAdjusted);
     }
+
+    if (countStarSlot_ != null) {
+      // We are doing optimized count star. Override cardinality with total num files.
+      long totalFiles = sumValues(totalFilesPerFs_);
+      inputCardinality_ = totalFiles;
+      cardinality_ = totalFiles;
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_));
     }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index ee00ac9c4..abf0e1417 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1869,6 +1869,7 @@ select count(*) from tpch_parquet.lineitem
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
 Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1890,11 +1891,12 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
-   tuple-ids=0 row-size=8B cardinality=6.00M
+   tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=3
 Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1929,11 +1931,12 @@ Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservat
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
-   tuple-ids=0 row-size=8B cardinality=6.00M
+   tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
 Per-Host Resource Estimates: Memory=80MB
+Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1968,7 +1971,7 @@ Per-Instance Resources: mem-estimate=80.02MB mem-reservation=128.00KB thread-res
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0
-   tuple-ids=0 row-size=8B cardinality=6.00M
+   tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
 # Sort
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index bd6ffca81..7d250bda9 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -764,9 +764,9 @@ class TestExecutorGroups(CustomClusterTestSuite):
   def test_query_assignment_with_two_exec_groups(self):
     """This test verifies that query assignment works with two executor groups with
     different number of executors and memory limit in each."""
-    # A small query with estimated memory per host of 10MB that can run on the small
+    # A small query with estimated memory per host of 16MB that can run on the small
     # executor group
-    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;"
+    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim where d_year=2022;"
     # A large query with estimated memory per host of 132MB that can only run on
     # the large executor group.
     LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;"
@@ -1093,6 +1093,41 @@ class TestExecutorGroups(CustomClusterTestSuite):
       'PROCESSING_COST_MIN_THREADS': '',
       'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})
 
+    # BEGIN testing count queries
+    # Test optimized count star query with 1824 scan ranges assign to small group.
+    self._run_query_and_verify_profile(
+        "SELECT count(*) FROM tpcds_parquet.store_sales",
+        ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+         "ExecutorGroupsConsidered: 2"])
+
+    # Test optimized count star query with 383 scan ranges assign to tiny group.
+    self._run_query_and_verify_profile(
+       "SELECT count(*) FROM tpcds_parquet.store_sales WHERE ss_sold_date_sk < 2451200",
+       ["Executor Group: root.tiny-group", "EffectiveParallelism: 2",
+        "ExecutorGroupsConsidered: 1"])
+
+    # Test optimized count star query with 1 scan range detected as trivial query
+    # and assign to tiny group.
+    self._run_query_and_verify_profile(
+       "SELECT count(*) FROM tpcds_parquet.date_dim",
+       ["Executor Group: empty group (using coordinator only)",
+        "ExecutorGroupsConsidered: 1",
+        "Verdict: Assign to first group because the number of nodes is 1"])
+
+    # Test unoptimized count star query assign to small group.
+    self._run_query_and_verify_profile(
+      ("SELECT count(*) FROM tpcds_parquet.store_sales "
+       "WHERE ss_ext_discount_amt != 0.3857"),
+      ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+       "ExecutorGroupsConsidered: 2"])
+
+    # Test zero slot scan query assign to small group.
+    self._run_query_and_verify_profile(
+      "SELECT count(ss_sold_date_sk) FROM tpcds_parquet.store_sales",
+      ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+       "ExecutorGroupsConsidered: 2"])
+    # END testing count queries
+
     # BEGIN testing insert + MAX_FS_WRITER
     # Test unpartitioned insert, small scan, no MAX_FS_WRITER.
     # Scanner and writer will collocate since num scanner equals to num writer (1).
@@ -1186,11 +1221,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # END testing insert + MAX_FS_WRITER
 
     # Check resource pools on the Web queries site and admission site
-    self._verify_query_num_for_resource_pool("root.small", 7)
-    self._verify_query_num_for_resource_pool("root.tiny", 4)
+    self._verify_query_num_for_resource_pool("root.small", 10)
+    self._verify_query_num_for_resource_pool("root.tiny", 6)
     self._verify_query_num_for_resource_pool("root.large", 12)
-    self._verify_total_admitted_queries("root.small", 8)
-    self._verify_total_admitted_queries("root.tiny", 6)
+    self._verify_total_admitted_queries("root.small", 11)
+    self._verify_total_admitted_queries("root.tiny", 8)
     self._verify_total_admitted_queries("root.large", 16)
 
   @pytest.mark.execute_serially
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 7d039708a..56e852970 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -79,6 +79,11 @@ class TestQueryRetries(CustomClusterTestSuite):
         union all
         select count(*) from functional.alltypes where bool_col = sleep(50)"""
 
+  # A simple count query with predicate. The predicate is needed so that the planner does
+  # not create the optimized count(star) query plan.
+  _count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
+  _count_query_result = "55"
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -252,7 +257,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     killed_impalad = self.__kill_random_impalad()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -264,7 +269,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert "6001215" in results.data[0]
+    assert self._count_query_result in results.data[0]
 
     # The runtime profile of the retried query.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -312,7 +317,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # and the query should be retried. Add delay before admission so that the 2nd node
     # is removed from the blacklist before scheduler makes schedule for the retried
     # query.
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -325,7 +330,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert "6001215" in results.data[0]
+    assert self._count_query_result in results.data[0]
 
     # The runtime profile of the retried query.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -375,7 +380,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     rpc_not_accessible_impalad = self.cluster.impalads[1]
     assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
 
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -698,7 +703,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -737,7 +742,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.__wait_until_retry_state(handle, 'RETRYING')
@@ -767,7 +772,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
 
@@ -791,7 +796,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
     self.hs2_client.execute_async(query)
@@ -818,7 +823,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
     # The number of in-flight queries is 0 at the beginning, then 1 when the original
     # query is submitted. It's 2 when the retried query is registered. Although the retry
@@ -848,7 +853,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
     self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
@@ -887,7 +892,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     client = self.cluster.get_first_impalad().service.create_beeswax_client()
     client.set_configuration({'retry_failed_queries': 'true'})
     handle = client.execute_async(query)
@@ -917,7 +922,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     """Test query retries with the HS2 protocol. Enable the results set cache as well and
     test that query retries work with the results cache."""
     self.cluster.impalads[1].kill()
-    query = "select count(*) from tpch_parquet.lineitem"
+    query = self._count_query
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
     handle = self.hs2_client.execute_async(query)
@@ -926,7 +931,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.hs2_client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert int(results.data[0]) == 6001215
+    assert results.data[0] == self._count_query_result
 
     # Validate the live exec summary.
     retried_query_id = \