You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/09/12 07:21:43 UTC

[impala] 01/03: IMPALA-12383: Fix SingleNodePlanner aggregation limits

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 704ff7788d015dcbe66a319fb017d0a3f8a76399
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Fri Aug 18 11:38:11 2023 -0700

    IMPALA-12383: Fix SingleNodePlanner aggregation limits
    
    IMPALA-2581 added enforcement of the limit when adding entries to the
    grouping aggregation. It would stop adding new entries if the number of
    entries in the grouping aggregation was >= the limit. If the grouping
    aggregation never contains more entries than the limit, then it would
    not output more entries.
    
    However, this limit was not enforced exactly when adding. It would add a
    whole batch before checking the limit, so it could go past the limit. In
    practice the exchange in a distributed aggregation would enforce limits,
    so this would only show up when num_nodes=1. As a result, the following
    query incorrectly returns 16 rows, not 10:
    
      set num_nodes=1;
      select distinct l_orderkey from tpch.lineitem limit 10;
    
    One option is to be exact when adding items to the grouping aggregation,
    which would require testing the limit on each row (we don't know which
    are duplicates). This is awkward. Removing the limit on the output of
    the aggregation is also not really needed for the original change
    (stopping the children early once the limit is reached). Instead, we
    restore the limit on the output of the grouping agg (which is already
    known to work).
    
    Testing:
    - added a test case where we assert number of rows returned by an
      aggregation node (rather than an exchange or top-n).
    - restores definition of ALL_CLUSTER_SIZES and makes it simpler to
      enable for individual test suites. Filed IMPALA-12394 to generally
      re-enable testing with ALL_CLUSTER_SIZES. Enables ALL_CLUSTER_SIZES
      for aggregation tests.
    
    Change-Id: Ic5eec1190e8e182152aa954897b79cc3f219c816
    Reviewed-on: http://gerrit.cloudera.org:8080/20379
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
---
 be/src/exec/aggregation-node-base.cc |  3 +--
 be/src/exec/grouping-aggregator.cc   |  6 +-----
 be/src/exec/grouping-aggregator.h    |  5 +----
 tests/common/impala_test_suite.py    | 12 ++++++------
 tests/common/test_dimensions.py      |  9 +++++----
 tests/query_test/test_aggregation.py | 18 +++++++++++++++++-
 6 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 33e285a78..2e748c09c 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -82,8 +82,7 @@ AggregationNodeBase::AggregationNodeBase(
           static_cast<const GroupingAggregatorConfig*>(agg);
       DCHECK(grouping_config != nullptr);
       node.reset(new GroupingAggregator(this, pool_, *grouping_config,
-          pnode.tnode_->agg_node.estimated_input_cardinality,
-          pnode.tnode_->agg_node.fast_limit_check));
+          pnode.tnode_->agg_node.estimated_input_cardinality));
     }
     aggs_.push_back(std::move(node));
     runtime_profile_->AddChild(aggs_[i]->runtime_profile());
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index cd6162c83..7ff9038aa 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -133,8 +133,7 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
-    bool needUnsetLimit)
+    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
   : Aggregator(exec_node, pool, config,
       Substitute("$0$1", RuntimeProfile::PREFIX_GROUPING_AGGREGATOR, config.agg_idx_)),
     hash_table_config_(*config.hash_table_config_),
@@ -152,9 +151,6 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
     estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
