You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:04:02 UTC

[50/51] [abbrv] impala git commit: IMPALA-7251: Fix QueryMaintenance calls in Aggregators

IMPALA-7251: Fix QueryMaintenance calls in Aggregators

A recent change, IMPALA-110 (part 2), refactored
PartitionedAggregationNode into several classes, including a new type
'Aggregator'. During this refactor, code that makes local allocations
while evaluating exprs was moved from the ExecNode (now
AggregationNode/StreamingAggregationNode) into the Aggregators, but
code related to cleaning these allocations up (ie QueryMaintenance())
was not, resulting in some queries using an excessive amount of
memory.

This patch removes all calls to QueryMaintenance() from the exec nodes
and moves them into the Aggregators.

Testing:
- Added new test cases with a mem limit that fails if the expr
  allocations aren't released in a timely manner.
- Passed a full exhaustive run.

Change-Id: I4dac2bb0a15cdd7315ee15608bae409c125c82f5
Reviewed-on: http://gerrit.cloudera.org:8080/10871
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e2aafae2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e2aafae2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e2aafae2

Branch: refs/heads/master
Commit: e2aafae204a695ddc6a9745f879f6164af19027a
Parents: 0459721
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Thu Jul 5 18:28:04 2018 +0000
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 13 03:04:35 2018 +0000

