You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/20 20:20:14 UTC

[4/5] incubator-impala git commit: IMPALA-5895: clean up runtime profile lifecycle

IMPALA-5895: clean up runtime profile lifecycle

Require callers to explicitly stop counter updating instead of doing it
in the destructor. This replaces ad-hoc logic to stop individual
counters.

Track which counters need to be stopped in separate lists instead of
stopping everything.

Force all RuntimeProfiles to use ObjectPools in a uniform way - the
profile, its counters and its children all are allocated from the
same pool. This is done via a new Create() method.

Consolidate 'time_series_counter_map_lock_' and 'counter_map_lock_'
to reduce complexity of the locking scheme. I didn't see any benefit
to sharding the locks in this way - there are only two time series
counters per fragment instance, which is a small fraction of the
total number of counters.

Change-Id: I45c39ac36c8e3c277213d32f5ae5f14be6b7f0df
Reviewed-on: http://gerrit.cloudera.org:8080/8069
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7866eec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7866eec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7866eec5

Branch: refs/heads/master
Commit: 7866eec5bdcbf9194a4aad2c87c354cbaad7b802
Parents: 741b052
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Sep 5 15:51:49 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 20 08:48:38 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/hash-benchmark.cc             |   8 +-
 be/src/codegen/llvm-codegen.cc                  |  32 +--
 be/src/codegen/llvm-codegen.h                   |   4 +-
 be/src/exec/data-sink.cc                        |   2 +-
 be/src/exec/data-source-scan-node.cc            |   4 +-
 be/src/exec/exec-node.cc                        |  15 +-
 be/src/exec/exec-node.h                         |   7 +-
 be/src/exec/hash-table-test.cc                  |   2 +-
 be/src/exec/hbase-scan-node.cc                  |   2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  26 +--
 be/src/exec/hdfs-scan-node-base.h               |  11 +
 be/src/exec/kudu-scan-node-base.cc              |  15 --
 be/src/exec/kudu-scan-node-base.h               |  13 --
 be/src/exec/kudu-scan-node-mt.cc                |   4 +-
 be/src/exec/scan-node.cc                        |   8 +
 be/src/exec/scan-node.h                         |  18 +-
 be/src/experiments/data-provider-test.cc        |   6 +-
 be/src/experiments/tuple-splitter-test.cc       |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc    |   2 +-
 .../runtime/bufferpool/buffer-allocator-test.cc |   2 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   |   2 +-
 .../bufferpool/reservation-tracker-test.cc      |   2 +-
 be/src/runtime/bufferpool/suballocator-test.cc  |   8 +-
 be/src/runtime/coordinator-backend-state.cc     |   8 +-
 be/src/runtime/coordinator.cc                   |   4 +-
 be/src/runtime/coordinator.h                    |   6 +-
 be/src/runtime/data-stream-recvr.cc             |  32 ++-
 be/src/runtime/data-stream-recvr.h              |   7 +-
 be/src/runtime/data-stream-test.cc              |   7 +-
 be/src/runtime/fragment-instance-state.cc       |  12 +-
 be/src/runtime/query-state.cc                   |   2 +-
 be/src/runtime/runtime-state.cc                 |  11 +-
 be/src/runtime/runtime-state.h                  |   4 +-
 be/src/runtime/tmp-file-mgr-test.cc             |   2 +-
 be/src/service/client-request-state.cc          |  95 ++++----
 be/src/service/client-request-state.h           |  12 +-
 be/src/service/impala-server.cc                 |  12 +-
 be/src/util/dummy-runtime-profile.h             |   6 +-
 be/src/util/periodic-counter-updater.cc         |  15 +-
 be/src/util/periodic-counter-updater.h          |   9 +-
 be/src/util/runtime-profile-test.cc             | 220 +++++++++----------
 be/src/util/runtime-profile.cc                  | 131 +++++++----
 be/src/util/runtime-profile.h                   |  94 +++++---
 43 files changed, 460 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/benchmarks/hash-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index dadad25..b183915 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -440,10 +440,10 @@ int main(int argc, char **argv) {
 
   MemTracker tracker;
   MemPool mem_pool(&tracker);
-  RuntimeProfile int_profile(state->obj_pool(), "IntGen");
-  RuntimeProfile mixed_profile(state->obj_pool(), "MixedGen");
-  DataProvider int_provider(&mem_pool, &int_profile);
-  DataProvider mixed_provider(&mem_pool, &mixed_profile);
+  RuntimeProfile* int_profile = RuntimeProfile::Create(state->obj_pool(), "IntGen");
+  RuntimeProfile* mixed_profile = RuntimeProfile::Create(state->obj_pool(), "MixedGen");
+  DataProvider int_provider(&mem_pool, int_profile);
+  DataProvider mixed_provider(&mem_pool, mixed_profile);
 
   scoped_ptr<LlvmCodeGen> codegen;
   status = LlvmCodeGen::CreateImpalaCodegen(state, NULL, "test", &codegen);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 6f8b156..5503969 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -170,8 +170,8 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& id)
   : state_(state),
     id_(id),
-    profile_(pool, "CodeGen"),
-    mem_tracker_(pool->Add(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker))),
+    profile_(RuntimeProfile::Create(pool, "CodeGen")),
+    mem_tracker_(pool->Add(new MemTracker(profile_, -1, "CodeGen", parent_mem_tracker))),
     optimizations_enabled_(false),
     is_corrupt_(false),
     is_compiled_(false),
