You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/20 20:20:14 UTC
[4/5] incubator-impala git commit: IMPALA-5895: clean up runtime
profile lifecycle
IMPALA-5895: clean up runtime profile lifecycle
Require callers to explicitly stop counter updating instead of doing it
in the destructor. This replaces ad-hoc logic to stop individual
counters.
Track which counters need to be stopped in separate lists instead of
stopping everything.
Force all RuntimeProfiles to use ObjectPools in a uniform way - the
profile, its counters and its children all are allocated from the
same pool. This is done via a new Create() method.
Consolidate 'time_series_counter_map_lock_' and 'counter_map_lock_'
to reduce complexity of the locking scheme. I didn't see any benefit
to sharding the locks in this way - there are only two time series
counters per fragment instance, which a small fraction of the
total number of counters.
Change-Id: I45c39ac36c8e3c277213d32f5ae5f14be6b7f0df
Reviewed-on: http://gerrit.cloudera.org:8080/8069
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7866eec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7866eec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7866eec5
Branch: refs/heads/master
Commit: 7866eec5bdcbf9194a4aad2c87c354cbaad7b802
Parents: 741b052
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Sep 5 15:51:49 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 20 08:48:38 2017 +0000
----------------------------------------------------------------------
be/src/benchmarks/hash-benchmark.cc | 8 +-
be/src/codegen/llvm-codegen.cc | 32 +--
be/src/codegen/llvm-codegen.h | 4 +-
be/src/exec/data-sink.cc | 2 +-
be/src/exec/data-source-scan-node.cc | 4 +-
be/src/exec/exec-node.cc | 15 +-
be/src/exec/exec-node.h | 7 +-
be/src/exec/hash-table-test.cc | 2 +-
be/src/exec/hbase-scan-node.cc | 2 +-
be/src/exec/hdfs-scan-node-base.cc | 26 +--
be/src/exec/hdfs-scan-node-base.h | 11 +
be/src/exec/kudu-scan-node-base.cc | 15 --
be/src/exec/kudu-scan-node-base.h | 13 --
be/src/exec/kudu-scan-node-mt.cc | 4 +-
be/src/exec/scan-node.cc | 8 +
be/src/exec/scan-node.h | 18 +-
be/src/experiments/data-provider-test.cc | 6 +-
be/src/experiments/tuple-splitter-test.cc | 8 +-
be/src/runtime/buffered-tuple-stream-test.cc | 2 +-
.../runtime/bufferpool/buffer-allocator-test.cc | 2 +-
be/src/runtime/bufferpool/buffer-pool-test.cc | 2 +-
.../bufferpool/reservation-tracker-test.cc | 2 +-
be/src/runtime/bufferpool/suballocator-test.cc | 8 +-
be/src/runtime/coordinator-backend-state.cc | 8 +-
be/src/runtime/coordinator.cc | 4 +-
be/src/runtime/coordinator.h | 6 +-
be/src/runtime/data-stream-recvr.cc | 32 ++-
be/src/runtime/data-stream-recvr.h | 7 +-
be/src/runtime/data-stream-test.cc | 7 +-
be/src/runtime/fragment-instance-state.cc | 12 +-
be/src/runtime/query-state.cc | 2 +-
be/src/runtime/runtime-state.cc | 11 +-
be/src/runtime/runtime-state.h | 4 +-
be/src/runtime/tmp-file-mgr-test.cc | 2 +-
be/src/service/client-request-state.cc | 95 ++++----
be/src/service/client-request-state.h | 12 +-
be/src/service/impala-server.cc | 12 +-
be/src/util/dummy-runtime-profile.h | 6 +-
be/src/util/periodic-counter-updater.cc | 15 +-
be/src/util/periodic-counter-updater.h | 9 +-
be/src/util/runtime-profile-test.cc | 220 +++++++++----------
be/src/util/runtime-profile.cc | 131 +++++++----
be/src/util/runtime-profile.h | 94 +++++---
43 files changed, 460 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/benchmarks/hash-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index dadad25..b183915 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -440,10 +440,10 @@ int main(int argc, char **argv) {
MemTracker tracker;
MemPool mem_pool(&tracker);
- RuntimeProfile int_profile(state->obj_pool(), "IntGen");
- RuntimeProfile mixed_profile(state->obj_pool(), "MixedGen");
- DataProvider int_provider(&mem_pool, &int_profile);
- DataProvider mixed_provider(&mem_pool, &mixed_profile);
+ RuntimeProfile* int_profile = RuntimeProfile::Create(state->obj_pool(), "IntGen");
+ RuntimeProfile* mixed_profile = RuntimeProfile::Create(state->obj_pool(), "MixedGen");
+ DataProvider int_provider(&mem_pool, int_profile);
+ DataProvider mixed_provider(&mem_pool, mixed_profile);
scoped_ptr<LlvmCodeGen> codegen;
status = LlvmCodeGen::CreateImpalaCodegen(state, NULL, "test", &codegen);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 6f8b156..5503969 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -170,8 +170,8 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const string& id)
: state_(state),
id_(id),
- profile_(pool, "CodeGen"),
- mem_tracker_(pool->Add(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker))),
+ profile_(RuntimeProfile::Create(pool, "CodeGen")),
+ mem_tracker_(pool->Add(new MemTracker(profile_, -1, "CodeGen", parent_mem_tracker))),
optimizations_enabled_(false),
is_corrupt_(false),
is_compiled_(false),
@@ -181,21 +181,21 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
loaded_functions_(IRFunction::FN_END, NULL) {
DCHECK(llvm_initialized_) << "Must call LlvmCodeGen::InitializeLlvm first.";
- load_module_timer_ = ADD_TIMER(&profile_, "LoadTime");
- prepare_module_timer_ = ADD_TIMER(&profile_, "PrepareTime");
- module_bitcode_size_ = ADD_COUNTER(&profile_, "ModuleBitcodeSize", TUnit::BYTES);
- codegen_timer_ = ADD_TIMER(&profile_, "CodegenTime");
- optimization_timer_ = ADD_TIMER(&profile_, "OptimizationTime");
- compile_timer_ = ADD_TIMER(&profile_, "CompileTime");
- num_functions_ = ADD_COUNTER(&profile_, "NumFunctions", TUnit::UNIT);
- num_instructions_ = ADD_COUNTER(&profile_, "NumInstructions", TUnit::UNIT);
+ load_module_timer_ = ADD_TIMER(profile_, "LoadTime");
+ prepare_module_timer_ = ADD_TIMER(profile_, "PrepareTime");
+ module_bitcode_size_ = ADD_COUNTER(profile_, "ModuleBitcodeSize", TUnit::BYTES);
+ codegen_timer_ = ADD_TIMER(profile_, "CodegenTime");
+ optimization_timer_ = ADD_TIMER(profile_, "OptimizationTime");
+ compile_timer_ = ADD_TIMER(profile_, "CompileTime");
+ num_functions_ = ADD_COUNTER(profile_, "NumFunctions", TUnit::UNIT);
+ num_instructions_ = ADD_COUNTER(profile_, "NumInstructions", TUnit::UNIT);
}
Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const string& file, const string& id,
scoped_ptr<LlvmCodeGen>* codegen) {
codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
- SCOPED_TIMER((*codegen)->profile_.total_time_counter());
+ SCOPED_TIMER((*codegen)->profile_->total_time_counter());
unique_ptr<Module> loaded_module;
RETURN_IF_ERROR((*codegen)->LoadModuleFromFile(file, &loaded_module));
@@ -206,7 +206,7 @@ Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const string& id, scoped_ptr<LlvmCodeGen>* codegen) {
codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
- SCOPED_TIMER((*codegen)->profile_.total_time_counter());
+ SCOPED_TIMER((*codegen)->profile_->total_time_counter());
// Select the appropriate IR version. We cannot use LLVM IR with SSE4.2 instructions on
// a machine without SSE4.2 support.
@@ -276,7 +276,7 @@ Status LlvmCodeGen::LoadModuleFromMemory(unique_ptr<MemoryBuffer> module_ir_buf,
Status LlvmCodeGen::LinkModule(const string& file) {
if (linked_modules_.find(file) != linked_modules_.end()) return Status::OK();
- SCOPED_TIMER(profile_.total_time_counter());
+ SCOPED_TIMER(profile_->total_time_counter());
unique_ptr<Module> new_module;
RETURN_IF_ERROR(LoadModuleFromFile(file, &new_module));
@@ -324,7 +324,7 @@ Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state,
LlvmCodeGen* codegen = codegen_ret->get();
// Parse module for cross compiled functions and types
- SCOPED_TIMER(codegen->profile_.total_time_counter());
+ SCOPED_TIMER(codegen->profile_->total_time_counter());
SCOPED_TIMER(codegen->prepare_module_timer_);
// Get type for StringValue
@@ -620,7 +620,7 @@ Status LlvmCodeGen::MaterializeFunctionHelper(Function *fn) {
}
Status LlvmCodeGen::MaterializeFunction(Function *fn) {
- SCOPED_TIMER(profile_.total_time_counter());
+ SCOPED_TIMER(profile_->total_time_counter());
SCOPED_TIMER(prepare_module_timer_);
return MaterializeFunctionHelper(fn);
}
@@ -1037,7 +1037,7 @@ Status LlvmCodeGen::FinalizeModule() {
}
if (is_corrupt_) return Status("Module is corrupt.");
- SCOPED_TIMER(profile_.total_time_counter());
+ SCOPED_TIMER(profile_->total_time_counter());
// Don't waste time optimizing module if there are no functions to JIT. This can happen
// if the codegen object is created but no functions are successfully codegen'd.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 8dce330..8aa9f2b 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -159,7 +159,7 @@ class LlvmCodeGen {
/// any other API methods after calling close.
void Close();
- RuntimeProfile* runtime_profile() { return &profile_; }
+ RuntimeProfile* runtime_profile() { return profile_; }
RuntimeProfile::Counter* codegen_timer() { return codegen_timer_; }
/// Turns on/off optimization passes
@@ -669,7 +669,7 @@ class LlvmCodeGen {
std::string id_;
/// Codegen counters
- RuntimeProfile profile_;
+ RuntimeProfile* const profile_;
/// MemTracker used for tracking memory consumed by codegen. Connected to a parent
/// MemTracker if one was provided during initialization. Owned by the ObjectPool
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index fe23694..c173ed5 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -175,7 +175,7 @@ string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
DCHECK(parent_mem_tracker != NULL);
- profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
+ profile_ = RuntimeProfile::Create(state->obj_pool(), GetName());
const string& name = GetName();
mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
expr_mem_tracker_.reset(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 01e0dbe..78ba492 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -365,15 +365,13 @@ Status DataSourceScanNode::Reset(RuntimeState* state) {
void DataSourceScanNode::Close(RuntimeState* state) {
if (is_closed()) return;
SCOPED_TIMER(runtime_profile_->total_time_counter());
- PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
- PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
input_batch_.reset();
TCloseParams params;
params.__set_scan_handle(scan_handle_);
TCloseResult result;
Status status = data_source_executor_->Close(params, &result);
if (!status.ok()) state->LogError(status.msg());
- ExecNode::Close(state);
+ ScanNode::Close(state);
}
void DataSourceScanNode::DebugString(int indentation_level, stringstream* out) const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 94e9ed1..b656d2b 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -130,12 +130,14 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
debug_action_(TDebugAction::WAIT),
limit_(tnode.limit),
num_rows_returned_(0),
+ runtime_profile_(RuntimeProfile::Create(pool_,
+ Substitute("$0 (id=$1)", PrintPlanNodeType(tnode.node_type), id_))),
rows_returned_counter_(NULL),
rows_returned_rate_(NULL),
containing_subplan_(NULL),
disable_codegen_(tnode.disable_codegen),
is_closed_(false) {
- InitRuntimeProfile(PrintPlanNodeType(tnode.node_type));
+ runtime_profile_->set_metadata(id_);
}
ExecNode::~ExecNode() {
@@ -149,8 +151,8 @@ Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) {
Status ExecNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state));
- DCHECK(runtime_profile_.get() != NULL);
- mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(),
+ DCHECK(runtime_profile_ != NULL);
+ mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
state->instance_mem_tracker()));
expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
@@ -462,13 +464,6 @@ void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) {
CollectNodes(TPlanNodeType::KUDU_SCAN_NODE, nodes);
}
-void ExecNode::InitRuntimeProfile(const string& name) {
- stringstream ss;
- ss << name << " (id=" << id_ << ")";
- runtime_profile_.reset(new RuntimeProfile(pool_, ss.str()));
- runtime_profile_->set_metadata(id_);
-}
-
Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) {
DCHECK_EQ(debug_phase_, phase);
if (debug_action_ == TDebugAction::FAIL) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 2f3f714..7cba6ac 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -208,7 +208,7 @@ class ExecNode {
int64_t limit() const { return limit_; }
bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; }
- RuntimeProfile* runtime_profile() { return runtime_profile_.get(); }
+ RuntimeProfile* runtime_profile() { return runtime_profile_; }
MemTracker* mem_tracker() { return mem_tracker_.get(); }
MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
MemPool* expr_mem_pool() { return expr_mem_pool_.get(); }
@@ -310,7 +310,8 @@ class ExecNode {
int64_t limit_; // -1: no limit
int64_t num_rows_returned_;
- boost::scoped_ptr<RuntimeProfile> runtime_profile_;
+ /// Runtime profile for this node. Owned by the QueryState's ObjectPool.
+ RuntimeProfile* const runtime_profile_;
RuntimeProfile::Counter* rows_returned_counter_;
RuntimeProfile::Counter* rows_returned_rate_;
@@ -354,8 +355,6 @@ class ExecNode {
virtual bool IsScanNode() const { return false; }
- void InitRuntimeProfile(const std::string& name);
-
/// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
/// 'phase' must not be INVALID.
Status ExecDebugAction(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 816e6cf..0b99cbd 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -233,7 +233,7 @@ class HashTableTest : public testing::Test {
int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100,
int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) {
BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
- RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht"));
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "ht");
// Set up memory tracking for the hash table.
MemTracker* client_tracker =
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index e783731..a74d4b3 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -279,7 +279,7 @@ void HBaseScanNode::Close(RuntimeState* state) {
JNIEnv* env = getJNIEnv();
hbase_scanner_->Close(env);
}
- ExecNode::Close(state);
+ ScanNode::Close(state);
}
void HBaseScanNode::DebugString(int indentation_level, stringstream* out) const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index ca71201..b5169a8 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -97,7 +97,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
thrift_dict_filter_conjuncts_map_(
tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
&tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
- disks_accessed_bitmap_(TUnit::UNIT, 0) {
+ disks_accessed_bitmap_(TUnit::UNIT, 0),
+ active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
}
HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -142,8 +143,8 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
- RuntimeProfile* profile = state->obj_pool()->Add(
- new RuntimeProfile(state->obj_pool(), filter_profile_title));
+ RuntimeProfile* profile =
+ RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
runtime_profile_->AddChild(profile);
filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
target.is_bound_by_partition_columns));
@@ -441,12 +442,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
"MaxCompressedTextFileLength", TUnit::BYTES);
- for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
- hdfs_read_thread_concurrency_bucket_.push_back(
- pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
- }
- runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
- &hdfs_read_thread_concurrency_bucket_);
+ hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
+ &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 1);
counters_running_ = true;
@@ -851,18 +848,13 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
if (!counters_running_) return;
counters_running_ = false;
- PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
- PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
- PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
- PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
- PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
- true);
+ runtime_profile_->StopPeriodicCounters();
// Output hdfs read thread concurrency into info string
stringstream ss;
- for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
+ for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) {
ss << i << ":" << setprecision(4)
- << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
+ << (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% ";
}
runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e1c431f..e33de5a 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -469,6 +469,17 @@ class HdfsScanNodeBase : public ScanNode {
/// Total number of file handle opens where the file handle was not in the cache
RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
+ /// The number of active hdfs reading threads reading for this node.
+ RuntimeProfile::Counter active_hdfs_read_thread_counter_;
+
+ /// Average number of active hdfs reading threads
+ /// This should be created in Open() and stopped when all the scanner threads are done.
+ RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;
+
+ /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
+ /// taken where there are i concurrent hdfs read thread running. Created in Open().
+ std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
+
/// Pool for allocating some amounts of memory that is shared between scanners.
/// e.g. partition key tuple and their string buffers
boost::scoped_ptr<MemPool> scan_node_pool_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index d587660..526374c 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -52,7 +52,6 @@ KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
: ScanNode(pool, tnode, descs),
tuple_id_(tnode.kudu_scan_node.tuple_id),
client_(nullptr),
- counters_running_(false),
next_scan_token_idx_(0) {
DCHECK(KuduIsAvailable());
}
@@ -69,7 +68,6 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
- counters_running_ = true;
DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
@@ -108,12 +106,6 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
return Status::OK();
}
-void KuduScanNodeBase::Close(RuntimeState* state) {
- if (is_closed()) return;
- StopAndFinalizeCounters();
- ExecNode::Close(state);
-}
-
void KuduScanNodeBase::DebugString(int indentation_level, stringstream* out) const {
string indent(indentation_level * 2, ' ');
*out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
@@ -129,12 +121,5 @@ const string* KuduScanNodeBase::GetNextScanToken() {
return token;
}
-void KuduScanNodeBase::StopAndFinalizeCounters() {
- if (!counters_running_) return;
- counters_running_ = false;
-
- PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
- PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-}
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 49af13c..08289e1 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -42,7 +42,6 @@ class KuduScanNodeBase : public ScanNode {
virtual Status Prepare(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
- virtual void Close(RuntimeState* state);
protected:
virtual void DebugString(int indentation_level, std::stringstream* out) const;
@@ -59,14 +58,6 @@ class KuduScanNodeBase : public ScanNode {
RuntimeState* runtime_state_;
- /// Stops periodic counters and aggregates counter values for the entire scan node.
- /// This should be called as soon as the scan node is complete to get the most accurate
- /// counter values.
- /// This can be called multiple times, subsequent calls will be ignored.
- /// This must be called on Close() to unregister counters.
- /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
- void StopAndFinalizeCounters();
-
private:
friend class KuduScanner;
@@ -83,10 +74,6 @@ class KuduScanNodeBase : public ScanNode {
/// Kudu table reference. Shared between scanner threads for KuduScanNode.
kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
- /// If true, counters are actively running and need to be reported in the runtime
- /// profile.
- bool counters_running_;
-
/// Set of scan tokens to be deserialized into Kudu scanners.
std::vector<std::string> scan_tokens_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 8723daa..2cb7619 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -62,7 +62,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
if (scan_token_ == nullptr) {
scan_token_ = GetNextScanToken();
if (scan_token_ == nullptr) {
- StopAndFinalizeCounters();
+ runtime_profile_->StopPeriodicCounters();
scanner_->Close();
scanner_.reset();
*eos = true;
@@ -85,7 +85,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
num_rows_returned_ -= num_rows_over;
scan_token_ = nullptr;
- StopAndFinalizeCounters();
+ runtime_profile_->StopPeriodicCounters();
scanner_->Close();
scanner_.reset();
*eos = true;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 0df0c3f..d09bb6b 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -68,4 +68,12 @@ Status ScanNode::Prepare(RuntimeState* state) {
return Status::OK();
}
+void ScanNode::Close(RuntimeState* state) {
+ if (is_closed()) return;
+ // ScanNode::Prepare() started periodic counters including 'total_throughput_counter_'
+ // and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters.
+ runtime_profile_->StopPeriodicCounters();
+ ExecNode::Close(state);
+}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 0f73c2b..4b11361 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -82,11 +82,14 @@ class ScanNode : public ExecNode {
ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
scan_range_params_(NULL),
- active_scanner_thread_counter_(TUnit::UNIT, 0),
- active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
+ active_scanner_thread_counter_(TUnit::UNIT, 0) {}
virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+ /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can
+ /// start periodic counters and rely on this function stopping them.
+ virtual void Close(RuntimeState* state);
+
/// This should be called before Prepare(), and the argument must be not destroyed until
/// after Prepare().
void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) {
@@ -172,18 +175,7 @@ class ScanNode : public ExecNode {
/// This should be created in Open and stopped when all the scanner threads are done.
RuntimeProfile::Counter* average_scanner_thread_concurrency_;
- /// The number of active hdfs reading threads reading for this node.
- RuntimeProfile::Counter active_hdfs_read_thread_counter_;
-
- /// Average number of active hdfs reading threads
- /// This should be created in Open and stopped when all the scanner threads are done.
- RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_;
-
RuntimeProfile::Counter* num_scanner_threads_started_counter_;
-
- /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
- /// taken where there are i concurrent hdfs read thread running
- std::vector<RuntimeProfile::Counter*> hdfs_read_thread_concurrency_bucket_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/data-provider-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/data-provider-test.cc b/be/src/experiments/data-provider-test.cc
index 9f9e0cb..3c5a92f 100644
--- a/be/src/experiments/data-provider-test.cc
+++ b/be/src/experiments/data-provider-test.cc
@@ -55,11 +55,11 @@ int main(int argc, char **argv) {
cols.push_back(DataProvider::ColDesc::Create<StringValue>(min_str, max_str));
ObjectPool obj_pool;
- RuntimeProfile profile(&obj_pool, "DataGenTest");
+ RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "DataGenTest");
MemTracker tracker;
MemPool pool(&tracker);
- DataProvider provider(&pool, &profile);
+ DataProvider provider(&pool, profile);
provider.Reset(20, 2, cols);
int rows;
void* data;
@@ -70,7 +70,7 @@ int main(int argc, char **argv) {
provider.Print(&cout, reinterpret_cast<char*>(data), rows);
}
- profile.PrettyPrint(&cout);
+ profile->PrettyPrint(&cout);
cout << endl << "Done." << endl;
return 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/tuple-splitter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/tuple-splitter-test.cc b/be/src/experiments/tuple-splitter-test.cc
index 87a52bd..7d68f8e 100644
--- a/be/src/experiments/tuple-splitter-test.cc
+++ b/be/src/experiments/tuple-splitter-test.cc
@@ -380,14 +380,14 @@ int main(int argc, char **argv) {
MemTracker tracker;
MemPool pool(&tracker);
ObjectPool obj_pool;
- RuntimeProfile profile(&obj_pool, "PartitioningTest");
+ RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "PartitioningTest");
- DataProvider provider(&pool, &profile);
+ DataProvider provider(&pool, profile);
provider.Reset(50*1024*1024, 1024, cols);
//provider.Reset(100*1024, 1024, cols);
//provider.Reset(100, 1024, cols);
- DataPartitioner partitioner(&pool, &profile, provider.row_size(), 0);
+ DataPartitioner partitioner(&pool, profile, provider.row_size(), 0);
cout << "Begin processing: " << provider.total_rows() << endl;
int rows;
@@ -437,7 +437,7 @@ int main(int argc, char **argv) {
cout << "Largest Partition: " << largest_partition << endl;;
cout << endl;
- profile.PrettyPrint(&cout);
+ profile->PrettyPrint(&cout);
LOG(ERROR) << "Done.";
return 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 0b89498..08ce7c3 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -132,7 +132,7 @@ class SimpleTupleStreamTest : public testing::Test {
ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
query_state_ = runtime_state_->query_state();
- RuntimeProfile* client_profile = pool_.Add(new RuntimeProfile(&pool_, "client"));
+ RuntimeProfile* client_profile = RuntimeProfile::Create(&pool_, "client");
MemTracker* client_tracker =
pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
ASSERT_OK(test_env_->exec_env()->buffer_pool()->RegisterClient("",
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 6427648..21a9c08 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -42,7 +42,7 @@ class BufferAllocatorTest : public ::testing::Test {
dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0));
dummy_reservation_.InitRootTracker(nullptr, 0);
ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
- obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
+ RuntimeProfile::Create(&obj_pool_, ""), &dummy_client_));
}
virtual void TearDown() {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 06ff827..720dc13 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -139,7 +139,7 @@ class BufferPoolTest : public ::testing::Test {
}
RuntimeProfile* NewProfile() {
- return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+ return RuntimeProfile::Create(&obj_pool_, "test profile");
}
/// Create a new file group with the default configs.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 0d57488..3fc0e0b 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -44,7 +44,7 @@ class ReservationTrackerTest : public ::testing::Test {
protected:
RuntimeProfile* NewProfile() {
- return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+ return RuntimeProfile::Create(&obj_pool_, "test profile");
}
ObjectPool obj_pool_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 32acfaf..6cd53fb 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -45,7 +45,7 @@ class SuballocatorTest : public ::testing::Test {
public:
virtual void SetUp() override {
RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_);
- profile_.reset(new RuntimeProfile(&obj_pool_, "test profile"));
+ profile_ = RuntimeProfile::Create(&obj_pool_, "test profile");
}
virtual void TearDown() override {
@@ -55,7 +55,6 @@ class SuballocatorTest : public ::testing::Test {
clients_.clear();
buffer_pool_.reset();
global_reservation_.Close();
- profile_.reset();
obj_pool_.Clear();
}
@@ -78,7 +77,7 @@ class SuballocatorTest : public ::testing::Test {
clients_.push_back(make_unique<BufferPool::ClientHandle>());
*client = clients_.back().get();
ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL,
- numeric_limits<int64_t>::max(), profile(), *client));
+ numeric_limits<int64_t>::max(), profile_, *client));
}
/// Assert that the memory for all of the suballocations is writable and disjoint by
@@ -97,7 +96,6 @@ class SuballocatorTest : public ::testing::Test {
EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString();
}
- RuntimeProfile* profile() { return profile_.get(); }
BufferPool* buffer_pool() { return buffer_pool_.get(); }
/// Pool for objects with per-test lifetime. Cleared after every test.
@@ -114,7 +112,7 @@ class SuballocatorTest : public ::testing::Test {
vector<unique_ptr<BufferPool::ClientHandle>> clients_;
/// Global profile - recreated for every test.
- scoped_ptr<RuntimeProfile> profile_;
+ RuntimeProfile* profile_;
/// Per-test random number generator. Seeded before every test.
mt19937 rng_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 1b7fd20..0ee4bd7 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -433,7 +433,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
total_ranges_complete_(0) {
const string& profile_name = Substitute("Instance $0 (host=$1)",
PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
- profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
+ profile_ = RuntimeProfile::Create(obj_pool, profile_name);
fragment_stats->root_profile()->AddChild(profile_);
// add total split size to fragment_stats->bytes_assigned()
@@ -514,10 +514,8 @@ void Coordinator::BackendState::InstanceStats::Update(
Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
- : avg_profile_(
- obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, true))),
- root_profile_(
- obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))),
+ : avg_profile_(RuntimeProfile::Create(obj_pool, avg_profile_name, true)),
+ root_profile_(RuntimeProfile::Create(obj_pool, root_profile_name)),
num_instances_(num_instances) {
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index c8df1f5..e022a21 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -123,8 +123,8 @@ Status Coordinator::Exec() {
query_ctx_.__set_desc_tbl(request.desc_tbl);
query_ctx_.__set_request_pool(schedule_.request_pool());
- query_profile_.reset(
- new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id())));
+ query_profile_ =
+ RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e67ef13..5802c83 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -150,7 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Get cumulative profile aggregated over all fragments of the query.
/// This is a snapshot of the current state of execution and will change in
/// the future if not all fragments have finished execution.
- RuntimeProfile* query_profile() const { return query_profile_.get(); }
+ RuntimeProfile* query_profile() const { return query_profile_; }
const TUniqueId& query_id() const;
@@ -278,8 +278,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
ExecSummary exec_summary_;
- /// Aggregate counters for the entire query.
- boost::scoped_ptr<RuntimeProfile> query_profile_;
+ /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
+ RuntimeProfile* query_profile_ = nullptr;
/// Protects all fields below. This is held while making RPCs, so this lock should
/// only be acquired if the acquiring thread is prepared to wait for a significant
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 35076f5..d8150eb 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -37,7 +37,7 @@ namespace impala {
// rows from all senders are placed in the same queue.
class DataStreamRecvr::SenderQueue {
public:
- SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile);
+ SenderQueue(DataStreamRecvr* parent_recvr, int num_senders);
// Return the next batch from this sender queue. Sets the returned batch in cur_batch_.
// A returned batch that is not filled to capacity does *not* indicate
@@ -102,8 +102,7 @@ class DataStreamRecvr::SenderQueue {
bool received_first_batch_;
};
-DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders,
- RuntimeProfile* profile)
+DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders)
: recvr_(parent_recvr),
is_cancelled_(false),
num_remaining_senders_(num_senders),
@@ -242,8 +241,6 @@ void DataStreamRecvr::SenderQueue::Cancel() {
// notice that the stream is cancelled and handle it.
data_arrival_cv_.notify_all();
data_removal__cv_.notify_all();
- PeriodicCounterUpdater::StopTimeSeriesCounter(
- recvr_->bytes_received_time_series_counter_);
}
void DataStreamRecvr::SenderQueue::Close() {
@@ -286,7 +283,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit,
- RuntimeProfile* profile)
+ RuntimeProfile* parent_profile)
: mgr_(stream_mgr),
fragment_instance_id_(fragment_instance_id),
dest_node_id_(dest_node_id),
@@ -294,30 +291,28 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
row_desc_(row_desc),
is_merging_(is_merging),
num_buffered_bytes_(0),
- profile_(profile) {
+ profile_(parent_profile->CreateChild("DataStreamReceiver")) {
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
sender_queues_.reserve(num_queues);
int num_sender_per_queue = is_merging ? 1 : num_senders;
for (int i = 0; i < num_queues; ++i) {
SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this,
- num_sender_per_queue, profile));
+ num_sender_per_queue));
sender_queues_.push_back(queue);
}
- RuntimeProfile* child_profile = profile_->CreateChild("DataStreamReceiver");
- mem_tracker_.reset(
- new MemTracker(child_profile, -1, "DataStreamRecvr", parent_tracker));
+ mem_tracker_.reset(new MemTracker(profile_, -1, "DataStreamRecvr", parent_tracker));
// Initialize the counters
- bytes_received_counter_ = ADD_COUNTER(child_profile, "BytesReceived", TUnit::BYTES);
+ bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
bytes_received_time_series_counter_ =
- ADD_TIME_SERIES_COUNTER(child_profile, "BytesReceived", bytes_received_counter_);
- deserialize_row_batch_timer_ = ADD_TIMER(child_profile, "DeserializeRowBatchTimer");
- buffer_full_wall_timer_ = ADD_TIMER(child_profile, "SendersBlockedTimer");
- buffer_full_total_timer_ = ADD_TIMER(child_profile, "SendersBlockedTotalTimer(*)");
- data_arrival_timer_ = child_profile->inactive_timer();
- first_batch_wait_total_timer_ = ADD_TIMER(child_profile, "FirstBatchArrivalWaitTime");
+ ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_);
+ deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer");
+ buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
+ buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
+ data_arrival_timer_ = profile_->inactive_timer();
+ first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
}
Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -354,6 +349,7 @@ void DataStreamRecvr::Close() {
}
merger_.reset();
mem_tracker_->Close();
+ profile_->StopPeriodicCounters();
}
DataStreamRecvr::~DataStreamRecvr() {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index fad588d..9545f82 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -105,7 +105,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging,
- int64_t total_buffer_limit, RuntimeProfile* profile);
+ int64_t total_buffer_limit, RuntimeProfile* parent_profile);
/// Add a new batch of rows to the appropriate sender queue, blocking if the queue is
/// full. Called from DataStreamMgr.
@@ -161,8 +161,9 @@ class DataStreamRecvr : public DataStreamRecvrBase {
/// Pool of sender queues.
ObjectPool sender_queue_pool_;
- /// Runtime profile storing the counters below.
- RuntimeProfile* profile_;
+ /// Runtime profile storing the counters below. Child of 'parent_profile' passed into
+ /// constructor.
+ RuntimeProfile* const profile_;
/// Number of bytes received
RuntimeProfile::Counter* bytes_received_counter_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 5ea6756..8e85894 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -337,8 +337,7 @@ class DataStreamTest : public testing::Test {
void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) {
VLOG_QUERY << "start receiver";
- RuntimeProfile* profile =
- obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver"));
+ RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
TUniqueId instance_id;
GetNextInstanceId(&instance_id);
receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
@@ -607,13 +606,13 @@ TEST_F(DataStreamTest, BasicTest) {
// TODO: Make lifecycle requirements more explicit.
TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
- scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
+ RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
// Start just one receiver.
TUniqueId instance_id;
GetNextInstanceId(&instance_id);
shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
- runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(),
+ runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
false);
// Perform tear down, but keep a reference to the receiver so that it is deleted last
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 74f5495..a7d3c86 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -123,8 +123,8 @@ Status FragmentInstanceState::Prepare() {
// total_time_counter() is in the runtime_state_ so start it up now.
SCOPED_TIMER(profile()->total_time_counter());
- timings_profile_ = obj_pool()->Add(
- new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings"));
+ timings_profile_ =
+ RuntimeProfile::Create(obj_pool(), "Fragment Instance Lifecycle Timings");
profile()->AddChild(timings_profile_);
SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
@@ -289,12 +289,8 @@ void FragmentInstanceState::Close() {
// guard against partially-finished Prepare()
if (sink_ != nullptr) sink_->Close(runtime_state_);
- // disconnect mem_usage_sampled_counter_ from the periodic updater before
- // RuntimeState::ReleaseResources(), it references the instance memtracker
- if (mem_usage_sampled_counter_ != nullptr) {
- PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
- mem_usage_sampled_counter_ = nullptr;
- }
+ // Stop updating profile counters in background.
+ profile()->StopPeriodicCounters();
// We need to delete row_batch_ here otherwise we can't delete the instance_mem_tracker_
// in runtime_state_->ReleaseResources().
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4c5eb17..ea24411 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -169,7 +169,7 @@ Status QueryState::InitBufferPoolState() {
// TODO: once there's a mechanism for reporting non-fragment-local profiles,
// should make sure to report this profile so it's not going into a black hole.
- RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy"));
+ RuntimeProfile* dummy_profile = RuntimeProfile::Create(&obj_pool_, "dummy");
// Only create file group if spilling is enabled.
if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
file_group_ = obj_pool_.Add(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8f48439..0565cf5 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -69,7 +69,8 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
utc_timestamp_(new TimestampValue(TimestampValue::Parse(
query_state->query_ctx().utc_timestamp_string))),
exec_env_(exec_env),
- profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
+ profile_(RuntimeProfile::Create(
+ obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
instance_buffer_reservation_(new ReservationTracker) {
Init();
}
@@ -83,7 +84,7 @@ RuntimeState::RuntimeState(
now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))),
utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
exec_env_(exec_env),
- profile_(obj_pool(), "<unnamed>") {
+ profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
if (query_ctx().request_pool.empty()) {
const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
}
@@ -96,7 +97,7 @@ RuntimeState::~RuntimeState() {
}
void RuntimeState::Init() {
- SCOPED_TIMER(profile_.total_time_counter());
+ SCOPED_TIMER(profile_->total_time_counter());
// Register with the thread mgr
resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
@@ -111,7 +112,7 @@ void RuntimeState::Init() {
runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
if (instance_buffer_reservation_ != nullptr) {
- instance_buffer_reservation_->InitChildTracker(&profile_,
+ instance_buffer_reservation_->InitChildTracker(profile_,
query_state_->buffer_reservation(), instance_mem_tracker_.get(),
numeric_limits<int64_t>::max());
}
@@ -127,7 +128,7 @@ Status RuntimeState::CreateCodegen() {
RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this,
instance_mem_tracker_.get(), PrintId(fragment_instance_id()), &codegen_));
codegen_->EnableOptimizations(true);
- profile_.AddChild(codegen_->runtime_profile());
+ profile_->AddChild(codegen_->runtime_profile());
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 49ea9a6..3da9f8c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -149,7 +149,7 @@ class RuntimeState {
PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
/// Returns runtime state profile
- RuntimeProfile* runtime_profile() { return &profile_; }
+ RuntimeProfile* runtime_profile() { return profile_; }
/// Returns the LlvmCodeGen object for this fragment instance.
LlvmCodeGen* codegen() { return codegen_.get(); }
@@ -354,7 +354,7 @@ class RuntimeState {
/// Records summary statistics for the results of inserts into Hdfs partitions.
PartitionStatusMap per_partition_status_;
- RuntimeProfile profile_;
+ RuntimeProfile* const profile_;
/// Total time waiting in storage (across all threads)
RuntimeProfile::Counter* total_storage_wait_timer_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 1a5eb58..5f482ba 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -51,7 +51,7 @@ class TmpFileMgrTest : public ::testing::Test {
public:
virtual void SetUp() {
metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
- profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test"));
+ profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
test_env_.reset(new TestEnv);
ASSERT_OK(test_env_->Init());
cb_counter_ = 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 2a5b379..ef6a69d 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -74,9 +74,10 @@ ClientRequestState::ClientRequestState(
schedule_(NULL),
coord_(NULL),
result_cache_max_size_(-1),
- profile_(&profile_pool_, "Query"), // assign name w/ id after planning
- server_profile_(&profile_pool_, "ImpalaServer"),
- summary_profile_(&profile_pool_, "Summary"),
+ // Profile is assigned name w/ id after planning
+ profile_(RuntimeProfile::Create(&profile_pool_, "Query")),
+ server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer")),
+ summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
is_cancelled_(false),
eos_(false),
query_state_(beeswax::QueryState::CREATED),
@@ -89,36 +90,36 @@ ClientRequestState::ClientRequestState(
parent_server_(server),
start_time_(TimestampValue::LocalTime()) {
#ifndef NDEBUG
- profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
+ profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
"DEBUG build of Impala. Use RELEASE builds to measure query performance.");
#endif
- row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer");
- client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
- query_events_ = summary_profile_.AddEventSequence("Query Timeline");
+ row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer");
+ client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
+ query_events_ = summary_profile_->AddEventSequence("Query Timeline");
query_events_->Start();
- profile_.AddChild(&summary_profile_);
+ profile_->AddChild(summary_profile_);
- profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
- summary_profile_.AddInfoString("Session ID", PrintId(session_id()));
- summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type()));
+ profile_->set_name("Query (id=" + PrintId(query_id()) + ")");
+ summary_profile_->AddInfoString("Session ID", PrintId(session_id()));
+ summary_profile_->AddInfoString("Session Type", PrintTSessionType(session_type()));
if (session_type() == TSessionType::HIVESERVER2) {
- summary_profile_.AddInfoString("HiveServer2 Protocol Version",
+ summary_profile_->AddInfoString("HiveServer2 Protocol Version",
Substitute("V$0", 1 + session->hs2_version));
}
- summary_profile_.AddInfoString("Start Time", start_time().ToString());
- summary_profile_.AddInfoString("End Time", "");
- summary_profile_.AddInfoString("Query Type", "N/A");
- summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
- summary_profile_.AddInfoString("Query Status", "OK");
- summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true));
- summary_profile_.AddInfoString("User", effective_user());
- summary_profile_.AddInfoString("Connected User", connected_user());
- summary_profile_.AddInfoString("Delegated User", do_as_user());
- summary_profile_.AddInfoString("Network Address",
+ summary_profile_->AddInfoString("Start Time", start_time().ToString());
+ summary_profile_->AddInfoString("End Time", "");
+ summary_profile_->AddInfoString("Query Type", "N/A");
+ summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
+ summary_profile_->AddInfoString("Query Status", "OK");
+ summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
+ summary_profile_->AddInfoString("User", effective_user());
+ summary_profile_->AddInfoString("Connected User", connected_user());
+ summary_profile_->AddInfoString("Delegated User", do_as_user());
+ summary_profile_->AddInfoString("Network Address",
lexical_cast<string>(session_->network_address));
- summary_profile_.AddInfoString("Default Db", default_db());
- summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
- summary_profile_.AddInfoString("Coordinator",
+ summary_profile_->AddInfoString("Default Db", default_db());
+ summary_profile_->AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
+ summary_profile_->AddInfoString("Coordinator",
TNetworkAddressToString(exec_env->backend_address()));
}
@@ -144,11 +145,11 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
MarkActive();
exec_request_ = *exec_request;
- profile_.AddChild(&server_profile_);
- summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
- summary_profile_.AddInfoString("Query Options (set by configuration)",
+ profile_->AddChild(server_profile_);
+ summary_profile_->AddInfoString("Query Type", PrintTStmtType(stmt_type()));
+ summary_profile_->AddInfoString("Query Options (set by configuration)",
DebugQueryOptions(query_ctx_.client_request.query_options));
- summary_profile_.AddInfoString("Query Options (set by configuration and planner)",
+ summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
DebugQueryOptions(exec_request_.query_options));
switch (exec_request->stmt_type) {
@@ -182,7 +183,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
reset_req.reset_metadata_params.__set_table_name(
exec_request_.load_data_request.table_name);
catalog_op_executor_.reset(
- new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+ new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
*catalog_op_executor_->update_catalog_result(),
@@ -298,7 +299,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
// Verify the user has privileges to perform this operation by checking against
// the Sentry Service (via the Catalog Server).
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
- &server_profile_));
+ server_profile_));
TSentryAdminCheckRequest req;
req.__set_header(TCatalogServiceRequestHeader());
@@ -319,7 +320,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
// Verify the user has privileges to perform this operation by checking against
// the Sentry Service (via the Catalog Server).
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
- &server_profile_));
+ server_profile_));
TSentryAdminCheckRequest req;
req.__set_header(TCatalogServiceRequestHeader());
@@ -392,13 +393,13 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
plan_ss << "\n----------------\n"
<< query_exec_request.query_plan
<< "----------------";
- summary_profile_.AddInfoString("Plan", plan_ss.str());
+ summary_profile_->AddInfoString("Plan", plan_ss.str());
}
// Add info strings consumed by CM: Estimated mem and tables missing stats.
if (query_exec_request.__isset.per_host_mem_estimate) {
stringstream ss;
ss << query_exec_request.per_host_mem_estimate;
- summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
+ summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str());
}
if (!query_exec_request.query_ctx.__isset.parent_query_id &&
query_exec_request.query_ctx.__isset.tables_missing_stats &&
@@ -409,7 +410,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
- summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
+ summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
}
if (!query_exec_request.query_ctx.__isset.parent_query_id &&
@@ -422,7 +423,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
- summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
+ summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
}
if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
@@ -434,7 +435,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
- summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
+ summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
}
{
@@ -442,7 +443,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
// Don't start executing the query if Cancel() was called concurrently with Exec().
if (is_cancelled_) return Status::CANCELLED;
schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
- exec_request_.query_options, &summary_profile_, query_events_));
+ exec_request_.query_options, summary_profile_, query_events_));
}
Status status = exec_env_->scheduler()->Schedule(schedule_.get());
{
@@ -465,14 +466,14 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
- profile_.AddChild(coord_->query_profile());
+ profile_->AddChild(coord_->query_profile());
return Status::OK();
}
Status ClientRequestState::ExecDdlRequest() {
string op_type = catalog_op_type() == TCatalogOpType::DDL ?
PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
- summary_profile_.AddInfoString("DDL Type", op_type);
+ summary_profile_->AddInfoString("DDL Type", op_type);
if (catalog_op_type() != TCatalogOpType::DDL &&
catalog_op_type() != TCatalogOpType::RESET_METADATA) {
@@ -502,7 +503,7 @@ Status ClientRequestState::ExecDdlRequest() {
}
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
- &server_profile_));
+ server_profile_));
Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
{
lock_guard<mutex> l(lock_);
@@ -564,7 +565,7 @@ void ClientRequestState::Done() {
unique_lock<mutex> l(lock_);
end_time_ = TimestampValue::LocalTime();
- summary_profile_.AddInfoString("End Time", end_time().ToString());
+ summary_profile_->AddInfoString("End Time", end_time().ToString());
query_events_->MarkEvent("Unregister query");
// Update result set cache metrics, and update mem limit accounting before tearing
@@ -586,7 +587,7 @@ void ClientRequestState::Done() {
Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
TResultSet metadata_op_result;
// Like the other Exec(), fill out as much profile information as we're able to.
- summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
+ summary_profile_->AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
&metadata_op_result));
result_metadata_ = metadata_op_result.schema;
@@ -722,7 +723,7 @@ Status ClientRequestState::UpdateQueryStatus(const Status& status) {
if (!status.ok() && query_status_.ok()) {
UpdateQueryState(beeswax::QueryState::EXCEPTION);
query_status_ = status;
- summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
+ summary_profile_->AddInfoString("Query Status", query_status_.GetDetail());
}
return status;
@@ -898,7 +899,7 @@ Status ClientRequestState::UpdateCatalog() {
}
query_events_->MarkEvent("DML data written");
- SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));
+ SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
TQueryExecRequest query_exec_request = exec_request().query_exec_request;
if (query_exec_request.__isset.finalize_params) {
@@ -1031,7 +1032,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
DCHECK_GE(child_queries.size(), 1);
DCHECK_LE(child_queries.size(), 2);
catalog_op_executor_.reset(
- new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+ new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
// If there was no column stats query, pass in empty thrift structures to
// ExecComputeStats(). Otherwise pass in the column stats result.
@@ -1078,7 +1079,7 @@ void ClientRequestState::ClearResultCache() {
void ClientRequestState::UpdateQueryState(
beeswax::QueryState::type query_state) {
query_state_ = query_state;
- summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+ summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 6846165..1c015c3 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -184,8 +184,8 @@ class ClientRequestState {
void set_user_profile_access(bool user_has_profile_access) {
user_has_profile_access_ = user_has_profile_access;
}
- const RuntimeProfile& profile() const { return profile_; }
- const RuntimeProfile& summary_profile() const { return summary_profile_; }
+ const RuntimeProfile* profile() const { return profile_; }
+ const RuntimeProfile* summary_profile() const { return summary_profile_; }
const TimestampValue& start_time() const { return start_time_; }
const TimestampValue& end_time() const { return end_time_; }
const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
@@ -211,7 +211,7 @@ class ClientRequestState {
}
RuntimeProfile::EventSequence* query_events() const { return query_events_; }
- RuntimeProfile* summary_profile() { return &summary_profile_; }
+ RuntimeProfile* summary_profile() { return summary_profile_; }
private:
const TQueryCtx query_ctx_;
@@ -299,9 +299,9 @@ class ClientRequestState {
/// There's a fourth profile which is not built here (but is a
/// child of profile_); the execution profile which tracks the
/// actual fragment execution.
- RuntimeProfile profile_;
- RuntimeProfile server_profile_;
- RuntimeProfile summary_profile_;
+ RuntimeProfile* const profile_;
+ RuntimeProfile* const server_profile_;
+ RuntimeProfile* const summary_profile_;
RuntimeProfile::Counter* row_materialization_timer_;
/// Tracks how long we are idle waiting for a client to fetch rows.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8ba2894..051cfaf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -623,9 +623,9 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
request_state->user_has_profile_access()));
if (base64_encoded) {
- RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output));
+ RETURN_IF_ERROR(request_state->profile()->SerializeToArchiveString(output));
} else {
- request_state->profile().PrettyPrint(output);
+ request_state->profile()->PrettyPrint(output);
}
return Status::OK();
}
@@ -741,7 +741,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
string encoded_profile_str;
- Status status = query.profile().SerializeToArchiveString(&encoded_profile_str);
+ Status status = query.profile()->SerializeToArchiveString(&encoded_profile_str);
if (!status.ok()) {
// Didn't serialize the string. Continue with empty string.
LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
@@ -1677,7 +1677,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
id = request_state.query_id();
const TExecRequest& request = request_state.exec_request();
- const string* plan_str = request_state.summary_profile().GetInfoString("Plan");
+ const string* plan_str = request_state.summary_profile()->GetInfoString("Plan");
if (plan_str != nullptr) plan = *plan_str;
stmt = request_state.sql_stmt();
stmt_type = request.stmt_type;
@@ -1701,11 +1701,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
if (copy_profile) {
stringstream ss;
- request_state.profile().PrettyPrint(&ss);
+ request_state.profile()->PrettyPrint(&ss);
profile_str = ss.str();
if (encoded_profile.empty()) {
Status status =
- request_state.profile().SerializeToArchiveString(&encoded_profile_str);
+ request_state.profile()->SerializeToArchiveString(&encoded_profile_str);
if (!status.ok()) {
LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
<< status.GetDetail();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/dummy-runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h
index 83bccbf..1642d4e 100644
--- a/be/src/util/dummy-runtime-profile.h
+++ b/be/src/util/dummy-runtime-profile.h
@@ -28,12 +28,12 @@ namespace impala {
/// but not always so that the object can still allocate counters in the same way.
class DummyProfile {
public:
- DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {}
- RuntimeProfile* profile() { return &profile_; }
+ DummyProfile() : pool_(), profile_(RuntimeProfile::Create(&pool_, "dummy", false)) {}
+ RuntimeProfile* profile() { return profile_; }
private:
ObjectPool pool_;
- RuntimeProfile profile_;
+ RuntimeProfile* const profile_;
};
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index b1c755a..098e683 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -93,21 +93,20 @@ void PeriodicCounterUpdater::RegisterBucketingCounters(
}
void PeriodicCounterUpdater::StopBucketingCounters(
- vector<RuntimeProfile::Counter*>* buckets, bool convert) {
+ vector<RuntimeProfile::Counter*>* buckets) {
int64_t num_sampled = 0;
{
lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
BucketCountersMap::iterator itr =
instance_->bucketing_counters_.find(buckets);
- if (itr != instance_->bucketing_counters_.end()) {
- num_sampled = itr->second.num_sampled;
- instance_->bucketing_counters_.erase(itr);
- }
+ // If not registered, we have nothing to do.
+ if (itr == instance_->bucketing_counters_.end()) return;
+ num_sampled = itr->second.num_sampled;
+ instance_->bucketing_counters_.erase(itr);
}
- if (convert && num_sampled > 0) {
- for (int i = 0; i < buckets->size(); ++i) {
- RuntimeProfile::Counter* counter = (*buckets)[i];
+ if (num_sampled > 0) {
+ for (RuntimeProfile::Counter* counter : *buckets) {
double perc = 100 * counter->value() / (double)num_sampled;
counter->Set(perc);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index c603522..762f372 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -70,12 +70,11 @@ class PeriodicCounterUpdater {
/// Stops updating the value of 'counter'.
static void StopSamplingCounter(RuntimeProfile::Counter* counter);
- /// Stops updating the bucket counter.
- /// If convert is true, convert the buckets from count to percentage.
- /// Sampling counters are updated periodically so should be removed as soon as the
+ /// If the bucketing counters 'buckets' are registered, stops updating the counters and
+ /// convert the buckets from count to percentage. If not registered, has no effect.
+ /// Perioidic counters are updated periodically so should be removed as soon as the
/// underlying counter is no longer going to change.
- static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets,
- bool convert);
+ static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets);
/// Stops 'counter' from receiving any more samples.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);