----------------------------------------------------------------------
 be/src/exec/aggregation-node.cc                 |  5 ++--
 be/src/exec/aggregator.cc                       |  5 ++++
 be/src/exec/aggregator.h                        |  6 ++++
 be/src/exec/grouping-aggregator.cc              |  7 ++---
 be/src/exec/grouping-aggregator.h               |  6 ----
 be/src/exec/non-grouping-aggregator.cc          |  2 ++
 be/src/exec/streaming-aggregation-node.cc       |  5 ++--
 .../spilling-regression-exhaustive.test         | 30 ++++++++++++++++++++
 8 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index d25284d..2c95590 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -78,7 +78,6 @@ Status AggregationNode::Open(RuntimeState* state) {
   bool eos = false;
   do {
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
     RETURN_IF_ERROR(aggregator_->AddBatch(state, &batch));
     batch.Reset();
@@ -98,7 +97,6 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (ReachedLimit()) {
     *eos = true;
@@ -118,6 +116,9 @@ Status AggregationNode::Reset(RuntimeState* state) {
 
 void AggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
+  // All expr mem allocations should happen in the Aggregator.
+  DCHECK(expr_results_pool() == nullptr
+      || expr_results_pool()->total_allocated_bytes() == 0);
   aggregator_->Close(state);
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 70178cc..87abc52 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -196,6 +196,11 @@ Tuple* Aggregator::GetOutputTuple(
   return dst;
 }
 
+Status Aggregator::QueryMaintenance(RuntimeState* state) {
+  expr_results_pool_->Clear();
+  return state->CheckQueryState();
+}
+
 // IR Generation for updating a single aggregation slot. Signature is:
 // void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
 //

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index ab13d45..f415606 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -188,6 +188,12 @@ class Aggregator {
   Tuple* GetOutputTuple(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool);
 
+  /// Clears 'expr_results_pool_' and returns the result of state->CheckQueryState().
+  /// Aggregators should call this periodically, e.g. once per input row batch. This
+  /// should not be called outside the main execution thread.
+  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
+  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
+
   /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx]
   /// and returns the IR function in 'fn'. Returns non-OK status if codegen
   /// is unsuccessful.

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 0eb4a3f..4f3e5cf 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -228,6 +228,7 @@ Status GroupingAggregator::Open(RuntimeState* state) {
 }
 
 Status GroupingAggregator::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  RETURN_IF_ERROR(QueryMaintenance(state));
   if (!partition_eos_) {
     RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch));
   }
@@ -405,6 +406,7 @@ void GroupingAggregator::Close(RuntimeState* state) {
 
 Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(build_timer_);
+  RETURN_IF_ERROR(QueryMaintenance(state));
   num_input_rows_ += batch->num_rows();
 
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
@@ -952,11 +954,6 @@ int64_t GroupingAggregator::MinReservation() const {
       + resource_profile_.max_row_buffer_size * 2;
 }
 
-Status GroupingAggregator::QueryMaintenance(RuntimeState* state) {
-  expr_results_pool_->Clear();
-  return state->CheckQueryState();
-}
-
 BufferPool::ClientHandle* GroupingAggregator::buffer_pool_client() {
   return reservation_manager_.buffer_pool_client();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 0d1b893..b766a1e 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -587,12 +587,6 @@ class GroupingAggregator : public Aggregator {
   void CleanupHashTbl(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it);
 
-  /// Clears 'expr_results_pool_' and returns the result of state->CheckQueryState().
-  /// Aggregators should call this periodically, e.g. once per input row batch. This
-  /// should not be called outside the main execution thread.
-  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
-  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
-
   /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
   /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR.
   /// This function will modify the loop subsituting the statically compiled functions

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/non-grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index 585c264..1ee4e46 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -72,6 +72,7 @@ Status NonGroupingAggregator::Open(RuntimeState* state) {
 
 Status NonGroupingAggregator::GetNext(
     RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  RETURN_IF_ERROR(QueryMaintenance(state));
   // There was no grouping, so evaluate the conjuncts and return the single result row.
   // We allow calling GetNext() after eos, so don't return this row again.
   if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
@@ -116,6 +117,7 @@ void NonGroupingAggregator::Close(RuntimeState* state) {
 
 Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(build_timer_);
+  RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (add_batch_impl_fn_ != nullptr) {
     RETURN_IF_ERROR(add_batch_impl_fn_(this, batch));

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/be/src/exec/streaming-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index 4ad7820..c1e9184 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -80,7 +80,6 @@ Status StreamingAggregationNode::GetNext(
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (ReachedLimit()) {
     *eos = true;
@@ -113,7 +112,6 @@ Status StreamingAggregationNode::GetRowsStreaming(
   do {
     DCHECK_EQ(out_batch->num_rows(), 0);
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
 
     RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
 
@@ -137,6 +135,9 @@ Status StreamingAggregationNode::Reset(RuntimeState* state) {
 
 void StreamingAggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
+  // All expr mem allocations should happen in the Aggregator.
+  DCHECK(expr_results_pool() == nullptr
+      || expr_results_pool()->total_allocated_bytes() == 0);
   aggregator_->Close(state);
   child_batch_.reset();
   ExecNode::Close(state);

http://git-wip-us.apache.org/repos/asf/impala/blob/e2aafae2/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
index 03a1010..1f3bf24 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-regression-exhaustive.test
@@ -245,6 +245,36 @@ BIGINT
 row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
+# Same as above, except disable streaming preaggs to ensure that AggregationNode is also
+# releasing local memory allocations as appropriate.
+set mem_limit=800m;
+set num_scanner_threads=1;
+set disable_streaming_preaggregations=true;
+select count(distinct concat(cast(l_comment as char(120)), cast(l_comment as char(120)),
+                             cast(l_comment as char(120)), cast(l_comment as char(120)),
+                             cast(l_comment as char(120)), cast(l_comment as char(120))))
+from lineitem
+---- RESULTS
+4502054
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that the agg spilled.
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# Same as above, except use a non-grouping aggregate function to ensure that
+# NonGroupingAggregator is also releasing local memory allocations as appropriate.
+set mem_limit=50m;
+set num_scanner_threads=1;
+select min(regexp_replace(l_comment, ".", "x"))
+from lineitem
+---- RESULTS
+'xxxxxxxxxx'
+---- TYPES
+STRING
+====
+---- QUERY
 # IMPALA-3304: test that avg() can spill with a query mem limit.
 # This test only covers that use FIXED_UDA_INTERMEDIATE, not functions that allocate
 # strings for intermediate values. mem_limit is tuned to reproduce the issue on a 3-node