@@ -181,21 +181,21 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
     loaded_functions_(IRFunction::FN_END, NULL) {
   DCHECK(llvm_initialized_) << "Must call LlvmCodeGen::InitializeLlvm first.";
 
-  load_module_timer_ = ADD_TIMER(&profile_, "LoadTime");
-  prepare_module_timer_ = ADD_TIMER(&profile_, "PrepareTime");
-  module_bitcode_size_ = ADD_COUNTER(&profile_, "ModuleBitcodeSize", TUnit::BYTES);
-  codegen_timer_ = ADD_TIMER(&profile_, "CodegenTime");
-  optimization_timer_ = ADD_TIMER(&profile_, "OptimizationTime");
-  compile_timer_ = ADD_TIMER(&profile_, "CompileTime");
-  num_functions_ = ADD_COUNTER(&profile_, "NumFunctions", TUnit::UNIT);
-  num_instructions_ = ADD_COUNTER(&profile_, "NumInstructions", TUnit::UNIT);
+  load_module_timer_ = ADD_TIMER(profile_, "LoadTime");
+  prepare_module_timer_ = ADD_TIMER(profile_, "PrepareTime");
+  module_bitcode_size_ = ADD_COUNTER(profile_, "ModuleBitcodeSize", TUnit::BYTES);
+  codegen_timer_ = ADD_TIMER(profile_, "CodegenTime");
+  optimization_timer_ = ADD_TIMER(profile_, "OptimizationTime");
+  compile_timer_ = ADD_TIMER(profile_, "CompileTime");
+  num_functions_ = ADD_COUNTER(profile_, "NumFunctions", TUnit::UNIT);
+  num_instructions_ = ADD_COUNTER(profile_, "NumInstructions", TUnit::UNIT);
 }
 
 Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& file, const string& id,
     scoped_ptr<LlvmCodeGen>* codegen) {
   codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
-  SCOPED_TIMER((*codegen)->profile_.total_time_counter());
+  SCOPED_TIMER((*codegen)->profile_->total_time_counter());
 
   unique_ptr<Module> loaded_module;
   RETURN_IF_ERROR((*codegen)->LoadModuleFromFile(file, &loaded_module));
@@ -206,7 +206,7 @@ Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
 Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& id, scoped_ptr<LlvmCodeGen>* codegen) {
   codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
-  SCOPED_TIMER((*codegen)->profile_.total_time_counter());
+  SCOPED_TIMER((*codegen)->profile_->total_time_counter());
 
   // Select the appropriate IR version. We cannot use LLVM IR with SSE4.2 instructions on
   // a machine without SSE4.2 support.
@@ -276,7 +276,7 @@ Status LlvmCodeGen::LoadModuleFromMemory(unique_ptr<MemoryBuffer> module_ir_buf,
 Status LlvmCodeGen::LinkModule(const string& file) {
   if (linked_modules_.find(file) != linked_modules_.end()) return Status::OK();
 
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
   unique_ptr<Module> new_module;
   RETURN_IF_ERROR(LoadModuleFromFile(file, &new_module));
 
@@ -324,7 +324,7 @@ Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state,
   LlvmCodeGen* codegen = codegen_ret->get();
 
   // Parse module for cross compiled functions and types
-  SCOPED_TIMER(codegen->profile_.total_time_counter());
+  SCOPED_TIMER(codegen->profile_->total_time_counter());
   SCOPED_TIMER(codegen->prepare_module_timer_);
 
   // Get type for StringValue
@@ -620,7 +620,7 @@ Status LlvmCodeGen::MaterializeFunctionHelper(Function *fn) {
 }
 
 Status LlvmCodeGen::MaterializeFunction(Function *fn) {
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
   SCOPED_TIMER(prepare_module_timer_);
   return MaterializeFunctionHelper(fn);
 }
@@ -1037,7 +1037,7 @@ Status LlvmCodeGen::FinalizeModule() {
   }
 
   if (is_corrupt_) return Status("Module is corrupt.");
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
 
   // Don't waste time optimizing module if there are no functions to JIT. This can happen
   // if the codegen object is created but no functions are successfully codegen'd.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 8dce330..8aa9f2b 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -159,7 +159,7 @@ class LlvmCodeGen {
   /// any other API methods after calling close.
   void Close();
 
-  RuntimeProfile* runtime_profile() { return &profile_; }
+  RuntimeProfile* runtime_profile() { return profile_; }
   RuntimeProfile::Counter* codegen_timer() { return codegen_timer_; }
 
   /// Turns on/off optimization passes
@@ -669,7 +669,7 @@ class LlvmCodeGen {
   std::string id_;
 
   /// Codegen counters
-  RuntimeProfile profile_;
+  RuntimeProfile* const profile_;
 
   /// MemTracker used for tracking memory consumed by codegen. Connected to a parent
   /// MemTracker if one was provided during initialization. Owned by the ObjectPool

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index fe23694..c173ed5 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -175,7 +175,7 @@ string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
 
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   DCHECK(parent_mem_tracker != NULL);
-  profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
+  profile_ = RuntimeProfile::Create(state->obj_pool(), GetName());
   const string& name = GetName();
   mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
   expr_mem_tracker_.reset(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 01e0dbe..78ba492 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -365,15 +365,13 @@ Status DataSourceScanNode::Reset(RuntimeState* state) {
 void DataSourceScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
   input_batch_.reset();
   TCloseParams params;
   params.__set_scan_handle(scan_handle_);
   TCloseResult result;
   Status status = data_source_executor_->Close(params, &result);
   if (!status.ok()) state->LogError(status.msg());
-  ExecNode::Close(state);
+  ScanNode::Close(state);
 }
 
 void DataSourceScanNode::DebugString(int indentation_level, stringstream* out) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 94e9ed1..b656d2b 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -130,12 +130,14 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     debug_action_(TDebugAction::WAIT),
     limit_(tnode.limit),
     num_rows_returned_(0),
+    runtime_profile_(RuntimeProfile::Create(pool_,
+        Substitute("$0 (id=$1)", PrintPlanNodeType(tnode.node_type), id_))),
     rows_returned_counter_(NULL),
     rows_returned_rate_(NULL),
     containing_subplan_(NULL),
     disable_codegen_(tnode.disable_codegen),
     is_closed_(false) {
-  InitRuntimeProfile(PrintPlanNodeType(tnode.node_type));
+  runtime_profile_->set_metadata(id_);
 }
 
 ExecNode::~ExecNode() {
@@ -149,8 +151,8 @@ Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status ExecNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state));
-  DCHECK(runtime_profile_.get() != NULL);
-  mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(),
+  DCHECK(runtime_profile_ != NULL);
+  mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
       state->instance_mem_tracker()));
   expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
   expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
@@ -462,13 +464,6 @@ void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) {
   CollectNodes(TPlanNodeType::KUDU_SCAN_NODE, nodes);
 }
 
-void ExecNode::InitRuntimeProfile(const string& name) {
-  stringstream ss;
-  ss << name << " (id=" << id_ << ")";
-  runtime_profile_.reset(new RuntimeProfile(pool_, ss.str()));
-  runtime_profile_->set_metadata(id_);
-}
-
 Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) {
   DCHECK_EQ(debug_phase_, phase);
   if (debug_action_ == TDebugAction::FAIL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 2f3f714..7cba6ac 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -208,7 +208,7 @@ class ExecNode {
   int64_t limit() const { return limit_; }
   bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; }
 
-  RuntimeProfile* runtime_profile() { return runtime_profile_.get(); }
+  RuntimeProfile* runtime_profile() { return runtime_profile_; }
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
   MemPool* expr_mem_pool() { return expr_mem_pool_.get(); }
@@ -310,7 +310,8 @@ class ExecNode {
   int64_t limit_;  // -1: no limit
   int64_t num_rows_returned_;
 
-  boost::scoped_ptr<RuntimeProfile> runtime_profile_;
+  /// Runtime profile for this node. Owned by the QueryState's ObjectPool.
+  RuntimeProfile* const runtime_profile_;
   RuntimeProfile::Counter* rows_returned_counter_;
   RuntimeProfile::Counter* rows_returned_rate_;
 
@@ -354,8 +355,6 @@ class ExecNode {
 
   virtual bool IsScanNode() const { return false; }
 
-  void InitRuntimeProfile(const std::string& name);
-
   /// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
   /// 'phase' must not be INVALID.
   Status ExecDebugAction(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 816e6cf..0b99cbd 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -233,7 +233,7 @@ class HashTableTest : public testing::Test {
       int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100,
       int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) {
     BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
-    RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht"));
+    RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "ht");
 
     // Set up memory tracking for the hash table.
     MemTracker* client_tracker =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index e783731..a74d4b3 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -279,7 +279,7 @@ void HBaseScanNode::Close(RuntimeState* state) {
     JNIEnv* env = getJNIEnv();
     hbase_scanner_->Close(env);
   }
-  ExecNode::Close(state);
+  ScanNode::Close(state);
 }
 
 void HBaseScanNode::DebugString(int indentation_level, stringstream* out) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index ca71201..b5169a8 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -97,7 +97,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
       thrift_dict_filter_conjuncts_map_(
           tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
-      disks_accessed_bitmap_(TUnit::UNIT, 0) {
+      disks_accessed_bitmap_(TUnit::UNIT, 0),
+      active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -142,8 +143,8 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
     filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
     string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
         PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
-    RuntimeProfile* profile = state->obj_pool()->Add(
-        new RuntimeProfile(state->obj_pool(), filter_profile_title));
+    RuntimeProfile* profile =
+        RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
     runtime_profile_->AddChild(profile);
     filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
         target.is_bound_by_partition_columns));
@@ -441,12 +442,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
       "MaxCompressedTextFileLength", TUnit::BYTES);
 
-  for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
-    hdfs_read_thread_concurrency_bucket_.push_back(
-        pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  }
-  runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
-      &hdfs_read_thread_concurrency_bucket_);
+  hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
+      &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 1);
 
   counters_running_ = true;
 
@@ -851,18 +848,13 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   if (!counters_running_) return;
   counters_running_ = false;
 
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
-  PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
-  PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
-      true);
+  runtime_profile_->StopPeriodicCounters();
 
   // Output hdfs read thread concurrency into info string
   stringstream ss;