-  if (needUnsetLimit) {
-    UnsetLimit();
-  }
 }
 
 Status GroupingAggregator::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index b71ba9885..e07a9fdea 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -192,8 +192,7 @@ class GroupingAggregatorConfig : public AggregatorConfig {
 class GroupingAggregator : public Aggregator {
  public:
   GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
-      bool needUnsetLimit);
+      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -215,8 +214,6 @@ class GroupingAggregator : public Aggregator {
 
   virtual int64_t GetNumKeys() const override;
 
-  void UnsetLimit() { limit_ = -1; }
-
  private:
   struct Partition;
 
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index bc83ca609..7b23ae53e 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -48,7 +48,6 @@ from tests.common.impala_connection import create_connection
 from tests.common.impala_service import ImpaladService
 from tests.common.test_dimensions import (
     ALL_BATCH_SIZES,
-    ALL_CLUSTER_SIZES,
     ALL_DISABLE_CODEGEN_OPTIONS,
     ALL_NODES_ONLY,
     TableFormatInfo,
@@ -166,7 +165,7 @@ GROUP_NAME = grp.getgrgid(pwd.getpwnam(getuser()).pw_gid).gr_name
 # Base class for Impala tests. All impala test cases should inherit from this class
 class ImpalaTestSuite(BaseTestSuite):
   @classmethod
-  def add_test_dimensions(cls):
+  def add_test_dimensions(cls, cluster_sizes=None):
     """
     A hook for adding additional dimensions.
 
@@ -176,7 +175,10 @@ class ImpalaTestSuite(BaseTestSuite):
     super(ImpalaTestSuite, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
         cls.create_table_info_dimension(cls.exploration_strategy()))
-    cls.ImpalaTestMatrix.add_dimension(cls.__create_exec_option_dimension())
+    if not cluster_sizes:
+      # TODO IMPALA-12394: switch to ALL_CLUSTER_SIZES for exhaustive runs
+      cluster_sizes = ALL_NODES_ONLY
+    cls.ImpalaTestMatrix.add_dimension(cls.__create_exec_option_dimension(cluster_sizes))
     # Execute tests through Beeswax by default. Individual tests that have been converted
     # to work with the HS2 client can add HS2 in addition to or instead of beeswax.
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax'))
@@ -1081,14 +1083,12 @@ class ImpalaTestSuite(BaseTestSuite):
     return tf_dimensions
 
   @classmethod
-  def __create_exec_option_dimension(cls):
-    cluster_sizes = ALL_CLUSTER_SIZES
+  def __create_exec_option_dimension(cls, cluster_sizes):
     disable_codegen_options = ALL_DISABLE_CODEGEN_OPTIONS
     batch_sizes = ALL_BATCH_SIZES
     exec_single_node_option = [0]
     if cls.exploration_strategy() == 'core':
       disable_codegen_options = [False]
-      cluster_sizes = ALL_NODES_ONLY
     return create_exec_option_dimension(cluster_sizes, disable_codegen_options,
                                         batch_sizes,
                                         exec_single_node_option=exec_single_node_option,
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 0c5c2fef3..9b957d3bb 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -203,9 +203,8 @@ def orc_schema_resolution_constraint(v):
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 
-# Don't run with NUM_NODES=1 due to IMPALA-561
-# ALL_CLUSTER_SIZES = [0, 1]
-ALL_CLUSTER_SIZES = [0]
+# Test SingleNode and Distributed Planners
+ALL_CLUSTER_SIZES = [0, 1]
 
 SINGLE_NODE_ONLY = [1]
 ALL_NODES_ONLY = [0]
@@ -219,7 +218,9 @@ def create_single_exec_option_dimension(num_nodes=0, disable_codegen_rows_thresh
       disable_codegen_rows_threshold_options=[disable_codegen_rows_threshold],
       batch_sizes=[0])
 
-def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES,
+
+# TODO IMPALA-12394: switch to ALL_CLUSTER_SIZES
+def create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY,
                                  disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS,
                                  batch_sizes=ALL_BATCH_SIZES,
                                  sync_ddl=None, exec_single_node_option=[0],
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 0df1767a7..2e20abe7e 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -24,6 +24,7 @@ import pytest
 from testdata.common import widetable
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
+    ALL_CLUSTER_SIZES,
     create_exec_option_dimension,
     create_exec_option_dimension_from_dict,
     create_uncompressed_text_dimension)
@@ -91,7 +92,7 @@ class TestAggregation(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestAggregation, cls).add_test_dimensions()
+    super(TestAggregation, cls).add_test_dimensions(ALL_CLUSTER_SIZES)
 
     # Add two more dimensions
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('agg_func', *AGG_FUNCTIONS))
@@ -391,6 +392,21 @@ class TestAggregationQueries(ImpalaTestSuite):
       pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
     self.run_test_case('QueryTest/grouping-sets', vector)
 
+  def test_aggregation_limit(self, vector):
+    """Test that limits are honoured when enforced by aggregation node."""
+    # 1-phase
+    result = self.execute_query(
+        "select distinct l_orderkey from tpch.lineitem limit 10",
+        vector.get_value('exec_option'))
+    assert len(result.data) == 10
+
+    # 2-phase with transpose
+    result = self.execute_query(
+        "select count(distinct l_discount), group_concat(distinct l_linestatus), "
+        "max(l_quantity) from tpch.lineitem group by l_tax, l_shipmode limit 10;",
+        vector.get_value('exec_option'))
+    assert len(result.data) == 10
+
 
 class TestDistinctAggregation(ImpalaTestSuite):
   """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs