Posted to commits@impala.apache.org by st...@apache.org on 2021/09/23 07:07:01 UTC

[impala] 02/02: IMPALA-2581: LIMIT can be propagated down into some aggregations

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 39cc4b6bf45a172c3fdcd6a9cc42eaadfcf3ae71
Author: liuyao <54...@163.com>
AuthorDate: Tue Aug 31 14:14:18 2021 +0800

    IMPALA-2581: LIMIT can be propagated down into some aggregations
    
    This patch contains two parts:
    1. Push the limit down to the pre-aggregation when both conditions
    below are true:
         a) the aggregation node has no aggregate function
         b) the aggregation node has no predicate
    2. Finish the aggregation early once the number of unique keys in the
    hash table has exceeded the limit.
    
    Sample query:
    SELECT DISTINCT f FROM t LIMIT n
    This query can now pass the LIMIT all the way down to the
    pre-aggregation, which leads to a nearly unbounded speedup over large
    tables when n is low.
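    
    A minimal, self-contained sketch (not part of this patch; all names are
    illustrative) of the idea behind part 2, in plain C++: while accumulating
    DISTINCT grouping keys for a query with a LIMIT, stop consuming input as
    soon as the hash table already holds at least 'limit' unique keys.
    
    #include <cstdint>
    #include <iostream>
    #include <unordered_set>
    #include <vector>
    
    // Counts distinct keys, stopping early once 'limit' unique keys have
    // been seen (the "fast limit check").
    int64_t CountDistinctWithLimit(const std::vector<int64_t>& input,
        int64_t limit) {
      std::unordered_set<int64_t> keys;  // stands in for the agg hash table
      for (int64_t value : input) {
        keys.insert(value);
        // With no aggregate functions and no predicates, rows beyond the
        // first 'limit' distinct keys cannot affect the result, so stop.
        if (static_cast<int64_t>(keys.size()) >= limit) break;
      }
      return static_cast<int64_t>(keys.size());
    }
    
    int main() {
      std::vector<int64_t> rows = {1, 1, 2, 3, 3, 4, 5, 6, 7, 8};
      std::cout << CountDistinctWithLimit(rows, 3) << std::endl;  // prints 3
      return 0;
    }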
    
    Testing:
    Added test targeted-perf/queries/aggregation.test
    Passed core tests
    
    Change-Id: I930a6cb203615acfc03f23118d1bc1f0ea360995
    Reviewed-on: http://gerrit.cloudera.org:8080/17821
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregation-node-base.cc               |  4 +++-
 be/src/exec/aggregation-node-base.h                |  3 +++
 be/src/exec/aggregation-node.cc                    | 12 +++++++++++
 be/src/exec/aggregator.h                           |  3 +++
 be/src/exec/grouping-aggregator.cc                 | 25 +++++++++++++++++++++-
 be/src/exec/grouping-aggregator.h                  |  7 +++++-
 be/src/exec/non-grouping-aggregator.h              |  2 ++
 be/src/exec/streaming-aggregation-node.cc          | 19 ++++++++++++++--
 common/thrift/PlanNodes.thrift                     |  3 +++
 .../org/apache/impala/planner/AggregationNode.java | 10 +++++++++
 .../apache/impala/planner/DistributedPlanner.java  |  4 +++-
 .../org/apache/impala/planner/ExchangeNode.java    |  5 ++++-
 .../queries/PlannerTest/resource-requirements.test | 15 +++++++------
 .../queries/PlannerTest/setoperation-rewrite.test  |  3 ++-
 .../queries/PlannerTest/subquery-rewrite.test      |  3 ++-
 .../queries/PlannerTest/tpcds/tpcds-q06.test       | 10 +++++----
 .../queries/PlannerTest/tpcds/tpcds-q54.test       | 20 ++++++++++-------
 .../queries/QueryTest/spilling.test                | 17 ++++++++++++++-
 .../targeted-perf/queries/aggregation.test         | 11 ++++++++++
 19 files changed, 147 insertions(+), 29 deletions(-)

diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 0cad9a8..33e285a 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -82,11 +82,13 @@ AggregationNodeBase::AggregationNodeBase(
           static_cast<const GroupingAggregatorConfig*>(agg);
       DCHECK(grouping_config != nullptr);
       node.reset(new GroupingAggregator(this, pool_, *grouping_config,
-          pnode.tnode_->agg_node.estimated_input_cardinality));
+          pnode.tnode_->agg_node.estimated_input_cardinality,
+          pnode.tnode_->agg_node.fast_limit_check));
     }
     aggs_.push_back(std::move(node));
     runtime_profile_->AddChild(aggs_[i]->runtime_profile());
   }
+  fast_limit_check_ = pnode.tnode_->agg_node.fast_limit_check;
 }
 
 Status AggregationNodeBase::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index 6d148b9..8bcecfc 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -67,6 +67,9 @@ class AggregationNodeBase : public ExecNode {
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
+  /// If true, the aggregation can finish early without consuming all of its input.
+  bool fast_limit_check_ = false;
+
   /// Splits the rows of 'batch' up according to which tuple of the row is non-null such
   /// that a row with tuple 'i' non-null is copied into the batch 'mini_batches[i]'.
   /// It is expected that all rows of 'batch' have exactly 1 non-null tuple.
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 8db7925..603442e 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -69,6 +69,18 @@ Status AggregationNode::Open(RuntimeState* state) {
     if (num_aggs == 1) {
       RETURN_IF_ERROR(aggs_[0]->AddBatch(state, &child_batch));
       child_batch.Reset();
+      if (fast_limit_check_) {
+        DCHECK(limit() > -1);
+        if (aggs_[0]->GetNumKeys() >= limit()) {
+          eos = true;
+          runtime_profile_->AddInfoString("FastLimitCheckExceededRows",
+              SimpleItoa(aggs_[0]->GetNumKeys() - limit()));
+          VLOG_QUERY << Substitute("the number of rows ($0) returned from the "
+              "aggregation node has exceeded the limit of $1", aggs_[0]->GetNumKeys(),
+              limit());
+          break;
+        }
+      }
       continue;
     }
 
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 693716e..6c3cae2 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -172,6 +172,9 @@ class Aggregator {
 
   static const char* LLVM_CLASS_NAME;
 
+  // Returns the number of unique keys that have been aggregated so far.
+  virtual int64_t GetNumKeys() const = 0;
+
  protected:
   /// The id of the ExecNode this Aggregator corresponds to.
   const int id_;
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index f77f0df..1b8ebd1 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -134,7 +134,8 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
+    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
+    bool needUnsetLimit)
   : Aggregator(
         exec_node, pool, config, Substitute("GroupingAggregator $0", config.agg_idx_)),
     hash_table_config_(*config.hash_table_config_),
@@ -152,6 +153,9 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
     estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
+  if (needUnsetLimit) {
+    UnsetLimit();
+  }
 }
 
 Status GroupingAggregator::Prepare(RuntimeState* state) {
@@ -1317,4 +1321,23 @@ Status GroupingAggregatorConfig::CodegenAddBatchStreamingImpl(
 // Instantiate required templates.
 template Status GroupingAggregator::AppendSpilledRow<false>(Partition*, TupleRow*);
 template Status GroupingAggregator::AppendSpilledRow<true>(Partition*, TupleRow*);
+
+int64_t GroupingAggregator::GetNumKeys() const {
+  int64_t num_keys = 0;
+  for (int i = 0; i < hash_partitions_.size(); ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition == nullptr) continue;
+    // We might be dealing with a rebuilt spilled partition, where all partitions are
+    // pointing to a single in-memory partition, so make sure we only proceed for the
+    // right partition.
+    if (i != partition->idx) continue;
+    if (!partition->is_spilled()) {
+      if (partition->hash_tbl == nullptr) {
+        continue;
+      }
+      num_keys += partition->hash_tbl->size();
+    }
+  }
+  return num_keys;
+}
 } // namespace impala
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index f9f7d0d..b71ba98 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -192,7 +192,8 @@ class GroupingAggregatorConfig : public AggregatorConfig {
 class GroupingAggregator : public Aggregator {
  public:
   GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
+      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
+      bool needUnsetLimit);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -212,6 +213,10 @@ class GroupingAggregator : public Aggregator {
   virtual std::string DebugString(int indentation_level = 0) const override;
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
+  virtual int64_t GetNumKeys() const override;
+
+  void UnsetLimit() { limit_ = -1; }
+
  private:
   struct Partition;
 
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index e50f609..47b461f 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -97,6 +97,8 @@ class NonGroupingAggregator : public Aggregator {
   virtual std::string DebugString(int indentation_level = 0) const override;
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
+  virtual int64_t GetNumKeys() const override { return 1; }
+
  private:
   /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
   /// pool's memory is transferred to the output batch on eos. The pool should not be
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index ef34a0b..88bd243 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -36,7 +36,8 @@ StreamingAggregationNode::StreamingAggregationNode(
     ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs)
   : AggregationNodeBase(pool, pnode, descs) {
   DCHECK(pnode.tnode_->conjuncts.empty()) << "Preaggs have no conjuncts";
-  DCHECK(limit_ == -1) << "Preaggs have no limits";
+  DCHECK(limit_ == -1 || (limit_ != -1 && fast_limit_check_))
+      << "Preaggs have no limits";
   for (auto& t_agg : pnode.tnode_->agg_node.aggregators) {
     DCHECK(t_agg.use_streaming_preaggregation);
   }
@@ -62,7 +63,7 @@ Status StreamingAggregationNode::GetNext(
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
 
-  if (ReachedLimit()) {
+  if (!fast_limit_check_ && ReachedLimit()) {
     *eos = true;
     return Status::OK();
   }
@@ -124,6 +125,20 @@ Status StreamingAggregationNode::GetRowsStreaming(
       if (child_batch_processed_) {
         child_batch_->Reset();
       }
+      if (fast_limit_check_) {
+        DCHECK(limit() > -1);
+        if (aggs_[0]->GetNumKeys() >= limit()) {
+          child_eos_ = true;
+          child_batch_processed_ = true;
+          child_batch_->Reset();
+          runtime_profile_->AddInfoString("FastLimitCheckExceededRows",
+              SimpleItoa(aggs_[0]->GetNumKeys() - limit()));
+          VLOG_QUERY << Substitute("the number of rows ($0) returned from the streaming "
+              "aggregation node has exceeded the limit of $1",aggs_[0]->GetNumKeys(),
+              limit());
+          break;
+        }
+      }
       continue;
     }
 
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 502a6fc..3c48fe8 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -475,6 +475,9 @@ struct TAggregationNode {
   // If true, this is the first AggregationNode in a aggregation plan with multiple
   // Aggregators and the entire input to this node should be passed to each Aggregator.
   3: required bool replicate_input
+
+  // Set to true if this aggregation can complete early
+  4: required bool fast_limit_check
 }
 
 struct TSortInfo {
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 0a7e59e..b9741ff 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -429,6 +429,7 @@ public class AggregationNode extends PlanNode {
     boolean replicateInput = aggPhase_ == AggPhase.FIRST && aggInfos_.size() > 1;
     msg.agg_node.setReplicate_input(replicateInput);
     msg.agg_node.setEstimated_input_cardinality(getChild(0).getCardinality());
+    msg.agg_node.setFast_limit_check(canCompleteEarly());
     for (int i = 0; i < aggInfos_.size(); ++i) {
       AggregateInfo aggInfo = aggInfos_.get(i);
       List<TExpr> aggregateFunctions = new ArrayList<>();
@@ -641,4 +642,13 @@ public class AggregationNode extends PlanNode {
   public boolean isNonCorrelatedScalarSubquery() {
     return isNonCorrelatedScalarSubquery_;
   }
+
+  // When both conditions below are true, aggregation can complete early
+  //    a) aggregation node has no aggregate function
+  //    b) aggregation node has no predicate
+  // E.g. SELECT DISTINCT f1,f2,...fn FROM t LIMIT n
+  public boolean canCompleteEarly() {
+    return isSingleClassAgg() && hasLimit() && hasGrouping()
+        && !multiAggInfo_.hasAggregateExprs() && getConjuncts().isEmpty();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 7fbd350..67a376a 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -931,7 +931,9 @@ public class DistributedPlanner {
     // if there is a limit, we need to transfer it from the pre-aggregation
     // node in the child fragment to the merge aggregation node in the parent
     long limit = node.getLimit();
-    node.unsetLimit();
+    if (node.getMultiAggInfo().hasAggregateExprs() || !node.getConjuncts().isEmpty()) {
+      node.unsetLimit();
+    }
     node.unsetNeedsFinalize();
 
     // place a merge aggregation step in a new fragment
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 29ce4f6..c385695 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -82,7 +82,10 @@ public class ExchangeNode extends PlanNode {
     offset_ = 0;
     children_.add(input);
     // Only apply the limit at the receiver if there are multiple senders.
-    if (input.getFragment().isPartitioned()) limit_ = input.limit_;
+    if (input.getFragment().isPartitioned() &&
+        !(input instanceof AggregationNode && !input.isBlockingNode())) {
+      limit_ = input.limit_;
+    }
     computeTupleIds();
   }
 
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 4be80f6..fd42e1f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -6182,8 +6182,8 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=231B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=112.00MB Threads=4
-Per-Host Resource Estimates: Memory=905MB
+Max Per-Host Resource Reservation: Memory=79.94MB Threads=4
+Per-Host Resource Estimates: Memory=441MB
 Analyzed query: SELECT DISTINCT * FROM tpch_parquet.lineitem LIMIT CAST(5 AS
 TINYINT)
 
@@ -6200,25 +6200,26 @@ PLAN-ROOT SINK
 |  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipin [...]
-Per-Host Resources: mem-estimate=473.84MB mem-reservation=34.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=10.02MB mem-reservation=1.94MB thread-reservation=1
 03:AGGREGATE [FINALIZE]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_ship [...]
 |  limit: 5
-|  mem-estimate=463.16MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct [...]
-|  mem-estimate=10.69MB mem-reservation=0B thread-reservation=0
-|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=427.37MB mem-reservation=74.00MB thread-reservation=2
 01:AGGREGATE [STREAMING]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_ship [...]
+|  limit: 5
 |  mem-estimate=347.37MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test
index 90ae3cf..3e69931 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test
@@ -734,7 +734,8 @@ PLAN-ROOT SINK
 |  |
 |  02:AGGREGATE [STREAMING]
 |  |  group by: functional.alltypestiny.id, functional.alltypestiny.bool_col, functional.alltypestiny.tinyint_col, functional.alltypestiny.smallint_col, functional.alltypestiny.int_col, functional.alltypestiny.bigint_col, functional.alltypestiny.float_col, functional.alltypestiny.double_col, functional.alltypestiny.date_string_col, functional.alltypestiny.string_col, functional.alltypestiny.timestamp_col, functional.alltypestiny.year, functional.alltypestiny.month
-|  |  row-size=89B cardinality=2
+|  |  limit: 1
+|  |  row-size=89B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partition predicates: `year` = 2009, `month` = 2
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 0381f6f..8e83a77 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -2978,7 +2978,8 @@ PLAN-ROOT SINK
 |  |
 |  04:AGGREGATE [STREAMING]
 |  |  group by: i
-|  |  row-size=8B cardinality=20
+|  |  limit: 2
+|  |  row-size=8B cardinality=2
 |  |
 |  01:UNION
 |  |  pass-through-operands: 02
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test
index e12c465..b1252f6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test
@@ -308,15 +308,16 @@ Per-Host Resources: mem-estimate=70.86MB mem-reservation=29.06MB thread-reservat
 |  |
 |  21:EXCHANGE [HASH((d_month_seq))]
 |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=6 row-size=4B cardinality=108
+|  |  tuple-ids=6 row-size=4B cardinality=1
 |  |  in pipelines: 05(GETNEXT)
 |  |
 |  F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2
 |  06:AGGREGATE [STREAMING]
 |  |  group by: (d_month_seq)
+|  |  limit: 1
 |  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=6 row-size=4B cardinality=108
+|  |  tuple-ids=6 row-size=4B cardinality=1
 |  |  in pipelines: 05(GETNEXT)
 |  |
 |  05:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -586,15 +587,16 @@ Per-Instance Resources: mem-estimate=26.00MB mem-reservation=3.00MB thread-reser
 |  |
 |  21:EXCHANGE [HASH((d_month_seq))]
 |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=6 row-size=4B cardinality=108
+|  |  tuple-ids=6 row-size=4B cardinality=1
 |  |  in pipelines: 05(GETNEXT)
 |  |
 |  F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1
 |  06:AGGREGATE [STREAMING]
 |  |  group by: (d_month_seq)
+|  |  limit: 1
 |  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=6 row-size=4B cardinality=108
+|  |  tuple-ids=6 row-size=4B cardinality=1
 |  |  in pipelines: 05(GETNEXT)
 |  |
 |  05:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test
index a5fe877..ff936a0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test
@@ -421,15 +421,16 @@ Per-Host Resources: mem-estimate=13.66MB mem-reservation=4.94MB thread-reservati
 |  |
 |  45:EXCHANGE [HASH(d_month_seq + 3)]
 |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=17 row-size=8B cardinality=108
+|  |  tuple-ids=17 row-size=8B cardinality=2
 |  |  in pipelines: 17(GETNEXT)
 |  |
 |  F17:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2
 |  18:AGGREGATE [STREAMING]
 |  |  group by: CAST(d_month_seq AS BIGINT) + CAST(3 AS BIGINT)
+|  |  limit: 2
 |  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=17 row-size=8B cardinality=108
+|  |  tuple-ids=17 row-size=8B cardinality=2
 |  |  in pipelines: 17(GETNEXT)
 |  |
 |  17:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -481,15 +482,16 @@ Per-Host Resources: mem-estimate=13.66MB mem-reservation=4.94MB thread-reservati
 |  |
 |  41:EXCHANGE [HASH(d_month_seq + 1)]
 |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=14 row-size=8B cardinality=108
+|  |  tuple-ids=14 row-size=8B cardinality=2
 |  |  in pipelines: 14(GETNEXT)
 |  |
 |  F14:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2
 |  15:AGGREGATE [STREAMING]
 |  |  group by: CAST(d_month_seq AS BIGINT) + CAST(1 AS BIGINT)
+|  |  limit: 2
 |  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=14 row-size=8B cardinality=108
+|  |  tuple-ids=14 row-size=8B cardinality=2
 |  |  in pipelines: 14(GETNEXT)
 |  |
 |  14:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -866,15 +868,16 @@ Per-Instance Resources: mem-estimate=10.24MB mem-reservation=2.00MB thread-reser
 |  |
 |  45:EXCHANGE [HASH(d_month_seq + 3)]
 |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=17 row-size=8B cardinality=108
+|  |  tuple-ids=17 row-size=8B cardinality=2
 |  |  in pipelines: 17(GETNEXT)
 |  |
 |  F17:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1
 |  18:AGGREGATE [STREAMING]
 |  |  group by: CAST(d_month_seq AS BIGINT) + CAST(3 AS BIGINT)
+|  |  limit: 2
 |  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=17 row-size=8B cardinality=108
+|  |  tuple-ids=17 row-size=8B cardinality=2
 |  |  in pipelines: 17(GETNEXT)
 |  |
 |  17:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -933,15 +936,16 @@ Per-Instance Resources: mem-estimate=10.24MB mem-reservation=2.00MB thread-reser
 |  |
 |  41:EXCHANGE [HASH(d_month_seq + 1)]
 |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=14 row-size=8B cardinality=108
+|  |  tuple-ids=14 row-size=8B cardinality=2
 |  |  in pipelines: 14(GETNEXT)
 |  |
 |  F14:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1
 |  15:AGGREGATE [STREAMING]
 |  |  group by: CAST(d_month_seq AS BIGINT) + CAST(1 AS BIGINT)
+|  |  limit: 2
 |  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=14 row-size=8B cardinality=108
+|  |  tuple-ids=14 row-size=8B cardinality=2
 |  |  in pipelines: 14(GETNEXT)
 |  |
 |  14:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index f797f7f..8a4a36b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -288,7 +288,7 @@ set num_nodes=1;
 select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 from tpch_parquet.lineitem
 group by 1, 2, 3, 4, 5, random()
-limit 5
+limit 50000000
 ---- RUNTIME_PROFILE
 row_regex: .*Query State: FINISHED.*
 row_regex: .*Query Status: OK.*
@@ -432,3 +432,18 @@ STRING,DECIMAL,DECIMAL,DECIMAL,DECIMAL,BIGINT
 row_regex: .*InMemoryHeapsEvicted: .* \([1-9][0-9]*\)
 #row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Test spilling an agg with a LIMIT; see IMPALA-2581
+set buffer_pool_limit=136m;
+select c.c2 from
+(select distinct (a.id*10000 + b.id) c1, a.int_col c2
+from functional.alltypes a, functional.alltypes b limit 3500000) c join /* +SHUFFLE */ functional.alltypes d on c.c2 = d.int_col
+group by c.c2
+limit 5
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that spilling was activated.
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+row_regex: .*FastLimitCheckExceededRows: [0-9]+
+====
diff --git a/testdata/workloads/targeted-perf/queries/aggregation.test b/testdata/workloads/targeted-perf/queries/aggregation.test
index e0e98d7..68b9b07 100644
--- a/testdata/workloads/targeted-perf/queries/aggregation.test
+++ b/testdata/workloads/targeted-perf/queries/aggregation.test
@@ -2722,3 +2722,14 @@ AGG1,AGG2,G
 ----TYPES
 BIGINT,BIGINT,STRING
 ====
+---- QUERY: PERF_AGG-Q11
+-- IMPALA-2581: LIMIT can be used to speed up aggregations
+select distinct l_orderkey from lineitem limit 10;
+---- RUNTIME_PROFILE
+row_regex: .*FastLimitCheckExceededRows: [1-9][0-9]*
+====
+---- QUERY: PERF_AGG-Q12
+select l_orderkey from lineitem group by 1 limit 10;
+---- RUNTIME_PROFILE
+row_regex: .*FastLimitCheckExceededRows: [1-9][0-9]*
+====
\ No newline at end of file