You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/25 15:45:21 UTC

[impala] 09/20: IMPALA-7256: Aggregator mem usage isn't reflected in summary

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8304591be3b056caa6acef54fa4ac629152e493e
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Jul 19 02:14:15 2018 +0000

    IMPALA-7256: Aggregator mem usage isn't reflected in summary
    
    This patch fixes a bug where memory used by an Aggregator wasn't
    reflected in the exec summary for the corresponding aggregation node.
    The fix ensures that the Aggregator's MemTracker is a child of the
    node's MemTracker.
    
    Testing:
    - Manually ran a query and checked that all memory used by the
      Aggregator is accounted for in the exec summary.
    
    Change-Id: Iba6ef207bed47810fc742aec3481db5f313cf97f
    Reviewed-on: http://gerrit.cloudera.org:8080/10989
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregator.cc             |  3 ++-
 be/src/exec/aggregator.h              |  1 +
 be/src/exec/exec-node.cc              |  3 ++-
 be/src/exec/grouping-aggregator.cc    | 11 ++++++++---
 be/src/exec/grouping-aggregator.h     |  1 +
 be/src/runtime/reservation-manager.cc |  9 +++++----
 be/src/runtime/reservation-manager.h  |  5 ++++-
 7 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 87abc52..9e79bf4 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -46,6 +46,7 @@ const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
 Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs, const std::string& name)
   : id_(exec_node->id()),
+    exec_node_(exec_node),
     pool_(pool),
     intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
     intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
@@ -82,7 +83,7 @@ Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status Aggregator::Prepare(RuntimeState* state) {
   mem_tracker_.reset(new MemTracker(
-      runtime_profile_, -1, runtime_profile_->name(), state->instance_mem_tracker()));
+      runtime_profile_, -1, runtime_profile_->name(), exec_node_->mem_tracker()));
   expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
   expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
   expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index f415606..1ea3c47 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -93,6 +93,7 @@ class Aggregator {
  protected:
   /// The id of the ExecNode this Aggregator corresponds to.
   int id_;
+  ExecNode* exec_node_;
   ObjectPool* pool_;
 
   /// Account for peak memory used by this aggregator.
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 5dca184..e187179 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -129,7 +129,8 @@ Status ExecNode::Prepare(RuntimeState* state) {
   }
   reservation_manager_.Init(
       Substitute("$0 id=$1 ptr=$2", PrintThriftEnum(type_), id_, this), runtime_profile_,
-      mem_tracker_.get(), resource_profile_, debug_options_);
+      state->instance_buffer_reservation(), mem_tracker_.get(), resource_profile_,
+      debug_options_);
   return Status::OK();
 }
 
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 4f3e5cf..092ecfd 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -175,9 +175,13 @@ Status GroupingAggregator::Prepare(RuntimeState* state) {
       MAX_PARTITION_DEPTH, 1, expr_perm_pool_.get(), expr_results_pool_.get(),
       expr_results_pool_.get(), &ht_ctx_));
 
-  reservation_manager_.Init(
-      Substitute("GroupingAggregator id=$0 ptr=$1", id_, this), runtime_profile_,
-      mem_tracker_.get(), resource_profile_, debug_options_);
+  reservation_tracker_.reset(new ReservationTracker);
+  reservation_tracker_->InitChildTracker(runtime_profile_,
+      state->instance_buffer_reservation(), exec_node_->mem_tracker(),
+      numeric_limits<int64_t>::max());
+  reservation_manager_.Init(Substitute("GroupingAggregator id=$0 ptr=$1", id_, this),
+      runtime_profile_, reservation_tracker_.get(), mem_tracker_.get(),
+      resource_profile_, debug_options_);
   return Status::OK();
 }
 
@@ -400,6 +404,7 @@ void GroupingAggregator::Close(RuntimeState* state) {
   ScalarExpr::Close(build_exprs_);
 
   reservation_manager_.Close(state);
+  if (reservation_tracker_ != nullptr) reservation_tracker_->Close();
   // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed.
   Aggregator::Close(state);
 }
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index b766a1e..3d64711 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -216,6 +216,7 @@ class GroupingAggregator : public Aggregator {
   /// Resource information sent from the frontend.
   const TBackendResourceProfile resource_profile_;
 
+  std::unique_ptr<ReservationTracker> reservation_tracker_;
   ReservationManager reservation_manager_;
   BufferPool::ClientHandle* buffer_pool_client();
 
diff --git a/be/src/runtime/reservation-manager.cc b/be/src/runtime/reservation-manager.cc
index f16b48f..1712e09 100644
--- a/be/src/runtime/reservation-manager.cc
+++ b/be/src/runtime/reservation-manager.cc
@@ -29,10 +29,11 @@ using strings::Substitute;
 namespace impala {
 
 void ReservationManager::Init(string name, RuntimeProfile* runtime_profile,
-    MemTracker* mem_tracker, const TBackendResourceProfile& resource_profile,
-    const TDebugOptions& debug_options) {
+    ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options) {
   name_ = name;
   runtime_profile_ = runtime_profile;
+  parent_reservation_ = parent_reservation;
   mem_tracker_ = mem_tracker;
   resource_profile_ = resource_profile;
   debug_options_ = debug_options;
@@ -60,8 +61,8 @@ Status ReservationManager::ClaimBufferReservation(RuntimeState* state) {
   }
 
   RETURN_IF_ERROR(buffer_pool->RegisterClient(name_, state->query_state()->file_group(),
-      state->instance_buffer_reservation(), mem_tracker_,
-      resource_profile_.max_reservation, runtime_profile_, &buffer_pool_client_));
+      parent_reservation_, mem_tracker_, resource_profile_.max_reservation,
+      runtime_profile_, &buffer_pool_client_));
   VLOG_FILE << name_ << " claiming reservation " << resource_profile_.min_reservation;
   state->query_state()->initial_reservations()->Claim(
       &buffer_pool_client_, resource_profile_.min_reservation);
diff --git a/be/src/runtime/reservation-manager.h b/be/src/runtime/reservation-manager.h
index 0545548..10563df 100644
--- a/be/src/runtime/reservation-manager.h
+++ b/be/src/runtime/reservation-manager.h
@@ -40,7 +40,8 @@ class ReservationManager {
   /// and error messages, 'debug_options' is used for the SET_DENY_RESERVATION_PROBABILITY
   /// action. Does not take ownership of 'runtime_profile' and 'mem_tracker'. Must be
   /// called before ClaimBufferReservation().
-  void Init(std::string name, RuntimeProfile* runtime_profile, MemTracker* mem_tracker,
+  void Init(std::string name, RuntimeProfile* runtime_profile,
+      ReservationTracker* parent_reservation, MemTracker* mem_tracker,
       const TBackendResourceProfile& resource_profile,
       const TDebugOptions& debug_options);
   void Close(RuntimeState* state);
@@ -72,6 +73,8 @@ class ReservationManager {
 
   MemTracker* mem_tracker_;
 
+  ReservationTracker* parent_reservation_;
+
   /// Buffer pool client for this node. Initialized with the node's minimum reservation
   /// in ClaimBufferReservation(). After initialization, the client must hold onto at
   /// least the minimum reservation so that it can be returned to the initial