-  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
+  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) {
     ss << i << ":" << setprecision(4)
-       << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
+       << (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% ";
   }
   runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e1c431f..e33de5a 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -469,6 +469,17 @@ class HdfsScanNodeBase : public ScanNode {
   /// Total number of file handle opens where the file handle was not in the cache
   RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
 
+  /// The number of active hdfs reading threads reading for this node.
+  RuntimeProfile::Counter active_hdfs_read_thread_counter_;
+
+  /// Average number of active hdfs reading threads
+  /// This should be created in Open() and stopped when all the scanner threads are done.
+  RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;
+
+  /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
+  /// taken where there are i concurrent hdfs read thread running. Created in Open().
+  std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
+
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers
   boost::scoped_ptr<MemPool> scan_node_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index d587660..526374c 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -52,7 +52,6 @@ KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     : ScanNode(pool, tnode, descs),
       tuple_id_(tnode.kudu_scan_node.tuple_id),
       client_(nullptr),
-      counters_running_(false),
       next_scan_token_idx_(0) {
   DCHECK(KuduIsAvailable());
 }
@@ -69,7 +68,6 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
       ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
   kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
   kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
-  counters_running_ = true;
 
   DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
@@ -108,12 +106,6 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-void KuduScanNodeBase::Close(RuntimeState* state) {
-  if (is_closed()) return;
-  StopAndFinalizeCounters();
-  ExecNode::Close(state);
-}
-
 void KuduScanNodeBase::DebugString(int indentation_level, stringstream* out) const {
   string indent(indentation_level * 2, ' ');
   *out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
@@ -129,12 +121,5 @@ const string* KuduScanNodeBase::GetNextScanToken() {
   return token;
 }
 
-void KuduScanNodeBase::StopAndFinalizeCounters() {
-  if (!counters_running_) return;
-  counters_running_ = false;
-
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-}
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 49af13c..08289e1 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -42,7 +42,6 @@ class KuduScanNodeBase : public ScanNode {
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
-  virtual void Close(RuntimeState* state);
 
  protected:
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
@@ -59,14 +58,6 @@ class KuduScanNodeBase : public ScanNode {
 
   RuntimeState* runtime_state_;
 
-  /// Stops periodic counters and aggregates counter values for the entire scan node.
-  /// This should be called as soon as the scan node is complete to get the most accurate
-  /// counter values.
-  /// This can be called multiple times, subsequent calls will be ignored.
-  /// This must be called on Close() to unregister counters.
-  /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
-  void StopAndFinalizeCounters();
-
  private:
   friend class KuduScanner;
 
@@ -83,10 +74,6 @@ class KuduScanNodeBase : public ScanNode {
   /// Kudu table reference. Shared between scanner threads for KuduScanNode.
   kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
 
-  /// If true, counters are actively running and need to be reported in the runtime
-  /// profile.
-  bool counters_running_;
-
   /// Set of scan tokens to be deserialized into Kudu scanners.
   std::vector<std::string> scan_tokens_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 8723daa..2cb7619 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -62,7 +62,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   if (scan_token_ == nullptr) {
     scan_token_ = GetNextScanToken();
     if (scan_token_ == nullptr) {
-      StopAndFinalizeCounters();
+      runtime_profile_->StopPeriodicCounters();
       scanner_->Close();
       scanner_.reset();
       *eos = true;
@@ -85,7 +85,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
     num_rows_returned_ -= num_rows_over;
     scan_token_ = nullptr;
-    StopAndFinalizeCounters();
+    runtime_profile_->StopPeriodicCounters();
     scanner_->Close();
     scanner_.reset();
     *eos = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 0df0c3f..d09bb6b 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -68,4 +68,12 @@ Status ScanNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
+void ScanNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  // ScanNode::Prepare() started periodic counters including 'total_throughput_counter_'
+  // and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters.
+  runtime_profile_->StopPeriodicCounters();
+  ExecNode::Close(state);
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 0f73c2b..4b11361 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -82,11 +82,14 @@ class ScanNode : public ExecNode {
   ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
     : ExecNode(pool, tnode, descs),
       scan_range_params_(NULL),
-      active_scanner_thread_counter_(TUnit::UNIT, 0),
-      active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
+      active_scanner_thread_counter_(TUnit::UNIT, 0) {}
 
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
 
+  /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can
+  /// start periodic counters and rely on this function stopping them.
+  virtual void Close(RuntimeState* state);
+
   /// This should be called before Prepare(), and the argument must be not destroyed until
   /// after Prepare().
   void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) {
@@ -172,18 +175,7 @@ class ScanNode : public ExecNode {
   /// This should be created in Open and stopped when all the scanner threads are done.
   RuntimeProfile::Counter* average_scanner_thread_concurrency_;
 
-  /// The number of active hdfs reading threads reading for this node.
-  RuntimeProfile::Counter active_hdfs_read_thread_counter_;
-
-  /// Average number of active hdfs reading threads
-  /// This should be created in Open and stopped when all the scanner threads are done.
-  RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_;
-
   RuntimeProfile::Counter* num_scanner_threads_started_counter_;
-
-  /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
-  /// taken where there are i concurrent hdfs read thread running
-  std::vector<RuntimeProfile::Counter*> hdfs_read_thread_concurrency_bucket_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/data-provider-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/data-provider-test.cc b/be/src/experiments/data-provider-test.cc
index 9f9e0cb..3c5a92f 100644
--- a/be/src/experiments/data-provider-test.cc
+++ b/be/src/experiments/data-provider-test.cc
@@ -55,11 +55,11 @@ int main(int argc, char **argv) {
   cols.push_back(DataProvider::ColDesc::Create<StringValue>(min_str, max_str));
 
   ObjectPool obj_pool;
-  RuntimeProfile profile(&obj_pool, "DataGenTest");
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "DataGenTest");
 
   MemTracker tracker;
   MemPool pool(&tracker);
-  DataProvider provider(&pool, &profile);
+  DataProvider provider(&pool, profile);
   provider.Reset(20, 2, cols);
   int rows;
   void* data;
@@ -70,7 +70,7 @@ int main(int argc, char **argv) {
     provider.Print(&cout, reinterpret_cast<char*>(data), rows);
   }
 
-  profile.PrettyPrint(&cout);
+  profile->PrettyPrint(&cout);
 
   cout << endl << "Done." << endl;
   return 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/tuple-splitter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/tuple-splitter-test.cc b/be/src/experiments/tuple-splitter-test.cc
index 87a52bd..7d68f8e 100644
--- a/be/src/experiments/tuple-splitter-test.cc
+++ b/be/src/experiments/tuple-splitter-test.cc
@@ -380,14 +380,14 @@ int main(int argc, char **argv) {
   MemTracker tracker;
   MemPool pool(&tracker);
   ObjectPool obj_pool;
-  RuntimeProfile profile(&obj_pool, "PartitioningTest");
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "PartitioningTest");
 
-  DataProvider provider(&pool, &profile);
+  DataProvider provider(&pool, profile);
   provider.Reset(50*1024*1024, 1024, cols);
   //provider.Reset(100*1024, 1024, cols);
   //provider.Reset(100, 1024, cols);
 
-  DataPartitioner partitioner(&pool, &profile, provider.row_size(), 0);
+  DataPartitioner partitioner(&pool, profile, provider.row_size(), 0);
 
   cout << "Begin processing: " << provider.total_rows() << endl;
   int rows;
@@ -437,7 +437,7 @@ int main(int argc, char **argv) {
   cout << "Largest Partition: " << largest_partition << endl;;
 
   cout << endl;
-  profile.PrettyPrint(&cout);
+  profile->PrettyPrint(&cout);
 
   LOG(ERROR) << "Done.";
   return 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 0b89498..08ce7c3 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -132,7 +132,7 @@ class SimpleTupleStreamTest : public testing::Test {
     ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
     query_state_ = runtime_state_->query_state();
 
-    RuntimeProfile* client_profile = pool_.Add(new RuntimeProfile(&pool_, "client"));
+    RuntimeProfile* client_profile = RuntimeProfile::Create(&pool_, "client");
     MemTracker* client_tracker =
         pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     ASSERT_OK(test_env_->exec_env()->buffer_pool()->RegisterClient("",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 6427648..21a9c08 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -42,7 +42,7 @@ class BufferAllocatorTest : public ::testing::Test {
     dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0));
     dummy_reservation_.InitRootTracker(nullptr, 0);
     ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
-        obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
+        RuntimeProfile::Create(&obj_pool_, ""), &dummy_client_));
   }
 
   virtual void TearDown() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 06ff827..720dc13 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -139,7 +139,7 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   RuntimeProfile* NewProfile() {
-    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+    return RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
   /// Create a new file group with the default configs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 0d57488..3fc0e0b 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -44,7 +44,7 @@ class ReservationTrackerTest : public ::testing::Test {
 
  protected:
   RuntimeProfile* NewProfile() {
-    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+    return RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
   ObjectPool obj_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 32acfaf..6cd53fb 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -45,7 +45,7 @@ class SuballocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() override {
     RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_);
-    profile_.reset(new RuntimeProfile(&obj_pool_, "test profile"));
+    profile_ = RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
   virtual void TearDown() override {
@@ -55,7 +55,6 @@ class SuballocatorTest : public ::testing::Test {
     clients_.clear();
     buffer_pool_.reset();
     global_reservation_.Close();
-    profile_.reset();
     obj_pool_.Clear();
   }
 
@@ -78,7 +77,7 @@ class SuballocatorTest : public ::testing::Test {
     clients_.push_back(make_unique<BufferPool::ClientHandle>());
     *client = clients_.back().get();
     ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL,
-        numeric_limits<int64_t>::max(), profile(), *client));
+        numeric_limits<int64_t>::max(), profile_, *client));
   }
 
   /// Assert that the memory for all of the suballocations is writable and disjoint by
@@ -97,7 +96,6 @@ class SuballocatorTest : public ::testing::Test {
     EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString();
   }
 
-  RuntimeProfile* profile() { return profile_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
 
   /// Pool for objects with per-test lifetime. Cleared after every test.
@@ -114,7 +112,7 @@ class SuballocatorTest : public ::testing::Test {
   vector<unique_ptr<BufferPool::ClientHandle>> clients_;
 
   /// Global profile - recreated for every test.
-  scoped_ptr<RuntimeProfile> profile_;
+  RuntimeProfile* profile_;
 
   /// Per-test random number generator. Seeded before every test.
   mt19937 rng_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 1b7fd20..0ee4bd7 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -433,7 +433,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
     total_ranges_complete_(0) {
   const string& profile_name = Substitute("Instance $0 (host=$1)",
       PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
-  profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
+  profile_ = RuntimeProfile::Create(obj_pool, profile_name);
   fragment_stats->root_profile()->AddChild(profile_);
 
   // add total split size to fragment_stats->bytes_assigned()
@@ -514,10 +514,8 @@ void Coordinator::BackendState::InstanceStats::Update(
 
 Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
     const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
-  : avg_profile_(
-      obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, true))),
-    root_profile_(
-      obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))),
+  : avg_profile_(RuntimeProfile::Create(obj_pool, avg_profile_name, true)),
+    root_profile_(RuntimeProfile::Create(obj_pool, root_profile_name)),
     num_instances_(num_instances) {
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index c8df1f5..e022a21 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -123,8 +123,8 @@ Status Coordinator::Exec() {
   query_ctx_.__set_desc_tbl(request.desc_tbl);
   query_ctx_.__set_request_pool(schedule_.request_pool());
 
-  query_profile_.reset(
-      new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id())));
+  query_profile_ =
+      RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
   finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
   filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e67ef13..5802c83 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -150,7 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
-  RuntimeProfile* query_profile() const { return query_profile_.get(); }
+  RuntimeProfile* query_profile() const { return query_profile_; }
 
   const TUniqueId& query_id() const;
 
@@ -278,8 +278,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   ExecSummary exec_summary_;
 
-  /// Aggregate counters for the entire query.
-  boost::scoped_ptr<RuntimeProfile> query_profile_;
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
+  RuntimeProfile* query_profile_ = nullptr;
 
   /// Protects all fields below. This is held while making RPCs, so this lock should
   /// only be acquired if the acquiring thread is prepared to wait for a significant

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 35076f5..d8150eb 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -37,7 +37,7 @@ namespace impala {
 // rows from all senders are placed in the same queue.
 class DataStreamRecvr::SenderQueue {
  public:
-  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile);
+  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders);
 
   // Return the next batch from this sender queue. Sets the returned batch in cur_batch_.
   // A returned batch that is not filled to capacity does *not* indicate
@@ -102,8 +102,7 @@ class DataStreamRecvr::SenderQueue {
   bool received_first_batch_;
 };
 
-DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders,
-    RuntimeProfile* profile)
+DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders)
   : recvr_(parent_recvr),
     is_cancelled_(false),
     num_remaining_senders_(num_senders),
@@ -242,8 +241,6 @@ void DataStreamRecvr::SenderQueue::Cancel() {
   // notice that the stream is cancelled and handle it.
   data_arrival_cv_.notify_all();
   data_removal__cv_.notify_all();
-  PeriodicCounterUpdater::StopTimeSeriesCounter(
-      recvr_->bytes_received_time_series_counter_);
 }
 
 void DataStreamRecvr::SenderQueue::Close() {
@@ -286,7 +283,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
 DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
     const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit,
-    RuntimeProfile* profile)
+    RuntimeProfile* parent_profile)
   : mgr_(stream_mgr),
     fragment_instance_id_(fragment_instance_id),
     dest_node_id_(dest_node_id),
@@ -294,30 +291,28 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
     row_desc_(row_desc),
     is_merging_(is_merging),
     num_buffered_bytes_(0),
-    profile_(profile) {
+    profile_(parent_profile->CreateChild("DataStreamReceiver")) {
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
   int num_sender_per_queue = is_merging ? 1 : num_senders;
   for (int i = 0; i < num_queues; ++i) {
     SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this,
-        num_sender_per_queue, profile));
+        num_sender_per_queue));
     sender_queues_.push_back(queue);
   }
 
-  RuntimeProfile* child_profile = profile_->CreateChild("DataStreamReceiver");
-  mem_tracker_.reset(
-      new MemTracker(child_profile, -1, "DataStreamRecvr", parent_tracker));
+  mem_tracker_.reset(new MemTracker(profile_, -1, "DataStreamRecvr", parent_tracker));
 
   // Initialize the counters
-  bytes_received_counter_ = ADD_COUNTER(child_profile, "BytesReceived", TUnit::BYTES);
+  bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
   bytes_received_time_series_counter_ =
-      ADD_TIME_SERIES_COUNTER(child_profile, "BytesReceived", bytes_received_counter_);
-  deserialize_row_batch_timer_ = ADD_TIMER(child_profile, "DeserializeRowBatchTimer");
-  buffer_full_wall_timer_ = ADD_TIMER(child_profile, "SendersBlockedTimer");
-  buffer_full_total_timer_ = ADD_TIMER(child_profile, "SendersBlockedTotalTimer(*)");
-  data_arrival_timer_ = child_profile->inactive_timer();
-  first_batch_wait_total_timer_ = ADD_TIMER(child_profile, "FirstBatchArrivalWaitTime");
+      ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_);
+  deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer");
+  buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
+  buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
+  data_arrival_timer_ = profile_->inactive_timer();
+  first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
 }
 
 Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -354,6 +349,7 @@ void DataStreamRecvr::Close() {
   }
   merger_.reset();
   mem_tracker_->Close();
+  profile_->StopPeriodicCounters();
 }
 
 DataStreamRecvr::~DataStreamRecvr() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index fad588d..9545f82 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -105,7 +105,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, bool is_merging,
-      int64_t total_buffer_limit, RuntimeProfile* profile);
+      int64_t total_buffer_limit, RuntimeProfile* parent_profile);
 
   /// Add a new batch of rows to the appropriate sender queue, blocking if the queue is
   /// full. Called from DataStreamMgr.
