You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/25 15:45:21 UTC
[impala] 09/20: IMPALA-7256: Aggregator mem usage isn't reflected in summary
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8304591be3b056caa6acef54fa4ac629152e493e
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Jul 19 02:14:15 2018 +0000
IMPALA-7256: Aggregator mem usage isn't reflected in summary
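This patch fixes a bug where memory used by an Aggregator wasn't being
reflected in the exec summary for the corresponding aggregation node.
It does so by making the Aggregator's MemTracker a child of the node's
MemTracker (instead of the fragment instance's MemTracker), and by
giving the GroupingAggregator its own ReservationTracker, created as a
child of the instance's buffer reservation and associated with the
node's MemTracker, which is then passed to its ReservationManager.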
Testing:
- Manually ran a query and checked that all memory used by the
Aggregator is accounted for in the exec summary.
Change-Id: Iba6ef207bed47810fc742aec3481db5f313cf97f
Reviewed-on: http://gerrit.cloudera.org:8080/10989
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/aggregator.cc | 3 ++-
be/src/exec/aggregator.h | 1 +
be/src/exec/exec-node.cc | 3 ++-
be/src/exec/grouping-aggregator.cc | 11 ++++++++---
be/src/exec/grouping-aggregator.h | 1 +
be/src/runtime/reservation-manager.cc | 9 +++++----
be/src/runtime/reservation-manager.h | 5 ++++-
7 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 87abc52..9e79bf4 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -46,6 +46,7 @@ const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, const std::string& name)
: id_(exec_node->id()),
+ exec_node_(exec_node),
pool_(pool),
intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
@@ -82,7 +83,7 @@ Status Aggregator::Init(const TPlanNode& tnode, RuntimeState* state) {
Status Aggregator::Prepare(RuntimeState* state) {
mem_tracker_.reset(new MemTracker(
- runtime_profile_, -1, runtime_profile_->name(), state->instance_mem_tracker()));
+ runtime_profile_, -1, runtime_profile_->name(), exec_node_->mem_tracker()));
expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index f415606..1ea3c47 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -93,6 +93,7 @@ class Aggregator {
protected:
/// The id of the ExecNode this Aggregator corresponds to.
int id_;
+ ExecNode* exec_node_;
ObjectPool* pool_;
/// Account for peak memory used by this aggregator.
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 5dca184..e187179 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -129,7 +129,8 @@ Status ExecNode::Prepare(RuntimeState* state) {
}
reservation_manager_.Init(
Substitute("$0 id=$1 ptr=$2", PrintThriftEnum(type_), id_, this), runtime_profile_,
- mem_tracker_.get(), resource_profile_, debug_options_);
+ state->instance_buffer_reservation(), mem_tracker_.get(), resource_profile_,
+ debug_options_);
return Status::OK();
}
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 4f3e5cf..092ecfd 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -175,9 +175,13 @@ Status GroupingAggregator::Prepare(RuntimeState* state) {
MAX_PARTITION_DEPTH, 1, expr_perm_pool_.get(), expr_results_pool_.get(),
expr_results_pool_.get(), &ht_ctx_));
- reservation_manager_.Init(
- Substitute("GroupingAggregator id=$0 ptr=$1", id_, this), runtime_profile_,
- mem_tracker_.get(), resource_profile_, debug_options_);
+ reservation_tracker_.reset(new ReservationTracker);
+ reservation_tracker_->InitChildTracker(runtime_profile_,
+ state->instance_buffer_reservation(), exec_node_->mem_tracker(),
+ numeric_limits<int64_t>::max());
+ reservation_manager_.Init(Substitute("GroupingAggregator id=$0 ptr=$1", id_, this),
+ runtime_profile_, reservation_tracker_.get(), mem_tracker_.get(),
+ resource_profile_, debug_options_);
return Status::OK();
}
@@ -400,6 +404,7 @@ void GroupingAggregator::Close(RuntimeState* state) {
ScalarExpr::Close(build_exprs_);
reservation_manager_.Close(state);
+ if (reservation_tracker_ != nullptr) reservation_tracker_->Close();
// Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed.
Aggregator::Close(state);
}
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index b766a1e..3d64711 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -216,6 +216,7 @@ class GroupingAggregator : public Aggregator {
/// Resource information sent from the frontend.
const TBackendResourceProfile resource_profile_;
+ std::unique_ptr<ReservationTracker> reservation_tracker_;
ReservationManager reservation_manager_;
BufferPool::ClientHandle* buffer_pool_client();
diff --git a/be/src/runtime/reservation-manager.cc b/be/src/runtime/reservation-manager.cc
index f16b48f..1712e09 100644
--- a/be/src/runtime/reservation-manager.cc
+++ b/be/src/runtime/reservation-manager.cc
@@ -29,10 +29,11 @@ using strings::Substitute;
namespace impala {
void ReservationManager::Init(string name, RuntimeProfile* runtime_profile,
- MemTracker* mem_tracker, const TBackendResourceProfile& resource_profile,
- const TDebugOptions& debug_options) {
+ ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+ const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options) {
name_ = name;
runtime_profile_ = runtime_profile;
+ parent_reservation_ = parent_reservation;
mem_tracker_ = mem_tracker;
resource_profile_ = resource_profile;
debug_options_ = debug_options;
@@ -60,8 +61,8 @@ Status ReservationManager::ClaimBufferReservation(RuntimeState* state) {
}
RETURN_IF_ERROR(buffer_pool->RegisterClient(name_, state->query_state()->file_group(),
- state->instance_buffer_reservation(), mem_tracker_,
- resource_profile_.max_reservation, runtime_profile_, &buffer_pool_client_));
+ parent_reservation_, mem_tracker_, resource_profile_.max_reservation,
+ runtime_profile_, &buffer_pool_client_));
VLOG_FILE << name_ << " claiming reservation " << resource_profile_.min_reservation;
state->query_state()->initial_reservations()->Claim(
&buffer_pool_client_, resource_profile_.min_reservation);
diff --git a/be/src/runtime/reservation-manager.h b/be/src/runtime/reservation-manager.h
index 0545548..10563df 100644
--- a/be/src/runtime/reservation-manager.h
+++ b/be/src/runtime/reservation-manager.h
@@ -40,7 +40,8 @@ class ReservationManager {
/// and error messages, 'debug_options' is used for the SET_DENY_RESERVATION_PROBABILITY
/// action. Does not take ownership of 'runtime_profile' and 'mem_tracker'. Must be
/// called before ClaimBufferReservation().
- void Init(std::string name, RuntimeProfile* runtime_profile, MemTracker* mem_tracker,
+ void Init(std::string name, RuntimeProfile* runtime_profile,
+ ReservationTracker* parent_reservation, MemTracker* mem_tracker,
const TBackendResourceProfile& resource_profile,
const TDebugOptions& debug_options);
void Close(RuntimeState* state);
@@ -72,6 +73,8 @@ class ReservationManager {
MemTracker* mem_tracker_;
+ ReservationTracker* parent_reservation_;
+
/// Buffer pool client for this node. Initialized with the node's minimum reservation
/// in ClaimBufferReservation(). After initialization, the client must hold onto at
/// least the minimum reservation so that it can be returned to the initial