You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/04 17:09:13 UTC

[impala] 11/12: IMPALA-7251: Fix QueryMaintenance calls in Aggregators

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fdd6db524c9c97f0baebfde0119fce19d62eaec3
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Jul 5 18:28:04 2018 +0000

    IMPALA-7251: Fix QueryMaintenance calls in Aggregators
    
    A recent change, IMPALA-110 (part 2), refactored
    PartitionedAggregationNode into several classes, including a new type
    'Aggregator'. During this refactor, code that makes local allocations
    while evaluating exprs was moved from the ExecNode (now
    AggregationNode/StreamingAggregationNode) into the Aggregators, but
    code related to cleaning these allocations up (i.e. QueryMaintenance())
    was not, resulting in some queries using an excessive amount of
    memory.
    
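    This patch removes all calls to QueryMaintenance() from the exec nodes,
    moves the QueryMaintenance() definition from GroupingAggregator into the
    Aggregator base class, and calls it from the Aggregators' AddBatch() and
    GetNext() instead. It also adds DCHECKs to the exec nodes' Close() to
    verify that the nodes themselves made no expr allocations.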
    
    Testing:
    - Added new test cases with mem limits that cause the queries to fail if
      the expr allocations aren't released in a timely manner.
    - Passed a full exhaustive run.
    
    Change-Id: I4dac2bb0a15cdd7315ee15608bae409c125c82f5
    Reviewed-on: http://gerrit.cloudera.org:8080/10871
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregation-node.cc                    |  5 ++--
 be/src/exec/aggregator.cc                          |  5 ++++
 be/src/exec/aggregator.h                           |  6 +++++
 be/src/exec/grouping-aggregator.cc                 |  7 ++---
 be/src/exec/grouping-aggregator.h                  |  6 -----
 be/src/exec/non-grouping-aggregator.cc             |  2 ++
 be/src/exec/streaming-aggregation-node.cc          |  5 ++--
 .../QueryTest/spilling-regression-exhaustive.test  | 30 ++++++++++++++++++++++
 8 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index d25284d..2c95590 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -78,7 +78,6 @@ Status AggregationNode::Open(RuntimeState* state) {
   bool eos = false;
   do {
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
     RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch));
     batch.Reset();
@@ -98,7 +97,6 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (ReachedLimit()) {
     *eos = true;
@@ -118,6 +116,9 @@ Status AggregationNode::Reset(RuntimeState* state) {
 
 void AggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
+  // All expr mem allocations should happen in the Aggregator.
+  DCHECK(expr_results_pool() == nullptr
+      || expr_results_pool()->total_allocated_bytes() == 0);
   aggregator_->Close(state);
   ExecNode::Close(state);
 }
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 70178cc..87abc52 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -196,6 +196,11 @@ Tuple* Aggregator::GetOutputTuple(
   return dst;
 }
 
+Status Aggregator::QueryMaintenance(RuntimeState* state) {
+  expr_results_pool_->Clear();
+  return state->CheckQueryState();
+}
+
 // IR Generation for updating a single aggregation slot. Signature is:
 // void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
 //
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index ab13d45..f415606 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -188,6 +188,12 @@ class Aggregator {
   Tuple* GetOutputTuple(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool);
 
+  /// Clears 'expr_results_pool_' and returns the result of state->CheckQueryState().
+  /// Aggregators should call this periodically, e.g. once per input row batch. This
+  /// should not be called outside the main execution thread.
+  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
+  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
+
   /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx]
   /// and returns the IR function in 'fn'. Returns non-OK status if codegen
   /// is unsuccessful.
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 0eb4a3f..4f3e5cf 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -228,6 +228,7 @@ Status GroupingAggregator::Open(RuntimeState* state) {
 }
 
 Status GroupingAggregator::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  RETURN_IF_ERROR(QueryMaintenance(state));
   if (!partition_eos_) {
     RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch));
   }
@@ -405,6 +406,7 @@ void GroupingAggregator::Close(RuntimeState* state) {
 
 Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(build_timer_);
+  RETURN_IF_ERROR(QueryMaintenance(state));
   num_input_rows_ += batch->num_rows();
 
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
@@ -952,11 +954,6 @@ int64_t GroupingAggregator::MinReservation() const {
       + resource_profile_.max_row_buffer_size * 2;
 }
 
-Status GroupingAggregator::QueryMaintenance(RuntimeState* state) {
-  expr_results_pool_->Clear();
-  return state->CheckQueryState();
-}
-
 BufferPool::ClientHandle* GroupingAggregator::buffer_pool_client() {
   return reservation_manager_.buffer_pool_client();
 }
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 0d1b893..b766a1e 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -587,12 +587,6 @@ class GroupingAggregator : public Aggregator {
   void CleanupHashTbl(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it);
 
-  /// Clears 'expr_results_pool_' and returns the result of state->CheckQueryState().
-  /// Aggregators should call this periodically, e.g. once per input row batch. This
-  /// should not be called outside the main execution thread.
-  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
-  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
-
   /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
   /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR.
   /// This function will modify the loop subsituting the statically compiled functions
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index 585c264..1ee4e46 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -72,6 +72,7 @@ Status NonGroupingAggregator::Open(RuntimeState* state) {
 
 Status NonGroupingAggregator::GetNext(
     RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  RETURN_IF_ERROR(QueryMaintenance(state));
   // There was no grouping, so evaluate the conjuncts and return the single result row.
   // We allow calling GetNext() after eos, so don't return this row again.
   if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
@@ -116,6 +117,7 @@ void NonGroupingAggregator::Close(RuntimeState* state) {
 
 Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(build_timer_);
+  RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (add_batch_impl_fn_ != nullptr) {
     RETURN_IF_ERROR(add_batch_impl_fn_(this, batch));
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index 4ad7820..c1e9184 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -80,7 +80,6 @@ Status StreamingAggregationNode::GetNext(
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (ReachedLimit()) {
     *eos = true;
@@ -113,7 +112,6 @@ Status StreamingAggregationNode::GetRowsStreaming(
   do {
     DCHECK_EQ(out_batch->num_rows(), 0);
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
 
     RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
 
@@ -137,6 +135,9 @@ Status StreamingAggregationNode::Reset(RuntimeState* state) {
 
 void StreamingAggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
+  // All expr mem allocations should happen in the Aggregator.
+  DCHECK(expr_results_pool() == nullptr
+      || expr_results_pool()->total_allocated_bytes() == 0);
   aggregator_->Close(state);
   child_batch_.reset();
   ExecNode::Close(state);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
index 11c1aac..1e7ac86 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
@@ -245,6 +245,36 @@ BIGINT
 row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
+# Same as above, except disable streaming preaggs to ensure that AggregationNode is also
+# releasing local memory allocations as appropriate.
+set mem_limit=800m;
+set num_scanner_threads=1;
+set disable_streaming_preaggregations=true;
+select count(distinct concat(cast(l_comment as char(120)), cast(l_comment as char(120)),
+                             cast(l_comment as char(120)), cast(l_comment as char(120)),
+                             cast(l_comment as char(120)), cast(l_comment as char(120))))
+from lineitem
+---- RESULTS
+4502054
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that the agg spilled.
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# Same as above, except use a non-grouping aggregate function to ensure that
+# NonGroupingAggregator is also releasing local memory allocations as appropriate.
+set mem_limit=50m;
+set num_scanner_threads=1;
+select min(regexp_replace(l_comment, ".", "x"))
+from lineitem
+---- RESULTS
+'xxxxxxxxxx'
+---- TYPES
+STRING
+====
+---- QUERY
 # IMPALA-3304: test that avg() can spill with a query mem limit.
 # This test only covers that use FIXED_UDA_INTERMEDIATE, not functions that allocate
 # strings for intermediate values. mem_limit is tuned to reproduce the issue on a 3-node