@@ -161,8 +161,9 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   /// Pool of sender queues.
   ObjectPool sender_queue_pool_;
 
-  /// Runtime profile storing the counters below.
-  RuntimeProfile* profile_;
+  /// Runtime profile storing the counters below. Child of 'parent_profile' passed into
+  /// constructor.
+  RuntimeProfile* const profile_;
 
   /// Number of bytes received
   RuntimeProfile::Counter* bytes_received_counter_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 5ea6756..8e85894 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -337,8 +337,7 @@ class DataStreamTest : public testing::Test {
   void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
       int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) {
     VLOG_QUERY << "start receiver";
-    RuntimeProfile* profile =
-        obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver"));
+    RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
     TUniqueId instance_id;
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
@@ -607,13 +606,13 @@ TEST_F(DataStreamTest, BasicTest) {
 // TODO: Make lifecycle requirements more explicit.
 TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
-  scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
 
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
   shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
-      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(),
+      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
       false);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 74f5495..a7d3c86 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -123,8 +123,8 @@ Status FragmentInstanceState::Prepare() {
 
   // total_time_counter() is in the runtime_state_ so start it up now.
   SCOPED_TIMER(profile()->total_time_counter());
-  timings_profile_ = obj_pool()->Add(
-      new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings"));
+  timings_profile_ =
+      RuntimeProfile::Create(obj_pool(), "Fragment Instance Lifecycle Timings");
   profile()->AddChild(timings_profile_);
   SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
 
@@ -289,12 +289,8 @@ void FragmentInstanceState::Close() {
   // guard against partially-finished Prepare()
   if (sink_ != nullptr) sink_->Close(runtime_state_);
 
-  // disconnect mem_usage_sampled_counter_ from the periodic updater before
-  // RuntimeState::ReleaseResources(), it references the instance memtracker
-  if (mem_usage_sampled_counter_ != nullptr) {
-    PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
-    mem_usage_sampled_counter_ = nullptr;
-  }
+  // Stop updating profile counters in background.
+  profile()->StopPeriodicCounters();
 
   // We need to delete row_batch_ here otherwise we can't delete the instance_mem_tracker_
   // in runtime_state_->ReleaseResources().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4c5eb17..ea24411 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -169,7 +169,7 @@ Status QueryState::InitBufferPoolState() {
 
   // TODO: once there's a mechanism for reporting non-fragment-local profiles,
   // should make sure to report this profile so it's not going into a black hole.
-  RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy"));
+  RuntimeProfile* dummy_profile = RuntimeProfile::Create(&obj_pool_, "dummy");
   // Only create file group if spilling is enabled.
   if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
     file_group_ = obj_pool_.Add(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8f48439..0565cf5 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -69,7 +69,8 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(
         query_state->query_ctx().utc_timestamp_string))),
     exec_env_(exec_env),
-    profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
+    profile_(RuntimeProfile::Create(
+          obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
     instance_buffer_reservation_(new ReservationTracker) {
   Init();
 }
@@ -83,7 +84,7 @@ RuntimeState::RuntimeState(
     now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))),
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
     exec_env_(exec_env),
-    profile_(obj_pool(), "<unnamed>") {
+    profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
   if (query_ctx().request_pool.empty()) {
     const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
   }
@@ -96,7 +97,7 @@ RuntimeState::~RuntimeState() {
 }
 
 void RuntimeState::Init() {
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
 
   // Register with the thread mgr
   resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
@@ -111,7 +112,7 @@ void RuntimeState::Init() {
       runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
 
   if (instance_buffer_reservation_ != nullptr) {
-    instance_buffer_reservation_->InitChildTracker(&profile_,
+    instance_buffer_reservation_->InitChildTracker(profile_,
         query_state_->buffer_reservation(), instance_mem_tracker_.get(),
         numeric_limits<int64_t>::max());
   }
@@ -127,7 +128,7 @@ Status RuntimeState::CreateCodegen() {
   RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this,
       instance_mem_tracker_.get(), PrintId(fragment_instance_id()), &codegen_));
   codegen_->EnableOptimizations(true);
-  profile_.AddChild(codegen_->runtime_profile());
+  profile_->AddChild(codegen_->runtime_profile());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 49ea9a6..3da9f8c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -149,7 +149,7 @@ class RuntimeState {
   PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
 
   /// Returns runtime state profile
-  RuntimeProfile* runtime_profile() { return &profile_; }
+  RuntimeProfile* runtime_profile() { return profile_; }
 
   /// Returns the LlvmCodeGen object for this fragment instance.
   LlvmCodeGen* codegen() { return codegen_.get(); }
@@ -354,7 +354,7 @@ class RuntimeState {
   /// Records summary statistics for the results of inserts into Hdfs partitions.
   PartitionStatusMap per_partition_status_;
 
-  RuntimeProfile profile_;
+  RuntimeProfile* const profile_;
 
   /// Total time waiting in storage (across all threads)
   RuntimeProfile::Counter* total_storage_wait_timer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 1a5eb58..5f482ba 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -51,7 +51,7 @@ class TmpFileMgrTest : public ::testing::Test {
  public:
   virtual void SetUp() {
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
-    profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test"));
+    profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
     test_env_.reset(new TestEnv);
     ASSERT_OK(test_env_->Init());
     cb_counter_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 2a5b379..ef6a69d 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -74,9 +74,10 @@ ClientRequestState::ClientRequestState(
     schedule_(NULL),
     coord_(NULL),
     result_cache_max_size_(-1),
-    profile_(&profile_pool_, "Query"), // assign name w/ id after planning
-    server_profile_(&profile_pool_, "ImpalaServer"),
-    summary_profile_(&profile_pool_, "Summary"),
+    // Profile is assigned name w/ id after planning
+    profile_(RuntimeProfile::Create(&profile_pool_, "Query")),
+    server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer")),
+    summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
     is_cancelled_(false),
     eos_(false),
     query_state_(beeswax::QueryState::CREATED),
@@ -89,36 +90,36 @@ ClientRequestState::ClientRequestState(
     parent_server_(server),
     start_time_(TimestampValue::LocalTime()) {
 #ifndef NDEBUG
-  profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
+  profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
       "DEBUG build of Impala. Use RELEASE builds to measure query performance.");
 #endif
-  row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer");
-  client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
-  query_events_ = summary_profile_.AddEventSequence("Query Timeline");
+  row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer");
+  client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
+  query_events_ = summary_profile_->AddEventSequence("Query Timeline");
   query_events_->Start();
-  profile_.AddChild(&summary_profile_);
+  profile_->AddChild(summary_profile_);
 
-  profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
-  summary_profile_.AddInfoString("Session ID", PrintId(session_id()));
-  summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type()));
+  profile_->set_name("Query (id=" + PrintId(query_id()) + ")");
+  summary_profile_->AddInfoString("Session ID", PrintId(session_id()));
+  summary_profile_->AddInfoString("Session Type", PrintTSessionType(session_type()));
   if (session_type() == TSessionType::HIVESERVER2) {
-    summary_profile_.AddInfoString("HiveServer2 Protocol Version",
+    summary_profile_->AddInfoString("HiveServer2 Protocol Version",
         Substitute("V$0", 1 + session->hs2_version));
   }
-  summary_profile_.AddInfoString("Start Time", start_time().ToString());
-  summary_profile_.AddInfoString("End Time", "");
-  summary_profile_.AddInfoString("Query Type", "N/A");
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
-  summary_profile_.AddInfoString("Query Status", "OK");
-  summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true));
-  summary_profile_.AddInfoString("User", effective_user());
-  summary_profile_.AddInfoString("Connected User", connected_user());
-  summary_profile_.AddInfoString("Delegated User", do_as_user());
-  summary_profile_.AddInfoString("Network Address",
+  summary_profile_->AddInfoString("Start Time", start_time().ToString());
+  summary_profile_->AddInfoString("End Time", "");
+  summary_profile_->AddInfoString("Query Type", "N/A");
+  summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_->AddInfoString("Query Status", "OK");
+  summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
+  summary_profile_->AddInfoString("User", effective_user());
+  summary_profile_->AddInfoString("Connected User", connected_user());
+  summary_profile_->AddInfoString("Delegated User", do_as_user());
+  summary_profile_->AddInfoString("Network Address",
       lexical_cast<string>(session_->network_address));
-  summary_profile_.AddInfoString("Default Db", default_db());
-  summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
-  summary_profile_.AddInfoString("Coordinator",
+  summary_profile_->AddInfoString("Default Db", default_db());
+  summary_profile_->AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
+  summary_profile_->AddInfoString("Coordinator",
       TNetworkAddressToString(exec_env->backend_address()));
 }
 
@@ -144,11 +145,11 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
   MarkActive();
   exec_request_ = *exec_request;
 
-  profile_.AddChild(&server_profile_);
-  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
-  summary_profile_.AddInfoString("Query Options (set by configuration)",
+  profile_->AddChild(server_profile_);
+  summary_profile_->AddInfoString("Query Type", PrintTStmtType(stmt_type()));
+  summary_profile_->AddInfoString("Query Options (set by configuration)",
       DebugQueryOptions(query_ctx_.client_request.query_options));
-  summary_profile_.AddInfoString("Query Options (set by configuration and planner)",
+  summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
       DebugQueryOptions(exec_request_.query_options));
 
   switch (exec_request->stmt_type) {
@@ -182,7 +183,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
       reset_req.reset_metadata_params.__set_table_name(
           exec_request_.load_data_request.table_name);
       catalog_op_executor_.reset(
-          new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+          new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
       RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
           *catalog_op_executor_->update_catalog_result(),
@@ -298,7 +299,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
         // Verify the user has privileges to perform this operation by checking against
         // the Sentry Service (via the Catalog Server).
         catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-            &server_profile_));
+            server_profile_));
 
         TSentryAdminCheckRequest req;
         req.__set_header(TCatalogServiceRequestHeader());
@@ -319,7 +320,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
         // Verify the user has privileges to perform this operation by checking against
         // the Sentry Service (via the Catalog Server).
         catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-            &server_profile_));
+            server_profile_));
 
         TSentryAdminCheckRequest req;
         req.__set_header(TCatalogServiceRequestHeader());
@@ -392,13 +393,13 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     plan_ss << "\n----------------\n"
             << query_exec_request.query_plan
             << "----------------";
-    summary_profile_.AddInfoString("Plan", plan_ss.str());
+    summary_profile_->AddInfoString("Plan", plan_ss.str());
   }
   // Add info strings consumed by CM: Estimated mem and tables missing stats.
   if (query_exec_request.__isset.per_host_mem_estimate) {
     stringstream ss;
     ss << query_exec_request.per_host_mem_estimate;
-    summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
+    summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str());
   }
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
       query_exec_request.query_ctx.__isset.tables_missing_stats &&
@@ -409,7 +410,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
       if (i != 0) ss << ",";
       ss << tbls[i].db_name << "." << tbls[i].table_name;
     }
-    summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
   }
 
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
@@ -422,7 +423,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
       if (i != 0) ss << ",";
       ss << tbls[i].db_name << "." << tbls[i].table_name;
     }
-    summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
   }
 
   if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
@@ -434,7 +435,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
       if (i != 0) ss << ",";
       ss << tbls[i].db_name << "." << tbls[i].table_name;
     }
-    summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
   }
 
   {
@@ -442,7 +443,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
-        exec_request_.query_options, &summary_profile_, query_events_));
+        exec_request_.query_options, summary_profile_, query_events_));
   }
   Status status = exec_env_->scheduler()->Schedule(schedule_.get());
   {
@@ -465,14 +466,14 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
-  profile_.AddChild(coord_->query_profile());
+  profile_->AddChild(coord_->query_profile());
   return Status::OK();
 }
 
 Status ClientRequestState::ExecDdlRequest() {
   string op_type = catalog_op_type() == TCatalogOpType::DDL ?
       PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
-  summary_profile_.AddInfoString("DDL Type", op_type);
+  summary_profile_->AddInfoString("DDL Type", op_type);
 
   if (catalog_op_type() != TCatalogOpType::DDL &&
       catalog_op_type() != TCatalogOpType::RESET_METADATA) {
@@ -502,7 +503,7 @@ Status ClientRequestState::ExecDdlRequest() {
   }
 
   catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-      &server_profile_));
+      server_profile_));
   Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
   {
     lock_guard<mutex> l(lock_);
@@ -564,7 +565,7 @@ void ClientRequestState::Done() {
 
   unique_lock<mutex> l(lock_);
   end_time_ = TimestampValue::LocalTime();
-  summary_profile_.AddInfoString("End Time", end_time().ToString());
+  summary_profile_->AddInfoString("End Time", end_time().ToString());
   query_events_->MarkEvent("Unregister query");
 
   // Update result set cache metrics, and update mem limit accounting before tearing
@@ -586,7 +587,7 @@ void ClientRequestState::Done() {
 Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
   TResultSet metadata_op_result;
   // Like the other Exec(), fill out as much profile information as we're able to.
-  summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
+  summary_profile_->AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
   RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
       &metadata_op_result));
   result_metadata_ = metadata_op_result.schema;
@@ -722,7 +723,7 @@ Status ClientRequestState::UpdateQueryStatus(const Status& status) {
   if (!status.ok() && query_status_.ok()) {
     UpdateQueryState(beeswax::QueryState::EXCEPTION);
     query_status_ = status;
-    summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
+    summary_profile_->AddInfoString("Query Status", query_status_.GetDetail());
   }
 
   return status;
@@ -898,7 +899,7 @@ Status ClientRequestState::UpdateCatalog() {
   }
 
   query_events_->MarkEvent("DML data written");
-  SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));
+  SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
 
   TQueryExecRequest query_exec_request = exec_request().query_exec_request;
   if (query_exec_request.__isset.finalize_params) {
@@ -1031,7 +1032,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   DCHECK_GE(child_queries.size(), 1);
   DCHECK_LE(child_queries.size(), 2);
   catalog_op_executor_.reset(
-      new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+      new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
 
   // If there was no column stats query, pass in empty thrift structures to
   // ExecComputeStats(). Otherwise pass in the column stats result.
@@ -1078,7 +1079,7 @@ void ClientRequestState::ClearResultCache() {
 void ClientRequestState::UpdateQueryState(
     beeswax::QueryState::type query_state) {
   query_state_ = query_state;
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 6846165..1c015c3 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -184,8 +184,8 @@ class ClientRequestState {
   void set_user_profile_access(bool user_has_profile_access) {
     user_has_profile_access_ = user_has_profile_access;
   }
-  const RuntimeProfile& profile() const { return profile_; }
-  const RuntimeProfile& summary_profile() const { return summary_profile_; }
+  const RuntimeProfile* profile() const { return profile_; }
+  const RuntimeProfile* summary_profile() const { return summary_profile_; }
   const TimestampValue& start_time() const { return start_time_; }
   const TimestampValue& end_time() const { return end_time_; }
   const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
@@ -211,7 +211,7 @@ class ClientRequestState {
   }
 
   RuntimeProfile::EventSequence* query_events() const { return query_events_; }
-  RuntimeProfile* summary_profile() { return &summary_profile_; }
+  RuntimeProfile* summary_profile() { return summary_profile_; }
 
  private:
   const TQueryCtx query_ctx_;
@@ -299,9 +299,9 @@ class ClientRequestState {
   /// There's a fourth profile which is not built here (but is a
   /// child of profile_); the execution profile which tracks the
   /// actual fragment execution.
-  RuntimeProfile profile_;
-  RuntimeProfile server_profile_;
-  RuntimeProfile summary_profile_;
+  RuntimeProfile* const profile_;
+  RuntimeProfile* const server_profile_;
+  RuntimeProfile* const summary_profile_;
   RuntimeProfile::Counter* row_materialization_timer_;
 
   /// Tracks how long we are idle waiting for a client to fetch rows.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8ba2894..051cfaf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -623,9 +623,9 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
       if (base64_encoded) {
-        RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output));
+        RETURN_IF_ERROR(request_state->profile()->SerializeToArchiveString(output));
       } else {
-        request_state->profile().PrettyPrint(output);
+        request_state->profile()->PrettyPrint(output);
       }
       return Status::OK();
     }
@@ -741,7 +741,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
 
 void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
   string encoded_profile_str;
-  Status status = query.profile().SerializeToArchiveString(&encoded_profile_str);
+  Status status = query.profile()->SerializeToArchiveString(&encoded_profile_str);
   if (!status.ok()) {
     // Didn't serialize the string. Continue with empty string.
     LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
@@ -1677,7 +1677,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
   id = request_state.query_id();
   const TExecRequest& request = request_state.exec_request();
 
-  const string* plan_str = request_state.summary_profile().GetInfoString("Plan");
+  const string* plan_str = request_state.summary_profile()->GetInfoString("Plan");
   if (plan_str != nullptr) plan = *plan_str;
   stmt = request_state.sql_stmt();
   stmt_type = request.stmt_type;
@@ -1701,11 +1701,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
 
   if (copy_profile) {
     stringstream ss;
-    request_state.profile().PrettyPrint(&ss);
+    request_state.profile()->PrettyPrint(&ss);
     profile_str = ss.str();
     if (encoded_profile.empty()) {
       Status status =
-          request_state.profile().SerializeToArchiveString(&encoded_profile_str);
+          request_state.profile()->SerializeToArchiveString(&encoded_profile_str);
       if (!status.ok()) {
         LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
                                    << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/dummy-runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h
index 83bccbf..1642d4e 100644
--- a/be/src/util/dummy-runtime-profile.h
+++ b/be/src/util/dummy-runtime-profile.h
@@ -28,12 +28,12 @@ namespace impala {
 /// but not always so that the object can still allocate counters in the same way.
 class DummyProfile {
  public:
-  DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {}
-  RuntimeProfile* profile() { return &profile_; }
+  DummyProfile() : pool_(), profile_(RuntimeProfile::Create(&pool_, "dummy", false)) {}
+  RuntimeProfile* profile() { return profile_; }
 
  private:
   ObjectPool pool_;
-  RuntimeProfile profile_;
+  RuntimeProfile* const profile_;
 };
 }
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index b1c755a..098e683 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -93,21 +93,20 @@ void PeriodicCounterUpdater::RegisterBucketingCounters(
 }
 
 void PeriodicCounterUpdater::StopBucketingCounters(
-    vector<RuntimeProfile::Counter*>* buckets, bool convert) {
+    vector<RuntimeProfile::Counter*>* buckets) {
   int64_t num_sampled = 0;
   {
     lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
     BucketCountersMap::iterator itr =
         instance_->bucketing_counters_.find(buckets);
-    if (itr != instance_->bucketing_counters_.end()) {
-      num_sampled = itr->second.num_sampled;
-      instance_->bucketing_counters_.erase(itr);
-    }
+    // If not registered, we have nothing to do.
+    if (itr == instance_->bucketing_counters_.end()) return;
+    num_sampled = itr->second.num_sampled;
+    instance_->bucketing_counters_.erase(itr);
   }
 
-  if (convert && num_sampled > 0) {
-    for (int i = 0; i < buckets->size(); ++i) {
-      RuntimeProfile::Counter* counter = (*buckets)[i];
+  if (num_sampled > 0) {
+    for (RuntimeProfile::Counter* counter : *buckets) {
       double perc = 100 * counter->value() / (double)num_sampled;
       counter->Set(perc);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index c603522..762f372 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -70,12 +70,11 @@ class PeriodicCounterUpdater {
   /// Stops updating the value of 'counter'.
   static void StopSamplingCounter(RuntimeProfile::Counter* counter);
 
-  /// Stops updating the bucket counter.
-  /// If convert is true, convert the buckets from count to percentage.
-  /// Sampling counters are updated periodically so should be removed as soon as the
+  /// If the bucketing counters 'buckets' are registered, stops updating the counters and
+  /// converts the buckets from count to percentage. If not registered, has no effect.
+  /// Periodic counters are updated periodically so should be removed as soon as the
   /// underlying counter is no longer going to change.
-  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets,
-      bool convert);
+  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets);
 
   /// Stops 'counter' from receiving any more samples.
   static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);