Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/20 20:20:11 UTC

[1/5] incubator-impala git commit: IMPALA-5941: Fix Metastore schema creation in create-test-configuration.sh

Repository: incubator-impala
Updated Branches:
  refs/heads/master 8ad6d0331 -> fc275fab6


IMPALA-5941: Fix Metastore schema creation in create-test-configuration.sh

The Hive Metastore schema script includes other SQL
scripts using \i, which resolves relative file names
against the current working directory, not against the
directory of the including script. Since we currently
invoke it from outside the schema script directory,
psql is unable to find those included scripts.

The fix is to switch to the Hive Metastore script
directory when invoking the schema script.
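
The behavior behind this fix can be modeled in a few lines. The sketch below is a hypothetical C++17 model built on std::filesystem, not psql source. It encodes psql's documented rules: a relative name passed to \i is resolved against the process's current working directory, while the related \ir meta-command, when run from a script, resolves it against that script's directory. The function ResolveInclude and the file name included.sql are illustrative only.

```cpp
#include <cassert>
#include <filesystem>

namespace fs = std::filesystem;

// Hypothetical model of how psql resolves the file name given to an include
// meta-command inside a script run with -f (this is not psql's source code).
//   cwd    - working directory of the psql process
//   script - the script passed via -f (relative to cwd, or absolute)
//   name   - the argument of the include command
//   use_ir - false models \i, true models \ir
fs::path ResolveInclude(const fs::path& cwd, const fs::path& script,
                        const fs::path& name, bool use_ir) {
  // Absolute names are used as given by both commands.
  if (name.is_absolute()) return name;
  // \i resolves against the working directory; \ir against the script's directory.
  const fs::path base = use_ir ? (cwd / script).parent_path() : cwd;
  return (base / name).lexically_normal();
}
```

After the pushd, the working directory and the script directory are the same, so \i resolves to the same file that \ir would. Because the schema scripts come from ${HIVE_HOME}, changing the working directory is presumably less invasive than editing them.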

Change-Id: Ic312df4597c7d211d4ecd551d572f751aea0cd24
Reviewed-on: http://gerrit.cloudera.org:8080/8081
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: David Knupp <dk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/eecbbcb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/eecbbcb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/eecbbcb7

Branch: refs/heads/master
Commit: eecbbcb7c7f392c417d4b65117435faf6ef32820
Parents: 8ad6d03
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Fri Sep 15 09:30:24 2017 -0700
Committer: David Knupp <dk...@cloudera.com>
Committed: Tue Sep 19 22:59:04 2017 +0000

----------------------------------------------------------------------
 bin/create-test-configuration.sh | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eecbbcb7/bin/create-test-configuration.sh
----------------------------------------------------------------------
diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index a59d6f7..2f981aa 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -92,8 +92,11 @@ if [ $CREATE_METASTORE -eq 1 ]; then
   dropdb -U hiveuser ${METASTORE_DB} 2> /dev/null || true
   createdb -U hiveuser ${METASTORE_DB}
 
-  psql -q -U hiveuser -d ${METASTORE_DB} \
-       -f ${HIVE_HOME}/scripts/metastore/upgrade/postgres/hive-schema-1.1.0.postgres.sql
+  # Hive schema SQL scripts include other scripts using \i, which expects absolute paths.
+  # Switch to the scripts directory to make this work.
+  pushd ${HIVE_HOME}/scripts/metastore/upgrade/postgres
+  psql -q -U hiveuser -d ${METASTORE_DB} -f hive-schema-1.1.0.postgres.sql
+  popd
   # Increase the size limit of PARAM_VALUE from SERDE_PARAMS table to be able to create
   # HBase tables with large number of columns.
   echo "alter table \"SERDE_PARAMS\" alter column \"PARAM_VALUE\" type character varying" \


[4/5] incubator-impala git commit: IMPALA-5895: clean up runtime profile lifecycle

IMPALA-5895: clean up runtime profile lifecycle

Require callers to explicitly stop periodic counter updates instead of
doing it in the destructor. This replaces the ad-hoc logic that stopped
individual counters.

Track which counters need to be stopped in separate lists instead of
stopping everything.
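
The explicit-stop lifecycle can be illustrated with a minimal C++ sketch. MiniUpdater stands in for the background PeriodicCounterUpdater and MiniProfile stands in for RuntimeProfile. Both classes, and all their methods other than the StopPeriodicCounters() name taken from the diff, are hypothetical simplifications. The sketch shows three things:

- Periodic counters are remembered in per-kind lists.
- StopPeriodicCounters() unregisters only those counters and is safe to call more than once.
- The destructor merely checks that the owner already stopped them.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <set>
#include <vector>

struct Counter { long value = 0; };

// Hypothetical stand-in for the background thread that periodically updates
// registered counters; here it only remembers what is registered.
class MiniUpdater {
 public:
  void RegisterRate(Counter* c) { rate_.insert(c); }
  void RegisterSampling(Counter* c) { sampling_.insert(c); }
  void StopRate(Counter* c) { rate_.erase(c); }
  void StopSampling(Counter* c) { sampling_.erase(c); }
  std::size_t num_registered() const { return rate_.size() + sampling_.size(); }

 private:
  std::set<Counter*> rate_;
  std::set<Counter*> sampling_;
};

// Hypothetical profile that remembers which periodic counters it started.
class MiniProfile {
 public:
  explicit MiniProfile(MiniUpdater* updater) : updater_(updater) {}
  // The destructor no longer stops anything; it only checks that the owner did.
  ~MiniProfile() { assert(rate_counters_.empty() && sampling_counters_.empty()); }

  // Plain counters are never registered, so they never need stopping.
  Counter* AddCounter() { return NewCounter(); }

  Counter* AddRateCounter() {
    Counter* c = NewCounter();
    rate_counters_.push_back(c);
    updater_->RegisterRate(c);
    return c;
  }

  Counter* AddSamplingCounter() {
    Counter* c = NewCounter();
    sampling_counters_.push_back(c);
    updater_->RegisterSampling(c);
    return c;
  }

  // Stops only the counters tracked above. Safe to call repeatedly: the lists
  // are cleared, so later calls find nothing to stop.
  void StopPeriodicCounters() {
    for (Counter* c : rate_counters_) updater_->StopRate(c);
    for (Counter* c : sampling_counters_) updater_->StopSampling(c);
    rate_counters_.clear();
    sampling_counters_.clear();
  }

 private:
  Counter* NewCounter() {
    counters_.push_back(std::make_unique<Counter>());
    return counters_.back().get();
  }

  MiniUpdater* const updater_;
  std::vector<std::unique_ptr<Counter>> counters_;
  std::vector<Counter*> rate_counters_;
  std::vector<Counter*> sampling_counters_;
};
```

Idempotence matters because, in the diff, a node such as KuduScanNodeMt stops its counters at end of stream and ScanNode::Close() stops them again.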

Force all RuntimeProfiles to use ObjectPools in a uniform way: the
profile, its counters and its children are all allocated from the
same pool. This is done via a new RuntimeProfile::Create() method.
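
The ownership rule can be sketched as follows. ObjectPool here is a minimal stand-in that deletes the objects added to it when the pool is destroyed. Profile, Counter and CreateChild() are hypothetical; only the Create(pool, name) shape mirrors the RuntimeProfile::Create() calls in the diff. In the sketch the constructor is private, so Create() is the only way to obtain a profile. This matches how the diff turns members such as ExecNode::runtime_profile_ and LlvmCodeGen::profile_ into plain `RuntimeProfile* const` pointers rather than values or scoped_ptrs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for an object pool: owns heap objects and deletes them,
// in reverse order of addition, when the pool is destroyed.
class ObjectPool {
 public:
  ObjectPool() = default;
  ObjectPool(const ObjectPool&) = delete;
  ObjectPool& operator=(const ObjectPool&) = delete;
  ~ObjectPool() {
    for (auto it = deleters_.rbegin(); it != deleters_.rend(); ++it) (*it)();
  }
  template <typename T>
  T* Add(T* obj) {
    deleters_.push_back([obj]() { delete obj; });
    return obj;
  }

 private:
  std::vector<std::function<void()>> deleters_;
};

struct Counter { long value = 0; };

// Hypothetical profile: it, its counters and its children all live in one pool.
class Profile {
 public:
  static Profile* Create(ObjectPool* pool, const std::string& name) {
    return pool->Add(new Profile(pool, name));
  }
  Counter* AddCounter() { return pool_->Add(new Counter); }
  Profile* CreateChild(const std::string& name) {
    Profile* child = Create(pool_, name);
    children_.push_back(child);
    return child;
  }
  const std::string& name() const { return name_; }
  std::size_t num_children() const { return children_.size(); }

 private:
  Profile(ObjectPool* pool, std::string name) : pool_(pool), name_(std::move(name)) {}
  ObjectPool* const pool_;
  const std::string name_;
  std::vector<Profile*> children_;  // not owned; the pool deletes them
};
```

Because nothing but the pool deletes a profile, callers never need to coordinate destruction order between a profile and its children.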

Consolidate 'time_series_counter_map_lock_' and 'counter_map_lock_'
to reduce the complexity of the locking scheme. I didn't see any benefit
to sharding the locks in this way: there are only two time series
counters per fragment instance, which is a small fraction of the
total number of counters.
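
Here is a minimal C++ sketch of the consolidated scheme, assuming both maps are guarded by a single std::mutex. The class CounterRegistry and its methods are hypothetical; only the lock and map names echo those mentioned above. With one lock, an operation that touches both maps needs no lock-ordering rule.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical registry with two maps guarded by one mutex.
class CounterRegistry {
 public:
  void AddCounter(const std::string& name, long value) {
    std::lock_guard<std::mutex> l(counter_map_lock_);
    counter_map_[name] = value;
  }

  void AddTimeSeries(const std::string& name, long period_ms) {
    std::lock_guard<std::mutex> l(counter_map_lock_);
    time_series_counter_map_[name] = period_ms;
  }

  // Reads both maps under the single lock; with two separate locks this would
  // need a fixed acquisition order to avoid deadlock.
  std::size_t TotalCounters() const {
    std::lock_guard<std::mutex> l(counter_map_lock_);
    return counter_map_.size() + time_series_counter_map_.size();
  }

 private:
  mutable std::mutex counter_map_lock_;  // guards both maps below
  std::map<std::string, long> counter_map_;
  std::map<std::string, long> time_series_counter_map_;
};
```

The cost is that writers of either map now contend on one mutex. That cost is small when time series counters are rare, which is the argument made above.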

Change-Id: I45c39ac36c8e3c277213d32f5ae5f14be6b7f0df
Reviewed-on: http://gerrit.cloudera.org:8080/8069
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7866eec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7866eec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7866eec5

Branch: refs/heads/master
Commit: 7866eec5bdcbf9194a4aad2c87c354cbaad7b802
Parents: 741b052
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Sep 5 15:51:49 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 20 08:48:38 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/hash-benchmark.cc             |   8 +-
 be/src/codegen/llvm-codegen.cc                  |  32 +--
 be/src/codegen/llvm-codegen.h                   |   4 +-
 be/src/exec/data-sink.cc                        |   2 +-
 be/src/exec/data-source-scan-node.cc            |   4 +-
 be/src/exec/exec-node.cc                        |  15 +-
 be/src/exec/exec-node.h                         |   7 +-
 be/src/exec/hash-table-test.cc                  |   2 +-
 be/src/exec/hbase-scan-node.cc                  |   2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  26 +--
 be/src/exec/hdfs-scan-node-base.h               |  11 +
 be/src/exec/kudu-scan-node-base.cc              |  15 --
 be/src/exec/kudu-scan-node-base.h               |  13 --
 be/src/exec/kudu-scan-node-mt.cc                |   4 +-
 be/src/exec/scan-node.cc                        |   8 +
 be/src/exec/scan-node.h                         |  18 +-
 be/src/experiments/data-provider-test.cc        |   6 +-
 be/src/experiments/tuple-splitter-test.cc       |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc    |   2 +-
 .../runtime/bufferpool/buffer-allocator-test.cc |   2 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   |   2 +-
 .../bufferpool/reservation-tracker-test.cc      |   2 +-
 be/src/runtime/bufferpool/suballocator-test.cc  |   8 +-
 be/src/runtime/coordinator-backend-state.cc     |   8 +-
 be/src/runtime/coordinator.cc                   |   4 +-
 be/src/runtime/coordinator.h                    |   6 +-
 be/src/runtime/data-stream-recvr.cc             |  32 ++-
 be/src/runtime/data-stream-recvr.h              |   7 +-
 be/src/runtime/data-stream-test.cc              |   7 +-
 be/src/runtime/fragment-instance-state.cc       |  12 +-
 be/src/runtime/query-state.cc                   |   2 +-
 be/src/runtime/runtime-state.cc                 |  11 +-
 be/src/runtime/runtime-state.h                  |   4 +-
 be/src/runtime/tmp-file-mgr-test.cc             |   2 +-
 be/src/service/client-request-state.cc          |  95 ++++----
 be/src/service/client-request-state.h           |  12 +-
 be/src/service/impala-server.cc                 |  12 +-
 be/src/util/dummy-runtime-profile.h             |   6 +-
 be/src/util/periodic-counter-updater.cc         |  15 +-
 be/src/util/periodic-counter-updater.h          |   9 +-
 be/src/util/runtime-profile-test.cc             | 220 +++++++++----------
 be/src/util/runtime-profile.cc                  | 131 +++++++----
 be/src/util/runtime-profile.h                   |  94 +++++---
 43 files changed, 460 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/benchmarks/hash-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index dadad25..b183915 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -440,10 +440,10 @@ int main(int argc, char **argv) {
 
   MemTracker tracker;
   MemPool mem_pool(&tracker);
-  RuntimeProfile int_profile(state->obj_pool(), "IntGen");
-  RuntimeProfile mixed_profile(state->obj_pool(), "MixedGen");
-  DataProvider int_provider(&mem_pool, &int_profile);
-  DataProvider mixed_provider(&mem_pool, &mixed_profile);
+  RuntimeProfile* int_profile = RuntimeProfile::Create(state->obj_pool(), "IntGen");
+  RuntimeProfile* mixed_profile = RuntimeProfile::Create(state->obj_pool(), "MixedGen");
+  DataProvider int_provider(&mem_pool, int_profile);
+  DataProvider mixed_provider(&mem_pool, mixed_profile);
 
   scoped_ptr<LlvmCodeGen> codegen;
   status = LlvmCodeGen::CreateImpalaCodegen(state, NULL, "test", &codegen);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 6f8b156..5503969 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -170,8 +170,8 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& id)
   : state_(state),
     id_(id),
-    profile_(pool, "CodeGen"),
-    mem_tracker_(pool->Add(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker))),
+    profile_(RuntimeProfile::Create(pool, "CodeGen")),
+    mem_tracker_(pool->Add(new MemTracker(profile_, -1, "CodeGen", parent_mem_tracker))),
     optimizations_enabled_(false),
     is_corrupt_(false),
     is_compiled_(false),
@@ -181,21 +181,21 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
     loaded_functions_(IRFunction::FN_END, NULL) {
   DCHECK(llvm_initialized_) << "Must call LlvmCodeGen::InitializeLlvm first.";
 
-  load_module_timer_ = ADD_TIMER(&profile_, "LoadTime");
-  prepare_module_timer_ = ADD_TIMER(&profile_, "PrepareTime");
-  module_bitcode_size_ = ADD_COUNTER(&profile_, "ModuleBitcodeSize", TUnit::BYTES);
-  codegen_timer_ = ADD_TIMER(&profile_, "CodegenTime");
-  optimization_timer_ = ADD_TIMER(&profile_, "OptimizationTime");
-  compile_timer_ = ADD_TIMER(&profile_, "CompileTime");
-  num_functions_ = ADD_COUNTER(&profile_, "NumFunctions", TUnit::UNIT);
-  num_instructions_ = ADD_COUNTER(&profile_, "NumInstructions", TUnit::UNIT);
+  load_module_timer_ = ADD_TIMER(profile_, "LoadTime");
+  prepare_module_timer_ = ADD_TIMER(profile_, "PrepareTime");
+  module_bitcode_size_ = ADD_COUNTER(profile_, "ModuleBitcodeSize", TUnit::BYTES);
+  codegen_timer_ = ADD_TIMER(profile_, "CodegenTime");
+  optimization_timer_ = ADD_TIMER(profile_, "OptimizationTime");
+  compile_timer_ = ADD_TIMER(profile_, "CompileTime");
+  num_functions_ = ADD_COUNTER(profile_, "NumFunctions", TUnit::UNIT);
+  num_instructions_ = ADD_COUNTER(profile_, "NumInstructions", TUnit::UNIT);
 }
 
 Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& file, const string& id,
     scoped_ptr<LlvmCodeGen>* codegen) {
   codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
-  SCOPED_TIMER((*codegen)->profile_.total_time_counter());
+  SCOPED_TIMER((*codegen)->profile_->total_time_counter());
 
   unique_ptr<Module> loaded_module;
   RETURN_IF_ERROR((*codegen)->LoadModuleFromFile(file, &loaded_module));
@@ -206,7 +206,7 @@ Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
 Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& id, scoped_ptr<LlvmCodeGen>* codegen) {
   codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
-  SCOPED_TIMER((*codegen)->profile_.total_time_counter());
+  SCOPED_TIMER((*codegen)->profile_->total_time_counter());
 
   // Select the appropriate IR version. We cannot use LLVM IR with SSE4.2 instructions on
   // a machine without SSE4.2 support.
@@ -276,7 +276,7 @@ Status LlvmCodeGen::LoadModuleFromMemory(unique_ptr<MemoryBuffer> module_ir_buf,
 Status LlvmCodeGen::LinkModule(const string& file) {
   if (linked_modules_.find(file) != linked_modules_.end()) return Status::OK();
 
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
   unique_ptr<Module> new_module;
   RETURN_IF_ERROR(LoadModuleFromFile(file, &new_module));
 
@@ -324,7 +324,7 @@ Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state,
   LlvmCodeGen* codegen = codegen_ret->get();
 
   // Parse module for cross compiled functions and types
-  SCOPED_TIMER(codegen->profile_.total_time_counter());
+  SCOPED_TIMER(codegen->profile_->total_time_counter());
   SCOPED_TIMER(codegen->prepare_module_timer_);
 
   // Get type for StringValue
@@ -620,7 +620,7 @@ Status LlvmCodeGen::MaterializeFunctionHelper(Function *fn) {
 }
 
 Status LlvmCodeGen::MaterializeFunction(Function *fn) {
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
   SCOPED_TIMER(prepare_module_timer_);
   return MaterializeFunctionHelper(fn);
 }
@@ -1037,7 +1037,7 @@ Status LlvmCodeGen::FinalizeModule() {
   }
 
   if (is_corrupt_) return Status("Module is corrupt.");
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
 
   // Don't waste time optimizing module if there are no functions to JIT. This can happen
   // if the codegen object is created but no functions are successfully codegen'd.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 8dce330..8aa9f2b 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -159,7 +159,7 @@ class LlvmCodeGen {
   /// any other API methods after calling close.
   void Close();
 
-  RuntimeProfile* runtime_profile() { return &profile_; }
+  RuntimeProfile* runtime_profile() { return profile_; }
   RuntimeProfile::Counter* codegen_timer() { return codegen_timer_; }
 
   /// Turns on/off optimization passes
@@ -669,7 +669,7 @@ class LlvmCodeGen {
   std::string id_;
 
   /// Codegen counters
-  RuntimeProfile profile_;
+  RuntimeProfile* const profile_;
 
   /// MemTracker used for tracking memory consumed by codegen. Connected to a parent
   /// MemTracker if one was provided during initialization. Owned by the ObjectPool

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index fe23694..c173ed5 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -175,7 +175,7 @@ string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
 
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   DCHECK(parent_mem_tracker != NULL);
-  profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
+  profile_ = RuntimeProfile::Create(state->obj_pool(), GetName());
   const string& name = GetName();
   mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
   expr_mem_tracker_.reset(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 01e0dbe..78ba492 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -365,15 +365,13 @@ Status DataSourceScanNode::Reset(RuntimeState* state) {
 void DataSourceScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
   input_batch_.reset();
   TCloseParams params;
   params.__set_scan_handle(scan_handle_);
   TCloseResult result;
   Status status = data_source_executor_->Close(params, &result);
   if (!status.ok()) state->LogError(status.msg());
-  ExecNode::Close(state);
+  ScanNode::Close(state);
 }
 
 void DataSourceScanNode::DebugString(int indentation_level, stringstream* out) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 94e9ed1..b656d2b 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -130,12 +130,14 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     debug_action_(TDebugAction::WAIT),
     limit_(tnode.limit),
     num_rows_returned_(0),
+    runtime_profile_(RuntimeProfile::Create(pool_,
+        Substitute("$0 (id=$1)", PrintPlanNodeType(tnode.node_type), id_))),
     rows_returned_counter_(NULL),
     rows_returned_rate_(NULL),
     containing_subplan_(NULL),
     disable_codegen_(tnode.disable_codegen),
     is_closed_(false) {
-  InitRuntimeProfile(PrintPlanNodeType(tnode.node_type));
+  runtime_profile_->set_metadata(id_);
 }
 
 ExecNode::~ExecNode() {
@@ -149,8 +151,8 @@ Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status ExecNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state));
-  DCHECK(runtime_profile_.get() != NULL);
-  mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(),
+  DCHECK(runtime_profile_ != NULL);
+  mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
       state->instance_mem_tracker()));
   expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
   expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
@@ -462,13 +464,6 @@ void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) {
   CollectNodes(TPlanNodeType::KUDU_SCAN_NODE, nodes);
 }
 
-void ExecNode::InitRuntimeProfile(const string& name) {
-  stringstream ss;
-  ss << name << " (id=" << id_ << ")";
-  runtime_profile_.reset(new RuntimeProfile(pool_, ss.str()));
-  runtime_profile_->set_metadata(id_);
-}
-
 Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) {
   DCHECK_EQ(debug_phase_, phase);
   if (debug_action_ == TDebugAction::FAIL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 2f3f714..7cba6ac 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -208,7 +208,7 @@ class ExecNode {
   int64_t limit() const { return limit_; }
   bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; }
 
-  RuntimeProfile* runtime_profile() { return runtime_profile_.get(); }
+  RuntimeProfile* runtime_profile() { return runtime_profile_; }
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
   MemPool* expr_mem_pool() { return expr_mem_pool_.get(); }
@@ -310,7 +310,8 @@ class ExecNode {
   int64_t limit_;  // -1: no limit
   int64_t num_rows_returned_;
 
-  boost::scoped_ptr<RuntimeProfile> runtime_profile_;
+  /// Runtime profile for this node. Owned by the QueryState's ObjectPool.
+  RuntimeProfile* const runtime_profile_;
   RuntimeProfile::Counter* rows_returned_counter_;
   RuntimeProfile::Counter* rows_returned_rate_;
 
@@ -354,8 +355,6 @@ class ExecNode {
 
   virtual bool IsScanNode() const { return false; }
 
-  void InitRuntimeProfile(const std::string& name);
-
   /// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
   /// 'phase' must not be INVALID.
   Status ExecDebugAction(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 816e6cf..0b99cbd 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -233,7 +233,7 @@ class HashTableTest : public testing::Test {
       int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100,
       int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) {
     BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
-    RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht"));
+    RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "ht");
 
     // Set up memory tracking for the hash table.
     MemTracker* client_tracker =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index e783731..a74d4b3 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -279,7 +279,7 @@ void HBaseScanNode::Close(RuntimeState* state) {
     JNIEnv* env = getJNIEnv();
     hbase_scanner_->Close(env);
   }
-  ExecNode::Close(state);
+  ScanNode::Close(state);
 }
 
 void HBaseScanNode::DebugString(int indentation_level, stringstream* out) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index ca71201..b5169a8 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -97,7 +97,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
       thrift_dict_filter_conjuncts_map_(
           tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
-      disks_accessed_bitmap_(TUnit::UNIT, 0) {
+      disks_accessed_bitmap_(TUnit::UNIT, 0),
+      active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -142,8 +143,8 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
     filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
     string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
         PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
-    RuntimeProfile* profile = state->obj_pool()->Add(
-        new RuntimeProfile(state->obj_pool(), filter_profile_title));
+    RuntimeProfile* profile =
+        RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
     runtime_profile_->AddChild(profile);
     filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
         target.is_bound_by_partition_columns));
@@ -441,12 +442,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
       "MaxCompressedTextFileLength", TUnit::BYTES);
 
-  for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
-    hdfs_read_thread_concurrency_bucket_.push_back(
-        pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  }
-  runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
-      &hdfs_read_thread_concurrency_bucket_);
+  hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
+      &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 1);
 
   counters_running_ = true;
 
@@ -851,18 +848,13 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   if (!counters_running_) return;
   counters_running_ = false;
 
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
-  PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
-  PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
-      true);
+  runtime_profile_->StopPeriodicCounters();
 
   // Output hdfs read thread concurrency into info string
   stringstream ss;
-  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
+  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) {
     ss << i << ":" << setprecision(4)
-       << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
+       << (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% ";
   }
   runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
 

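The bucketing counters that AddBucketingCounters() now creates (num_total_disks() + 1 of them) summarize how many HDFS read threads were active. The hypothetical C++ sketch below, BucketingSampler, models the idea. Each periodic sample of the active-thread count increments bucket[count], and the buckets are reported as a percentage of all samples. The output uses the "i:pct% " layout of the "Hdfs Read Thread Concurrency Bucket" info string. Clamping out-of-range samples into the last bucket is an assumption of the sketch, not something the diff shows.

```cpp
#include <cassert>
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical model of a set of bucketing counters.
class BucketingSampler {
 public:
  // One bucket per possible number of active threads, e.g. num_disks + 1.
  explicit BucketingSampler(std::size_t num_buckets) : counts_(num_buckets, 0) {
    assert(num_buckets > 0);
  }

  // Called once per sampling period with the current number of active threads.
  void Sample(std::size_t active) {
    if (active >= counts_.size()) active = counts_.size() - 1;  // sketch's choice
    ++counts_[active];
    ++num_samples_;
  }

  // Percentage of all samples in which exactly 'bucket' threads were active.
  double Percent(std::size_t bucket) const {
    if (num_samples_ == 0) return 0.0;
    return 100.0 * static_cast<double>(counts_[bucket]) /
           static_cast<double>(num_samples_);
  }

  // Same "i:pct% " layout as the info string built in StopAndFinalizeCounters().
  std::string Format() const {
    std::stringstream ss;
    for (std::size_t i = 0; i < counts_.size(); ++i) {
      ss << i << ":" << std::setprecision(4) << Percent(i) << "% ";
    }
    return ss.str();
  }

 private:
  std::vector<long> counts_;
  long num_samples_ = 0;
};
```

For example, with three buckets and samples 0, 1, 1 and 5 (clamped to 2), Format() returns "0:25% 1:50% 2:25% ".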
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e1c431f..e33de5a 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -469,6 +469,17 @@ class HdfsScanNodeBase : public ScanNode {
   /// Total number of file handle opens where the file handle was not in the cache
   RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
 
+  /// The number of active hdfs reading threads reading for this node.
+  RuntimeProfile::Counter active_hdfs_read_thread_counter_;
+
+  /// Average number of active hdfs reading threads
+  /// This should be created in Open() and stopped when all the scanner threads are done.
+  RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;
+
+  /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
+  /// taken where there are i concurrent hdfs read thread running. Created in Open().
+  std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
+
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers
   boost::scoped_ptr<MemPool> scan_node_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index d587660..526374c 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -52,7 +52,6 @@ KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     : ScanNode(pool, tnode, descs),
       tuple_id_(tnode.kudu_scan_node.tuple_id),
       client_(nullptr),
-      counters_running_(false),
       next_scan_token_idx_(0) {
   DCHECK(KuduIsAvailable());
 }
@@ -69,7 +68,6 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
       ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
   kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
   kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
-  counters_running_ = true;
 
   DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
@@ -108,12 +106,6 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-void KuduScanNodeBase::Close(RuntimeState* state) {
-  if (is_closed()) return;
-  StopAndFinalizeCounters();
-  ExecNode::Close(state);
-}
-
 void KuduScanNodeBase::DebugString(int indentation_level, stringstream* out) const {
   string indent(indentation_level * 2, ' ');
   *out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
@@ -129,12 +121,5 @@ const string* KuduScanNodeBase::GetNextScanToken() {
   return token;
 }
 
-void KuduScanNodeBase::StopAndFinalizeCounters() {
-  if (!counters_running_) return;
-  counters_running_ = false;
-
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-}
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 49af13c..08289e1 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -42,7 +42,6 @@ class KuduScanNodeBase : public ScanNode {
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
-  virtual void Close(RuntimeState* state);
 
  protected:
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
@@ -59,14 +58,6 @@ class KuduScanNodeBase : public ScanNode {
 
   RuntimeState* runtime_state_;
 
-  /// Stops periodic counters and aggregates counter values for the entire scan node.
-  /// This should be called as soon as the scan node is complete to get the most accurate
-  /// counter values.
-  /// This can be called multiple times, subsequent calls will be ignored.
-  /// This must be called on Close() to unregister counters.
-  /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
-  void StopAndFinalizeCounters();
-
  private:
   friend class KuduScanner;
 
@@ -83,10 +74,6 @@ class KuduScanNodeBase : public ScanNode {
   /// Kudu table reference. Shared between scanner threads for KuduScanNode.
   kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
 
-  /// If true, counters are actively running and need to be reported in the runtime
-  /// profile.
-  bool counters_running_;
-
   /// Set of scan tokens to be deserialized into Kudu scanners.
   std::vector<std::string> scan_tokens_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 8723daa..2cb7619 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -62,7 +62,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   if (scan_token_ == nullptr) {
     scan_token_ = GetNextScanToken();
     if (scan_token_ == nullptr) {
-      StopAndFinalizeCounters();
+      runtime_profile_->StopPeriodicCounters();
       scanner_->Close();
       scanner_.reset();
       *eos = true;
@@ -85,7 +85,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
     num_rows_returned_ -= num_rows_over;
     scan_token_ = nullptr;
-    StopAndFinalizeCounters();
+    runtime_profile_->StopPeriodicCounters();
     scanner_->Close();
     scanner_.reset();
     *eos = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 0df0c3f..d09bb6b 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -68,4 +68,12 @@ Status ScanNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
+void ScanNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  // ScanNode::Prepare() started periodic counters including 'total_throughput_counter_'
+  // and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters.
+  runtime_profile_->StopPeriodicCounters();
+  ExecNode::Close(state);
+}
+
 }

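The Close() chain that this commit sets up for scan nodes can be sketched with simplified, hypothetical classes: ExecNodeSketch, ScanNodeSketch and ExampleScanNode. A subclass releases its own resources and then calls ScanNode::Close() rather than ExecNode::Close(), as HBaseScanNode and DataSourceScanNode now do. ScanNode::Close() stops the periodic counters and then delegates to the base class. Because stopping is idempotent, a node may also stop its counters early at end of stream, as KuduScanNodeMt::GetNext() does. The 'log' vector only records the order of the steps.

```cpp
#include <cassert>
#include <string>
#include <vector>

class ExecNodeSketch {
 public:
  explicit ExecNodeSketch(std::vector<std::string>* log) : log_(log) {}
  virtual ~ExecNodeSketch() = default;
  virtual void Close() {
    if (closed_) return;
    log_->push_back("ExecNode::Close");
    closed_ = true;
  }
  bool is_closed() const { return closed_; }

 protected:
  std::vector<std::string>* log_;

 private:
  bool closed_ = false;
};

class ScanNodeSketch : public ExecNodeSketch {
 public:
  explicit ScanNodeSketch(std::vector<std::string>* log) : ExecNodeSketch(log) {}
  // Idempotent: a second call finds nothing running and does nothing.
  void StopPeriodicCounters() {
    if (!counters_running_) return;
    counters_running_ = false;
    log_->push_back("StopPeriodicCounters");
  }
  // Stops counters started by this class or by subclasses, then closes the base.
  void Close() override {
    if (is_closed()) return;
    StopPeriodicCounters();
    ExecNodeSketch::Close();
  }

 private:
  bool counters_running_ = true;
};

class ExampleScanNode : public ScanNodeSketch {
 public:
  explicit ExampleScanNode(std::vector<std::string>* log) : ScanNodeSketch(log) {}
  void Close() override {
    if (is_closed()) return;
    log_->push_back("close scanner");
    // Must be ScanNodeSketch::Close(), not ExecNodeSketch::Close(), or the
    // counters would never be stopped if they were still running.
    ScanNodeSketch::Close();
  }
};
```

In this sketch, calling StopPeriodicCounters() at end of stream and then Close() twice records "StopPeriodicCounters", "close scanner" and "ExecNode::Close", each exactly once.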
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 0f73c2b..4b11361 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -82,11 +82,14 @@ class ScanNode : public ExecNode {
   ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
     : ExecNode(pool, tnode, descs),
       scan_range_params_(NULL),
-      active_scanner_thread_counter_(TUnit::UNIT, 0),
-      active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
+      active_scanner_thread_counter_(TUnit::UNIT, 0) {}
 
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
 
+  /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can
+  /// start periodic counters and rely on this function stopping them.
+  virtual void Close(RuntimeState* state);
+
   /// This should be called before Prepare(), and the argument must be not destroyed until
   /// after Prepare().
   void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) {
@@ -172,18 +175,7 @@ class ScanNode : public ExecNode {
   /// This should be created in Open and stopped when all the scanner threads are done.
   RuntimeProfile::Counter* average_scanner_thread_concurrency_;
 
-  /// The number of active hdfs reading threads reading for this node.
-  RuntimeProfile::Counter active_hdfs_read_thread_counter_;
-
-  /// Average number of active hdfs reading threads
-  /// This should be created in Open and stopped when all the scanner threads are done.
-  RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_;
-
   RuntimeProfile::Counter* num_scanner_threads_started_counter_;
-
-  /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
-  /// taken where there are i concurrent hdfs read thread running
-  std::vector<RuntimeProfile::Counter*> hdfs_read_thread_concurrency_bucket_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/data-provider-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/data-provider-test.cc b/be/src/experiments/data-provider-test.cc
index 9f9e0cb..3c5a92f 100644
--- a/be/src/experiments/data-provider-test.cc
+++ b/be/src/experiments/data-provider-test.cc
@@ -55,11 +55,11 @@ int main(int argc, char **argv) {
   cols.push_back(DataProvider::ColDesc::Create<StringValue>(min_str, max_str));
 
   ObjectPool obj_pool;
-  RuntimeProfile profile(&obj_pool, "DataGenTest");
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "DataGenTest");
 
   MemTracker tracker;
   MemPool pool(&tracker);
-  DataProvider provider(&pool, &profile);
+  DataProvider provider(&pool, profile);
   provider.Reset(20, 2, cols);
   int rows;
   void* data;
@@ -70,7 +70,7 @@ int main(int argc, char **argv) {
     provider.Print(&cout, reinterpret_cast<char*>(data), rows);
   }
 
-  profile.PrettyPrint(&cout);
+  profile->PrettyPrint(&cout);
 
   cout << endl << "Done." << endl;
   return 0;
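
The data-provider-test.cc hunk above replaces a stack-allocated `RuntimeProfile` with `RuntimeProfile::Create(&obj_pool, ...)`. Elsewhere in this commit the same factory replaces `obj_pool_.Add(new RuntimeProfile(...))` calls and `scoped_ptr<RuntimeProfile>` members, so each profile's lifetime is tied to an `ObjectPool`. Below is a minimal sketch of such a pool-backed factory. `ObjectPool`, `Profile` and their members are simplified, hypothetical versions. Making the constructor private is a choice of this sketch and is not shown in the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal object pool: takes ownership of heap objects and deletes
// them when the pool is destroyed.
class ObjectPool {
 public:
  template <typename T>
  T* Add(T* obj) {
    objects_.emplace_back(obj, &DeleteAs<T>);
    return obj;
  }
  std::size_t size() const { return objects_.size(); }

 private:
  template <typename T>
  static void DeleteAs(void* p) { delete static_cast<T*>(p); }

  std::vector<std::unique_ptr<void, void (*)(void*)>> objects_;
};

// Hypothetical profile whose instances are always pool-owned: the constructor is
// private, so callers must go through Create() and never delete the result.
class Profile {
 public:
  static Profile* Create(ObjectPool* pool, const std::string& name) {
    return pool->Add(new Profile(pool, name));
  }

  // Children are allocated from the same pool as their parent.
  Profile* CreateChild(const std::string& name) {
    Profile* child = Create(pool_, name);
    children_.push_back(child);
    return child;
  }

  const std::string& name() const { return name_; }
  std::size_t num_children() const { return children_.size(); }

 private:
  Profile(ObjectPool* pool, std::string name) : pool_(pool), name_(std::move(name)) {}

  ObjectPool* const pool_;
  std::string name_;
  std::vector<Profile*> children_;  // Not owned; the children also live in 'pool_'.
};
```

Callers never delete the returned pointer. This is why members such as `Coordinator::query_profile_` change from `boost::scoped_ptr<RuntimeProfile>` to a plain `RuntimeProfile*`.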

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/tuple-splitter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/tuple-splitter-test.cc b/be/src/experiments/tuple-splitter-test.cc
index 87a52bd..7d68f8e 100644
--- a/be/src/experiments/tuple-splitter-test.cc
+++ b/be/src/experiments/tuple-splitter-test.cc
@@ -380,14 +380,14 @@ int main(int argc, char **argv) {
   MemTracker tracker;
   MemPool pool(&tracker);
   ObjectPool obj_pool;
-  RuntimeProfile profile(&obj_pool, "PartitioningTest");
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "PartitioningTest");
 
-  DataProvider provider(&pool, &profile);
+  DataProvider provider(&pool, profile);
   provider.Reset(50*1024*1024, 1024, cols);
   //provider.Reset(100*1024, 1024, cols);
   //provider.Reset(100, 1024, cols);
 
-  DataPartitioner partitioner(&pool, &profile, provider.row_size(), 0);
+  DataPartitioner partitioner(&pool, profile, provider.row_size(), 0);
 
   cout << "Begin processing: " << provider.total_rows() << endl;
   int rows;
@@ -437,7 +437,7 @@ int main(int argc, char **argv) {
   cout << "Largest Partition: " << largest_partition << endl;;
 
   cout << endl;
-  profile.PrettyPrint(&cout);
+  profile->PrettyPrint(&cout);
 
   LOG(ERROR) << "Done.";
   return 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 0b89498..08ce7c3 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -132,7 +132,7 @@ class SimpleTupleStreamTest : public testing::Test {
     ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
     query_state_ = runtime_state_->query_state();
 
-    RuntimeProfile* client_profile = pool_.Add(new RuntimeProfile(&pool_, "client"));
+    RuntimeProfile* client_profile = RuntimeProfile::Create(&pool_, "client");
     MemTracker* client_tracker =
         pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     ASSERT_OK(test_env_->exec_env()->buffer_pool()->RegisterClient("",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 6427648..21a9c08 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -42,7 +42,7 @@ class BufferAllocatorTest : public ::testing::Test {
     dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0));
     dummy_reservation_.InitRootTracker(nullptr, 0);
     ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
-        obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
+        RuntimeProfile::Create(&obj_pool_, ""), &dummy_client_));
   }
 
   virtual void TearDown() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 06ff827..720dc13 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -139,7 +139,7 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   RuntimeProfile* NewProfile() {
-    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+    return RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
   /// Create a new file group with the default configs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 0d57488..3fc0e0b 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -44,7 +44,7 @@ class ReservationTrackerTest : public ::testing::Test {
 
  protected:
   RuntimeProfile* NewProfile() {
-    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+    return RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
   ObjectPool obj_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 32acfaf..6cd53fb 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -45,7 +45,7 @@ class SuballocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() override {
     RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_);
-    profile_.reset(new RuntimeProfile(&obj_pool_, "test profile"));
+    profile_ = RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
   virtual void TearDown() override {
@@ -55,7 +55,6 @@ class SuballocatorTest : public ::testing::Test {
     clients_.clear();
     buffer_pool_.reset();
     global_reservation_.Close();
-    profile_.reset();
     obj_pool_.Clear();
   }
 
@@ -78,7 +77,7 @@ class SuballocatorTest : public ::testing::Test {
     clients_.push_back(make_unique<BufferPool::ClientHandle>());
     *client = clients_.back().get();
     ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL,
-        numeric_limits<int64_t>::max(), profile(), *client));
+        numeric_limits<int64_t>::max(), profile_, *client));
   }
 
   /// Assert that the memory for all of the suballocations is writable and disjoint by
@@ -97,7 +96,6 @@ class SuballocatorTest : public ::testing::Test {
     EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString();
   }
 
-  RuntimeProfile* profile() { return profile_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
 
   /// Pool for objects with per-test lifetime. Cleared after every test.
@@ -114,7 +112,7 @@ class SuballocatorTest : public ::testing::Test {
   vector<unique_ptr<BufferPool::ClientHandle>> clients_;
 
   /// Global profile - recreated for every test.
-  scoped_ptr<RuntimeProfile> profile_;
+  RuntimeProfile* profile_;
 
   /// Per-test random number generator. Seeded before every test.
   mt19937 rng_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 1b7fd20..0ee4bd7 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -433,7 +433,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
     total_ranges_complete_(0) {
   const string& profile_name = Substitute("Instance $0 (host=$1)",
       PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
-  profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
+  profile_ = RuntimeProfile::Create(obj_pool, profile_name);
   fragment_stats->root_profile()->AddChild(profile_);
 
   // add total split size to fragment_stats->bytes_assigned()
@@ -514,10 +514,8 @@ void Coordinator::BackendState::InstanceStats::Update(
 
 Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
     const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
-  : avg_profile_(
-      obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, true))),
-    root_profile_(
-      obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))),
+  : avg_profile_(RuntimeProfile::Create(obj_pool, avg_profile_name, true)),
+    root_profile_(RuntimeProfile::Create(obj_pool, root_profile_name)),
     num_instances_(num_instances) {
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index c8df1f5..e022a21 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -123,8 +123,8 @@ Status Coordinator::Exec() {
   query_ctx_.__set_desc_tbl(request.desc_tbl);
   query_ctx_.__set_request_pool(schedule_.request_pool());
 
-  query_profile_.reset(
-      new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id())));
+  query_profile_ =
+      RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
   finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
   filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e67ef13..5802c83 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -150,7 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
-  RuntimeProfile* query_profile() const { return query_profile_.get(); }
+  RuntimeProfile* query_profile() const { return query_profile_; }
 
   const TUniqueId& query_id() const;
 
@@ -278,8 +278,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   ExecSummary exec_summary_;
 
-  /// Aggregate counters for the entire query.
-  boost::scoped_ptr<RuntimeProfile> query_profile_;
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
+  RuntimeProfile* query_profile_ = nullptr;
 
   /// Protects all fields below. This is held while making RPCs, so this lock should
   /// only be acquired if the acquiring thread is prepared to wait for a significant

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 35076f5..d8150eb 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -37,7 +37,7 @@ namespace impala {
 // rows from all senders are placed in the same queue.
 class DataStreamRecvr::SenderQueue {
  public:
-  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile);
+  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders);
 
   // Return the next batch from this sender queue. Sets the returned batch in cur_batch_.
   // A returned batch that is not filled to capacity does *not* indicate
@@ -102,8 +102,7 @@ class DataStreamRecvr::SenderQueue {
   bool received_first_batch_;
 };
 
-DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders,
-    RuntimeProfile* profile)
+DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders)
   : recvr_(parent_recvr),
     is_cancelled_(false),
     num_remaining_senders_(num_senders),
@@ -242,8 +241,6 @@ void DataStreamRecvr::SenderQueue::Cancel() {
   // notice that the stream is cancelled and handle it.
   data_arrival_cv_.notify_all();
   data_removal__cv_.notify_all();
-  PeriodicCounterUpdater::StopTimeSeriesCounter(
-      recvr_->bytes_received_time_series_counter_);
 }
 
 void DataStreamRecvr::SenderQueue::Close() {
@@ -286,7 +283,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
 DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
     const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit,
-    RuntimeProfile* profile)
+    RuntimeProfile* parent_profile)
   : mgr_(stream_mgr),
     fragment_instance_id_(fragment_instance_id),
     dest_node_id_(dest_node_id),
@@ -294,30 +291,28 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
     row_desc_(row_desc),
     is_merging_(is_merging),
     num_buffered_bytes_(0),
-    profile_(profile) {
+    profile_(parent_profile->CreateChild("DataStreamReceiver")) {
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
   int num_sender_per_queue = is_merging ? 1 : num_senders;
   for (int i = 0; i < num_queues; ++i) {
     SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this,
-        num_sender_per_queue, profile));
+        num_sender_per_queue));
     sender_queues_.push_back(queue);
   }
 
-  RuntimeProfile* child_profile = profile_->CreateChild("DataStreamReceiver");
-  mem_tracker_.reset(
-      new MemTracker(child_profile, -1, "DataStreamRecvr", parent_tracker));
+  mem_tracker_.reset(new MemTracker(profile_, -1, "DataStreamRecvr", parent_tracker));
 
   // Initialize the counters
-  bytes_received_counter_ = ADD_COUNTER(child_profile, "BytesReceived", TUnit::BYTES);
+  bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
   bytes_received_time_series_counter_ =
-      ADD_TIME_SERIES_COUNTER(child_profile, "BytesReceived", bytes_received_counter_);
-  deserialize_row_batch_timer_ = ADD_TIMER(child_profile, "DeserializeRowBatchTimer");
-  buffer_full_wall_timer_ = ADD_TIMER(child_profile, "SendersBlockedTimer");
-  buffer_full_total_timer_ = ADD_TIMER(child_profile, "SendersBlockedTotalTimer(*)");
-  data_arrival_timer_ = child_profile->inactive_timer();
-  first_batch_wait_total_timer_ = ADD_TIMER(child_profile, "FirstBatchArrivalWaitTime");
+      ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_);
+  deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer");
+  buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
+  buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
+  data_arrival_timer_ = profile_->inactive_timer();
+  first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
 }
 
 Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -354,6 +349,7 @@ void DataStreamRecvr::Close() {
   }
   merger_.reset();
   mem_tracker_->Close();
+  profile_->StopPeriodicCounters();
 }
 
 DataStreamRecvr::~DataStreamRecvr() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index fad588d..9545f82 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -105,7 +105,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, bool is_merging,
-      int64_t total_buffer_limit, RuntimeProfile* profile);
+      int64_t total_buffer_limit, RuntimeProfile* parent_profile);
 
   /// Add a new batch of rows to the appropriate sender queue, blocking if the queue is
   /// full. Called from DataStreamMgr.
@@ -161,8 +161,9 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   /// Pool of sender queues.
   ObjectPool sender_queue_pool_;
 
-  /// Runtime profile storing the counters below.
-  RuntimeProfile* profile_;
+  /// Runtime profile storing the counters below. Child of 'parent_profile' passed into
+  /// constructor.
+  RuntimeProfile* const profile_;
 
   /// Number of bytes received
   RuntimeProfile::Counter* bytes_received_counter_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 5ea6756..8e85894 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -337,8 +337,7 @@ class DataStreamTest : public testing::Test {
   void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
       int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) {
     VLOG_QUERY << "start receiver";
-    RuntimeProfile* profile =
-        obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver"));
+    RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
     TUniqueId instance_id;
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
@@ -607,13 +606,13 @@ TEST_F(DataStreamTest, BasicTest) {
 // TODO: Make lifecycle requirements more explicit.
 TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
-  scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
 
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
   shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
-      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(),
+      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
       false);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 74f5495..a7d3c86 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -123,8 +123,8 @@ Status FragmentInstanceState::Prepare() {
 
   // total_time_counter() is in the runtime_state_ so start it up now.
   SCOPED_TIMER(profile()->total_time_counter());
-  timings_profile_ = obj_pool()->Add(
-      new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings"));
+  timings_profile_ =
+      RuntimeProfile::Create(obj_pool(), "Fragment Instance Lifecycle Timings");
   profile()->AddChild(timings_profile_);
   SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
 
@@ -289,12 +289,8 @@ void FragmentInstanceState::Close() {
   // guard against partially-finished Prepare()
   if (sink_ != nullptr) sink_->Close(runtime_state_);
 
-  // disconnect mem_usage_sampled_counter_ from the periodic updater before
-  // RuntimeState::ReleaseResources(), it references the instance memtracker
-  if (mem_usage_sampled_counter_ != nullptr) {
-    PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
-    mem_usage_sampled_counter_ = nullptr;
-  }
+  // Stop updating profile counters in background.
+  profile()->StopPeriodicCounters();
 
   // We need to delete row_batch_ here otherwise we can't delete the instance_mem_tracker_
   // in runtime_state_->ReleaseResources().
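
The comment removed from `FragmentInstanceState::Close()` explains why `profile()->StopPeriodicCounters()` is called before `RuntimeState::ReleaseResources()`. The memory-usage counter is sampled in the background from the instance mem tracker, so sampling must stop before that tracker is released. Below is a minimal sketch of this stop-before-release rule. It assumes a hypothetical `PeriodicSampler` that owns one background thread and does not model Impala's actual `PeriodicCounterUpdater`.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical background sampler: calls 'sample_fn' every 'period' on its own
// thread and records each value, until Stop() is called.
class PeriodicSampler {
 public:
  PeriodicSampler(std::function<long()> sample_fn, std::chrono::milliseconds period)
      : sample_fn_(std::move(sample_fn)), period_(period), thread_([this] { Run(); }) {}

  ~PeriodicSampler() { Stop(); }

  // After Stop() returns, 'sample_fn' is never invoked again, so whatever it
  // reads may be released safely. Calling Stop() again later is harmless.
  void Stop() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stopped_ = true;
    }
    cv_.notify_all();
    if (thread_.joinable()) thread_.join();
  }

  std::size_t num_samples() const {
    std::lock_guard<std::mutex> lock(mu_);
    return samples_.size();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mu_);
    while (!stopped_) {
      samples_.push_back(sample_fn_());
      // Sleep for one period, waking early if Stop() is called.
      cv_.wait_for(lock, period_, [this] { return stopped_; });
    }
  }

  std::function<long()> sample_fn_;
  std::chrono::milliseconds period_;
  mutable std::mutex mu_;
  std::condition_variable cv_;
  bool stopped_ = false;
  std::vector<long> samples_;
  std::thread thread_;  // Declared last so it starts after the members above exist.
};
```

Joining the thread in `Stop()` is what makes the guarantee hold. Once it returns, no in-flight sample can still be reading the released object.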

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4c5eb17..ea24411 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -169,7 +169,7 @@ Status QueryState::InitBufferPoolState() {
 
   // TODO: once there's a mechanism for reporting non-fragment-local profiles,
   // should make sure to report this profile so it's not going into a black hole.
-  RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy"));
+  RuntimeProfile* dummy_profile = RuntimeProfile::Create(&obj_pool_, "dummy");
   // Only create file group if spilling is enabled.
   if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
     file_group_ = obj_pool_.Add(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8f48439..0565cf5 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -69,7 +69,8 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(
         query_state->query_ctx().utc_timestamp_string))),
     exec_env_(exec_env),
-    profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
+    profile_(RuntimeProfile::Create(
+          obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
     instance_buffer_reservation_(new ReservationTracker) {
   Init();
 }
@@ -83,7 +84,7 @@ RuntimeState::RuntimeState(
     now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))),
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
     exec_env_(exec_env),
-    profile_(obj_pool(), "<unnamed>") {
+    profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
   if (query_ctx().request_pool.empty()) {
     const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
   }
@@ -96,7 +97,7 @@ RuntimeState::~RuntimeState() {
 }
 
 void RuntimeState::Init() {
-  SCOPED_TIMER(profile_.total_time_counter());
+  SCOPED_TIMER(profile_->total_time_counter());
 
   // Register with the thread mgr
   resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
@@ -111,7 +112,7 @@ void RuntimeState::Init() {
       runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
 
   if (instance_buffer_reservation_ != nullptr) {
-    instance_buffer_reservation_->InitChildTracker(&profile_,
+    instance_buffer_reservation_->InitChildTracker(profile_,
         query_state_->buffer_reservation(), instance_mem_tracker_.get(),
         numeric_limits<int64_t>::max());
   }
@@ -127,7 +128,7 @@ Status RuntimeState::CreateCodegen() {
   RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this,
       instance_mem_tracker_.get(), PrintId(fragment_instance_id()), &codegen_));
   codegen_->EnableOptimizations(true);
-  profile_.AddChild(codegen_->runtime_profile());
+  profile_->AddChild(codegen_->runtime_profile());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 49ea9a6..3da9f8c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -149,7 +149,7 @@ class RuntimeState {
   PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
 
   /// Returns runtime state profile
-  RuntimeProfile* runtime_profile() { return &profile_; }
+  RuntimeProfile* runtime_profile() { return profile_; }
 
   /// Returns the LlvmCodeGen object for this fragment instance.
   LlvmCodeGen* codegen() { return codegen_.get(); }
@@ -354,7 +354,7 @@ class RuntimeState {
   /// Records summary statistics for the results of inserts into Hdfs partitions.
   PartitionStatusMap per_partition_status_;
 
-  RuntimeProfile profile_;
+  RuntimeProfile* const profile_;
 
   /// Total time waiting in storage (across all threads)
   RuntimeProfile::Counter* total_storage_wait_timer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 1a5eb58..5f482ba 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -51,7 +51,7 @@ class TmpFileMgrTest : public ::testing::Test {
  public:
   virtual void SetUp() {
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
-    profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test"));
+    profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
     test_env_.reset(new TestEnv);
     ASSERT_OK(test_env_->Init());
     cb_counter_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 2a5b379..ef6a69d 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -74,9 +74,10 @@ ClientRequestState::ClientRequestState(
     schedule_(NULL),
     coord_(NULL),
     result_cache_max_size_(-1),
-    profile_(&profile_pool_, "Query"), // assign name w/ id after planning
-    server_profile_(&profile_pool_, "ImpalaServer"),
-    summary_profile_(&profile_pool_, "Summary"),
+    // Profile is assigned name w/ id after planning
+    profile_(RuntimeProfile::Create(&profile_pool_, "Query")),
+    server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer")),
+    summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
     is_cancelled_(false),
     eos_(false),
     query_state_(beeswax::QueryState::CREATED),
@@ -89,36 +90,36 @@ ClientRequestState::ClientRequestState(
     parent_server_(server),
     start_time_(TimestampValue::LocalTime()) {
 #ifndef NDEBUG
-  profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
+  profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
       "DEBUG build of Impala. Use RELEASE builds to measure query performance.");
 #endif
-  row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer");
-  client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
-  query_events_ = summary_profile_.AddEventSequence("Query Timeline");
+  row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer");
+  client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
+  query_events_ = summary_profile_->AddEventSequence("Query Timeline");
   query_events_->Start();
-  profile_.AddChild(&summary_profile_);
+  profile_->AddChild(summary_profile_);
 
-  profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
-  summary_profile_.AddInfoString("Session ID", PrintId(session_id()));
-  summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type()));
+  profile_->set_name("Query (id=" + PrintId(query_id()) + ")");
+  summary_profile_->AddInfoString("Session ID", PrintId(session_id()));
+  summary_profile_->AddInfoString("Session Type", PrintTSessionType(session_type()));
   if (session_type() == TSessionType::HIVESERVER2) {
-    summary_profile_.AddInfoString("HiveServer2 Protocol Version",
+    summary_profile_->AddInfoString("HiveServer2 Protocol Version",
         Substitute("V$0", 1 + session->hs2_version));
   }
-  summary_profile_.AddInfoString("Start Time", start_time().ToString());
-  summary_profile_.AddInfoString("End Time", "");
-  summary_profile_.AddInfoString("Query Type", "N/A");
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
-  summary_profile_.AddInfoString("Query Status", "OK");
-  summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true));
-  summary_profile_.AddInfoString("User", effective_user());
-  summary_profile_.AddInfoString("Connected User", connected_user());
-  summary_profile_.AddInfoString("Delegated User", do_as_user());
-  summary_profile_.AddInfoString("Network Address",
+  summary_profile_->AddInfoString("Start Time", start_time().ToString());
+  summary_profile_->AddInfoString("End Time", "");
+  summary_profile_->AddInfoString("Query Type", "N/A");
+  summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_->AddInfoString("Query Status", "OK");
+  summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
+  summary_profile_->AddInfoString("User", effective_user());
+  summary_profile_->AddInfoString("Connected User", connected_user());
+  summary_profile_->AddInfoString("Delegated User", do_as_user());
+  summary_profile_->AddInfoString("Network Address",
       lexical_cast<string>(session_->network_address));
-  summary_profile_.AddInfoString("Default Db", default_db());
-  summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
-  summary_profile_.AddInfoString("Coordinator",
+  summary_profile_->AddInfoString("Default Db", default_db());
+  summary_profile_->AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
+  summary_profile_->AddInfoString("Coordinator",
       TNetworkAddressToString(exec_env->backend_address()));
 }
 
@@ -144,11 +145,11 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
   MarkActive();
   exec_request_ = *exec_request;
 
-  profile_.AddChild(&server_profile_);
-  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
-  summary_profile_.AddInfoString("Query Options (set by configuration)",
+  profile_->AddChild(server_profile_);
+  summary_profile_->AddInfoString("Query Type", PrintTStmtType(stmt_type()));
+  summary_profile_->AddInfoString("Query Options (set by configuration)",
       DebugQueryOptions(query_ctx_.client_request.query_options));
-  summary_profile_.AddInfoString("Query Options (set by configuration and planner)",
+  summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
       DebugQueryOptions(exec_request_.query_options));
 
   switch (exec_request->stmt_type) {
@@ -182,7 +183,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
       reset_req.reset_metadata_params.__set_table_name(
           exec_request_.load_data_request.table_name);
       catalog_op_executor_.reset(
-          new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+          new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
       RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
           *catalog_op_executor_->update_catalog_result(),
@@ -298,7 +299,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
         // Verify the user has privileges to perform this operation by checking against
         // the Sentry Service (via the Catalog Server).
         catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-            &server_profile_));
+            server_profile_));
 
         TSentryAdminCheckRequest req;
         req.__set_header(TCatalogServiceRequestHeader());
@@ -319,7 +320,7 @@ Status ClientRequestState::ExecLocalCatalogOp(
         // Verify the user has privileges to perform this operation by checking against
         // the Sentry Service (via the Catalog Server).
         catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-            &server_profile_));
+            server_profile_));
 
         TSentryAdminCheckRequest req;
         req.__set_header(TCatalogServiceRequestHeader());
@@ -392,13 +393,13 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     plan_ss << "\n----------------\n"
             << query_exec_request.query_plan
             << "----------------";
-    summary_profile_.AddInfoString("Plan", plan_ss.str());
+    summary_profile_->AddInfoString("Plan", plan_ss.str());
   }
   // Add info strings consumed by CM: Estimated mem and tables missing stats.
   if (query_exec_request.__isset.per_host_mem_estimate) {
     stringstream ss;
     ss << query_exec_request.per_host_mem_estimate;
-    summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
+    summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str());
   }
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
       query_exec_request.query_ctx.__isset.tables_missing_stats &&
@@ -409,7 +410,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
       if (i != 0) ss << ",";
       ss << tbls[i].db_name << "." << tbls[i].table_name;
     }
-    summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
   }
 
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
@@ -422,7 +423,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
       if (i != 0) ss << ",";
       ss << tbls[i].db_name << "." << tbls[i].table_name;
     }
-    summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
   }
 
   if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
@@ -434,7 +435,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
       if (i != 0) ss << ",";
       ss << tbls[i].db_name << "." << tbls[i].table_name;
     }
-    summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
   }
 
   {
@@ -442,7 +443,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
-        exec_request_.query_options, &summary_profile_, query_events_));
+        exec_request_.query_options, summary_profile_, query_events_));
   }
   Status status = exec_env_->scheduler()->Schedule(schedule_.get());
   {
@@ -465,14 +466,14 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
-  profile_.AddChild(coord_->query_profile());
+  profile_->AddChild(coord_->query_profile());
   return Status::OK();
 }
 
 Status ClientRequestState::ExecDdlRequest() {
   string op_type = catalog_op_type() == TCatalogOpType::DDL ?
       PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
-  summary_profile_.AddInfoString("DDL Type", op_type);
+  summary_profile_->AddInfoString("DDL Type", op_type);
 
   if (catalog_op_type() != TCatalogOpType::DDL &&
       catalog_op_type() != TCatalogOpType::RESET_METADATA) {
@@ -502,7 +503,7 @@ Status ClientRequestState::ExecDdlRequest() {
   }
 
   catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-      &server_profile_));
+      server_profile_));
   Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
   {
     lock_guard<mutex> l(lock_);
@@ -564,7 +565,7 @@ void ClientRequestState::Done() {
 
   unique_lock<mutex> l(lock_);
   end_time_ = TimestampValue::LocalTime();
-  summary_profile_.AddInfoString("End Time", end_time().ToString());
+  summary_profile_->AddInfoString("End Time", end_time().ToString());
   query_events_->MarkEvent("Unregister query");
 
   // Update result set cache metrics, and update mem limit accounting before tearing
@@ -586,7 +587,7 @@ void ClientRequestState::Done() {
 Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
   TResultSet metadata_op_result;
   // Like the other Exec(), fill out as much profile information as we're able to.
-  summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
+  summary_profile_->AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
   RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
       &metadata_op_result));
   result_metadata_ = metadata_op_result.schema;
@@ -722,7 +723,7 @@ Status ClientRequestState::UpdateQueryStatus(const Status& status) {
   if (!status.ok() && query_status_.ok()) {
     UpdateQueryState(beeswax::QueryState::EXCEPTION);
     query_status_ = status;
-    summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
+    summary_profile_->AddInfoString("Query Status", query_status_.GetDetail());
   }
 
   return status;
@@ -898,7 +899,7 @@ Status ClientRequestState::UpdateCatalog() {
   }
 
   query_events_->MarkEvent("DML data written");
-  SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));
+  SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
 
   TQueryExecRequest query_exec_request = exec_request().query_exec_request;
   if (query_exec_request.__isset.finalize_params) {
@@ -1031,7 +1032,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   DCHECK_GE(child_queries.size(), 1);
   DCHECK_LE(child_queries.size(), 2);
   catalog_op_executor_.reset(
-      new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+      new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
 
   // If there was no column stats query, pass in empty thrift structures to
   // ExecComputeStats(). Otherwise pass in the column stats result.
@@ -1078,7 +1079,7 @@ void ClientRequestState::ClearResultCache() {
 void ClientRequestState::UpdateQueryState(
     beeswax::QueryState::type query_state) {
   query_state_ = query_state;
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
 }
 
 }
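The change above replaces the embedded `RuntimeProfile` members with pointers obtained from `RuntimeProfile::Create()`. As a result, every `.` access becomes `->`, and `&summary_profile_` becomes plain `summary_profile_`. The sketch below shows the ownership pattern this appears to rely on. It is a minimal sketch under stated assumptions: `Pool` and `Profile` are hypothetical, simplified stand-ins, not Impala's `ObjectPool` or `RuntimeProfile`. The private constructor is also an assumption about how a factory would enforce pool ownership.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

class Pool;

// Hypothetical stand-in for a runtime profile. The constructor is private, so
// the static Create() factory is the only way to obtain an instance, and every
// instance is owned by a Pool.
class Profile {
 public:
  static Profile* Create(Pool* pool, const std::string& name);

  // Children are non-owning pointers; the pool that created them owns them.
  void AddChild(Profile* child) {
    assert(child != nullptr);
    children_.push_back(child);
  }
  const std::string& name() const { return name_; }
  std::size_t num_children() const { return children_.size(); }

 private:
  explicit Profile(std::string name) : name_(std::move(name)) {}
  std::string name_;
  std::vector<Profile*> children_;
};

// Hypothetical stand-in for an object pool: takes ownership of each object
// and deletes all of them when the pool itself is destroyed.
class Pool {
 public:
  Profile* Add(Profile* obj) {
    objects_.push_back(std::unique_ptr<Profile>(obj));
    return obj;
  }

 private:
  std::vector<std::unique_ptr<Profile>> objects_;
};

inline Profile* Profile::Create(Pool* pool, const std::string& name) {
  return pool->Add(new Profile(name));
}
```

The pool outlives every pointer it hands out. That is why a call such as `query->AddChild(summary)` needs no address-of operator, and why all profiles are freed together when the pool is destroyed.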

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 6846165..1c015c3 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -184,8 +184,8 @@ class ClientRequestState {
   void set_user_profile_access(bool user_has_profile_access) {
     user_has_profile_access_ = user_has_profile_access;
   }
-  const RuntimeProfile& profile() const { return profile_; }
-  const RuntimeProfile& summary_profile() const { return summary_profile_; }
+  const RuntimeProfile* profile() const { return profile_; }
+  const RuntimeProfile* summary_profile() const { return summary_profile_; }
   const TimestampValue& start_time() const { return start_time_; }
   const TimestampValue& end_time() const { return end_time_; }
   const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
@@ -211,7 +211,7 @@ class ClientRequestState {
   }
 
   RuntimeProfile::EventSequence* query_events() const { return query_events_; }
-  RuntimeProfile* summary_profile() { return &summary_profile_; }
+  RuntimeProfile* summary_profile() { return summary_profile_; }
 
  private:
   const TQueryCtx query_ctx_;
@@ -299,9 +299,9 @@ class ClientRequestState {
   /// There's a fourth profile which is not built here (but is a
   /// child of profile_); the execution profile which tracks the
   /// actual fragment execution.
-  RuntimeProfile profile_;
-  RuntimeProfile server_profile_;
-  RuntimeProfile summary_profile_;
+  RuntimeProfile* const profile_;
+  RuntimeProfile* const server_profile_;
+  RuntimeProfile* const summary_profile_;
   RuntimeProfile::Counter* row_materialization_timer_;
 
   /// Tracks how long we are idle waiting for a client to fetch rows.
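In the header, the profile members become `RuntimeProfile* const`. The pointer cannot be reseated after construction, but the profile it points to stays mutable. That is why the non-`const` `summary_profile()` accessor can still return a `RuntimeProfile*`, while the `const` accessors now return `const RuntimeProfile*`. The sketch below illustrates these C++ rules with hypothetical types (`Prof`, `Arena`, `RequestState`). It is not the real `ClientRequestState`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical profile and pool types, used only to illustrate 'T* const'
// members; they are not Impala classes.
struct Prof {
  std::string info;
};

struct Arena {
  std::vector<std::unique_ptr<Prof>> objs;
  Prof* Make() {
    objs.push_back(std::unique_ptr<Prof>(new Prof()));
    return objs.back().get();
  }
};

class RequestState {
 public:
  // A 'Prof* const' member must be set in the constructor's initializer list;
  // it cannot be assigned afterwards.
  explicit RequestState(Arena* arena) : summary_(arena->Make()) {}

  // const overload: readers get a pointer-to-const and cannot modify the profile.
  const Prof* summary() const { return summary_; }
  // non-const overload: the pointee is still mutable through 'Prof* const'.
  Prof* summary() { return summary_; }

 private:
  Prof* const summary_;  // the pointer is const; the Prof it points to is not
};

// The const overload really does return a pointer-to-const.
static_assert(std::is_same<decltype(std::declval<const RequestState&>().summary()),
                           const Prof*>::value,
              "const accessor should return const Prof*");
```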

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8ba2894..051cfaf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -623,9 +623,9 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
       if (base64_encoded) {
-        RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output));
+        RETURN_IF_ERROR(request_state->profile()->SerializeToArchiveString(output));
       } else {
-        request_state->profile().PrettyPrint(output);
+        request_state->profile()->PrettyPrint(output);
       }
       return Status::OK();
     }
@@ -741,7 +741,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
 
 void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
   string encoded_profile_str;
-  Status status = query.profile().SerializeToArchiveString(&encoded_profile_str);
+  Status status = query.profile()->SerializeToArchiveString(&encoded_profile_str);
   if (!status.ok()) {
     // Didn't serialize the string. Continue with empty string.
     LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
@@ -1677,7 +1677,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
   id = request_state.query_id();
   const TExecRequest& request = request_state.exec_request();
 
-  const string* plan_str = request_state.summary_profile().GetInfoString("Plan");
+  const string* plan_str = request_state.summary_profile()->GetInfoString("Plan");
   if (plan_str != nullptr) plan = *plan_str;
   stmt = request_state.sql_stmt();
   stmt_type = request.stmt_type;
@@ -1701,11 +1701,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
 
   if (copy_profile) {
     stringstream ss;
-    request_state.profile().PrettyPrint(&ss);
+    request_state.profile()->PrettyPrint(&ss);
     profile_str = ss.str();
     if (encoded_profile.empty()) {
       Status status =
-          request_state.profile().SerializeToArchiveString(&encoded_profile_str);
+          request_state.profile()->SerializeToArchiveString(&encoded_profile_str);
       if (!status.ok()) {
         LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
                                    << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/dummy-runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h
index 83bccbf..1642d4e 100644
--- a/be/src/util/dummy-runtime-profile.h
+++ b/be/src/util/dummy-runtime-profile.h
@@ -28,12 +28,12 @@ namespace impala {
 /// but not always so that the object can still allocate counters in the same way.
 class DummyProfile {
  public:
-  DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {}
-  RuntimeProfile* profile() { return &profile_; }
+  DummyProfile() : pool_(), profile_(RuntimeProfile::Create(&pool_, "dummy", false)) {}
+  RuntimeProfile* profile() { return profile_; }
 
  private:
   ObjectPool pool_;
-  RuntimeProfile profile_;
+  RuntimeProfile* const profile_;
 };
 }
 #endif
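`DummyProfile` now initializes `profile_` by calling into `pool_`. This is only safe because non-static data members are initialized in declaration order, not in the order they appear in the initializer list, and `pool_` is declared before `profile_`. Members are destroyed in reverse order, so the pool, which owns the profile, is torn down last. The hypothetical sketch below reproduces the same layout with simplified stand-in types.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins mirroring DummyProfile's layout (not Impala code).
struct Prof {
  std::string name;
};

class MiniPool {
 public:
  Prof* Create(const std::string& name) {
    objs_.push_back(std::unique_ptr<Prof>(new Prof{name}));
    return objs_.back().get();
  }

 private:
  std::vector<std::unique_ptr<Prof>> objs_;
};

class DummyHolder {
 public:
  // Members are initialized in declaration order, not in the order they are
  // written here, so pool_ is fully constructed before profile_ uses it.
  DummyHolder() : pool_(), profile_(pool_.Create("dummy")) {}
  Prof* profile() { return profile_; }

 private:
  MiniPool pool_;        // declared first: constructed first, destroyed last
  Prof* const profile_;  // non-owning; the Prof itself lives in pool_
};
```

If the two member declarations were swapped, `profile_` would call `Create()` on a pool that had not been constructed yet, which is undefined behavior. GCC and Clang can flag the resulting initializer-order mismatch with `-Wreorder`.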

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index b1c755a..098e683 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -93,21 +93,20 @@ void PeriodicCounterUpdater::RegisterBucketingCounters(
 }
 
 void PeriodicCounterUpdater::StopBucketingCounters(
-    vector<RuntimeProfile::Counter*>* buckets, bool convert) {
+    vector<RuntimeProfile::Counter*>* buckets) {
   int64_t num_sampled = 0;
   {
     lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
     BucketCountersMap::iterator itr =
         instance_->bucketing_counters_.find(buckets);
-    if (itr != instance_->bucketing_counters_.end()) {
-      num_sampled = itr->second.num_sampled;
-      instance_->bucketing_counters_.erase(itr);
-    }
+    // If not registered, we have nothing to do.
+    if (itr == instance_->bucketing_counters_.end()) return;
+    num_sampled = itr->second.num_sampled;
+    instance_->bucketing_counters_.erase(itr);
   }
 
-  if (convert && num_sampled > 0) {
-    for (int i = 0; i < buckets->size(); ++i) {
-      RuntimeProfile::Counter* counter = (*buckets)[i];
+  if (num_sampled > 0) {
+    for (RuntimeProfile::Counter* counter : *buckets) {
       double perc = 100 * counter->value() / (double)num_sampled;
       counter->Set(perc);
     }
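After this change, `StopBucketingCounters()` returns early if `buckets` was never registered. Otherwise, it always converts each bucket's sample count into a percentage of `num_sampled`. The sketch below models that contract with a hypothetical `BucketRegistry` class. Its `Register()` and `Sample()` methods are assumptions added only to make the example self-contained. This is not the `PeriodicCounterUpdater` API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

// Hypothetical, simplified model of bucketing counters; not Impala's
// PeriodicCounterUpdater API.
struct Counter {
  double value = 0;
};

class BucketRegistry {
 public:
  // Starts tracking 'buckets' (assumed helper for this sketch).
  void Register(std::vector<Counter*>* buckets) {
    std::lock_guard<std::mutex> l(lock_);
    num_sampled_[buckets] = 0;
  }

  // Records one sample that falls into bucket 'idx' (assumed helper).
  void Sample(std::vector<Counter*>* buckets, std::size_t idx) {
    std::lock_guard<std::mutex> l(lock_);
    auto itr = num_sampled_.find(buckets);
    if (itr == num_sampled_.end()) return;
    ++itr->second;
    (*buckets)[idx]->value += 1;
  }

  // Mirrors the new contract: a no-op if 'buckets' is not registered;
  // otherwise unregisters it and converts each count to a percentage.
  void Stop(std::vector<Counter*>* buckets) {
    std::int64_t num_sampled = 0;
    {
      std::lock_guard<std::mutex> l(lock_);
      auto itr = num_sampled_.find(buckets);
      if (itr == num_sampled_.end()) return;  // Not registered: nothing to do.
      num_sampled = itr->second;
      num_sampled_.erase(itr);
    }
    // Once unregistered, Sample() ignores these buckets, so the conversion
    // can run outside the lock.
    if (num_sampled > 0) {
      for (Counter* counter : *buckets) {
        counter->value = 100 * counter->value / static_cast<double>(num_sampled);
      }
    }
  }

 private:
  std::mutex lock_;
  std::map<std::vector<Counter*>*, std::int64_t> num_sampled_;
};
```

Suppose the first of three buckets receives two samples and each of the others receives one. `Stop()` then yields 50, 25 and 25, and a second `Stop()` call leaves those values unchanged.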

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index c603522..762f372 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -70,12 +70,11 @@ class PeriodicCounterUpdater {
   /// Stops updating the value of 'counter'.
   static void StopSamplingCounter(RuntimeProfile::Counter* counter);
 
-  /// Stops updating the bucket counter.
-  /// If convert is true, convert the buckets from count to percentage.
-  /// Sampling counters are updated periodically so should be removed as soon as the
+  /// If the bucketing counters 'buckets' are registered, stops updating the counters and
+  /// converts the buckets from count to percentage. If not registered, has no effect.
+  /// Periodic counters are updated periodically so should be removed as soon as the
   /// underlying counter is no longer going to change.
-  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets,
-      bool convert);
+  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets);
 
   /// Stops 'counter' from receiving any more samples.
   static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);


[3/5] incubator-impala git commit: IMPALA-5895: clean up runtime profile lifecycle

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index fe9f3ae..1bc7911 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -34,17 +34,17 @@ namespace impala {
 
 TEST(CountersTest, Basic) {
   ObjectPool pool;
-  RuntimeProfile profile_a(&pool, "ProfileA");
-  RuntimeProfile profile_a1(&pool, "ProfileA1");
-  RuntimeProfile profile_a2(&pool, "ProfileAb");
+  RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA");
+  RuntimeProfile* profile_a1 = RuntimeProfile::Create(&pool, "ProfileA1");
+  RuntimeProfile* profile_a2 = RuntimeProfile::Create(&pool, "ProfileAb");
 
   TRuntimeProfileTree thrift_profile;
 
-  profile_a.AddChild(&profile_a1);
-  profile_a.AddChild(&profile_a2);
+  profile_a->AddChild(profile_a1);
+  profile_a->AddChild(profile_a2);
 
   // Test Empty
-  profile_a.ToThrift(&thrift_profile.nodes);
+  profile_a->ToThrift(&thrift_profile.nodes);
   EXPECT_EQ(thrift_profile.nodes.size(), 3);
   thrift_profile.nodes.clear();
 
@@ -53,7 +53,7 @@ TEST(CountersTest, Basic) {
   RuntimeProfile::Counter* counter_merged;
 
   // Updating/setting counter
-  counter_a = profile_a.AddCounter("A", TUnit::UNIT);
+  counter_a = profile_a->AddCounter("A", TUnit::UNIT);
   EXPECT_TRUE(counter_a != NULL);
   counter_a->Add(10);
   counter_a->Add(-5);
@@ -61,40 +61,40 @@ TEST(CountersTest, Basic) {
   counter_a->Set(1);
   EXPECT_EQ(counter_a->value(), 1);
 
-  counter_b = profile_a2.AddCounter("B", TUnit::BYTES);
+  counter_b = profile_a2->AddCounter("B", TUnit::BYTES);
   EXPECT_TRUE(counter_b != NULL);
 
   // Serialize/deserialize
-  profile_a.ToThrift(&thrift_profile.nodes);
+  profile_a->ToThrift(&thrift_profile.nodes);
   RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
   counter_merged = from_thrift->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
   EXPECT_TRUE(from_thrift->GetCounter("Not there") ==  NULL);
 
   // Averaged
-  RuntimeProfile averaged_profile(&pool, "Merged", true);
-  averaged_profile.UpdateAverage(from_thrift);
-  counter_merged = averaged_profile.GetCounter("A");
+  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true);
+  averaged_profile->UpdateAverage(from_thrift);
+  counter_merged = averaged_profile->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
 
   // UpdateAverage again, there should be no change.
-  averaged_profile.UpdateAverage(from_thrift);
+  averaged_profile->UpdateAverage(from_thrift);
   EXPECT_EQ(counter_merged->value(), 1);
 
-  counter_a = profile_a2.AddCounter("A", TUnit::UNIT);
+  counter_a = profile_a2->AddCounter("A", TUnit::UNIT);
   counter_a->Set(3);
-  averaged_profile.UpdateAverage(&profile_a2);
+  averaged_profile->UpdateAverage(profile_a2);
   EXPECT_EQ(counter_merged->value(), 2);
 
   // Update
-  RuntimeProfile updated_profile(&pool, "Updated");
-  updated_profile.Update(thrift_profile);
-  RuntimeProfile::Counter* counter_updated = updated_profile.GetCounter("A");
+  RuntimeProfile* updated_profile = RuntimeProfile::Create(&pool, "Updated");
+  updated_profile->Update(thrift_profile);
+  RuntimeProfile::Counter* counter_updated = updated_profile->GetCounter("A");
   EXPECT_EQ(counter_updated->value(), 1);
 
   // Update 2 more times, counters should stay the same
-  updated_profile.Update(thrift_profile);
-  updated_profile.Update(thrift_profile);
+  updated_profile->Update(thrift_profile);
+  updated_profile->Update(thrift_profile);
   EXPECT_EQ(counter_updated->value(), 1);
 }
 
@@ -110,27 +110,27 @@ TEST(CountersTest, MergeAndUpdate) {
   // children, with the counters from the shared child aggregated.
 
   ObjectPool pool;
-  RuntimeProfile profile1(&pool, "Parent1");
-  RuntimeProfile p1_child1(&pool, "Child1");
-  RuntimeProfile p1_child2(&pool, "Child2");
-  profile1.AddChild(&p1_child1);
-  profile1.AddChild(&p1_child2);
-
-  RuntimeProfile profile2(&pool, "Parent2");
-  RuntimeProfile p2_child1(&pool, "Child1");
-  RuntimeProfile p2_child3(&pool, "Child3");
-  profile2.AddChild(&p2_child1);
-  profile2.AddChild(&p2_child3);
+  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent1");
+  RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1");
+  RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2");
+  profile1->AddChild(p1_child1);
+  profile1->AddChild(p1_child2);
+
+  RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Parent2");
+  RuntimeProfile* p2_child1 = RuntimeProfile::Create(&pool, "Child1");
+  RuntimeProfile* p2_child3 = RuntimeProfile::Create(&pool, "Child3");
+  profile2->AddChild(p2_child1);
+  profile2->AddChild(p2_child3);
 
   // Create parent level counters
   RuntimeProfile::Counter* parent1_shared =
-      profile1.AddCounter("Parent Shared", TUnit::UNIT);
+      profile1->AddCounter("Parent Shared", TUnit::UNIT);
   RuntimeProfile::Counter* parent2_shared =
-      profile2.AddCounter("Parent Shared", TUnit::UNIT);
+      profile2->AddCounter("Parent Shared", TUnit::UNIT);
   RuntimeProfile::Counter* parent1_only =
-      profile1.AddCounter("Parent 1 Only", TUnit::UNIT);
+      profile1->AddCounter("Parent 1 Only", TUnit::UNIT);
   RuntimeProfile::Counter* parent2_only =
-      profile2.AddCounter("Parent 2 Only", TUnit::UNIT);
+      profile2->AddCounter("Parent 2 Only", TUnit::UNIT);
   parent1_shared->Add(1);
   parent2_shared->Add(3);
   parent1_only->Add(2);
@@ -138,17 +138,17 @@ TEST(CountersTest, MergeAndUpdate) {
 
   // Create child level counters
   RuntimeProfile::Counter* p1_c1_shared =
-    p1_child1.AddCounter("Child1 Shared", TUnit::UNIT);
+    p1_child1->AddCounter("Child1 Shared", TUnit::UNIT);
   RuntimeProfile::Counter* p1_c1_only =
-    p1_child1.AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
+    p1_child1->AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
   RuntimeProfile::Counter* p1_c2 =
-    p1_child2.AddCounter("Child2", TUnit::UNIT);
+    p1_child2->AddCounter("Child2", TUnit::UNIT);
   RuntimeProfile::Counter* p2_c1_shared =
-    p2_child1.AddCounter("Child1 Shared", TUnit::UNIT);
+    p2_child1->AddCounter("Child1 Shared", TUnit::UNIT);
   RuntimeProfile::Counter* p2_c1_only =
-    p1_child1.AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
+    p1_child1->AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
   RuntimeProfile::Counter* p2_c3 =
-    p2_child3.AddCounter("Child3", TUnit::UNIT);
+    p2_child3->AddCounter("Child3", TUnit::UNIT);
   p1_c1_shared->Add(10);
   p1_c1_only->Add(50);
   p2_c1_shared->Add(20);
@@ -158,17 +158,17 @@ TEST(CountersTest, MergeAndUpdate) {
 
   // Merge the two and validate
   TRuntimeProfileTree tprofile1;
-  profile1.ToThrift(&tprofile1);
-  RuntimeProfile averaged_profile(&pool, "merged", true);
-  averaged_profile.UpdateAverage(&profile1);
-  averaged_profile.UpdateAverage(&profile2);
-  EXPECT_EQ(5, averaged_profile.num_counters());
-  ValidateCounter(&averaged_profile, "Parent Shared", 2);
-  ValidateCounter(&averaged_profile, "Parent 1 Only", 2);
-  ValidateCounter(&averaged_profile, "Parent 2 Only", 5);
+  profile1->ToThrift(&tprofile1);
+  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true);
+  averaged_profile->UpdateAverage(profile1);
+  averaged_profile->UpdateAverage(profile2);
+  EXPECT_EQ(5, averaged_profile->num_counters());
+  ValidateCounter(averaged_profile, "Parent Shared", 2);
+  ValidateCounter(averaged_profile, "Parent 1 Only", 2);
+  ValidateCounter(averaged_profile, "Parent 2 Only", 5);
 
   vector<RuntimeProfile*> children;
-  averaged_profile.GetChildren(&children);
+  averaged_profile->GetChildren(&children);
   EXPECT_EQ(children.size(), 3);
 
   for (int i = 0; i < 3; ++i) {
@@ -191,16 +191,16 @@ TEST(CountersTest, MergeAndUpdate) {
 
   // make sure we can print
   stringstream dummy;
-  averaged_profile.PrettyPrint(&dummy);
+  averaged_profile->PrettyPrint(&dummy);
 
   // Update profile2 w/ profile1 and validate
-  profile2.Update(tprofile1);
-  EXPECT_EQ(5, profile2.num_counters());
-  ValidateCounter(&profile2, "Parent Shared", 1);
-  ValidateCounter(&profile2, "Parent 1 Only", 2);
-  ValidateCounter(&profile2, "Parent 2 Only", 5);
+  profile2->Update(tprofile1);
+  EXPECT_EQ(5, profile2->num_counters());
+  ValidateCounter(profile2, "Parent Shared", 1);
+  ValidateCounter(profile2, "Parent 1 Only", 2);
+  ValidateCounter(profile2, "Parent 2 Only", 5);
 
-  profile2.GetChildren(&children);
+  profile2->GetChildren(&children);
   EXPECT_EQ(children.size(), 3);
 
   for (int i = 0; i < 3; ++i) {
@@ -222,14 +222,14 @@ TEST(CountersTest, MergeAndUpdate) {
   }
 
   // make sure we can print
-  profile2.PrettyPrint(&dummy);
+  profile2->PrettyPrint(&dummy);
 }
 
 TEST(CountersTest, HighWaterMarkCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
   RuntimeProfile::HighWaterMarkCounter* bytes_counter =
-      profile.AddHighWaterMarkCounter("bytes", TUnit::BYTES);
+      profile->AddHighWaterMarkCounter("bytes", TUnit::BYTES);
 
   bytes_counter->Set(10);
   EXPECT_EQ(bytes_counter->current_value(), 10);
@@ -260,9 +260,9 @@ TEST(CountersTest, HighWaterMarkCounters) {
 
 TEST(CountersTest, SummaryStatsCounters) {
   ObjectPool pool;
-  RuntimeProfile profile1(&pool, "Profile 1");
+  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Profile 1");
   RuntimeProfile::SummaryStatsCounter* summary_stats_counter_1 =
-    profile1.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+    profile1->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
 
   EXPECT_EQ(summary_stats_counter_1->value(), 0);
   EXPECT_EQ(summary_stats_counter_1->MinValue(), numeric_limits<int64_t>::max());
@@ -297,9 +297,9 @@ TEST(CountersTest, SummaryStatsCounters) {
   EXPECT_EQ(summary_stats_counter_1->MinValue(), -40);
   EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);
 
-  RuntimeProfile profile2(&pool, "Profile 2");
+  RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Profile 2");
   RuntimeProfile::SummaryStatsCounter* summary_stats_counter_2 =
-    profile2.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+    profile2->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
 
   summary_stats_counter_2->UpdateCounter(100);
   EXPECT_EQ(summary_stats_counter_2->value(), 100);
@@ -307,10 +307,10 @@ TEST(CountersTest, SummaryStatsCounters) {
   EXPECT_EQ(summary_stats_counter_2->MaxValue(), 100);
 
   TRuntimeProfileTree tprofile1;
-  profile1.ToThrift(&tprofile1);
+  profile1->ToThrift(&tprofile1);
 
   // Merge profile1 and profile2 and check that profile2 is overwritten.
-  profile2.Update(tprofile1);
+  profile2->Update(tprofile1);
   EXPECT_EQ(summary_stats_counter_2->value(), 4);
   EXPECT_EQ(summary_stats_counter_2->MinValue(), -40);
   EXPECT_EQ(summary_stats_counter_2->MaxValue(), 40);
@@ -319,16 +319,16 @@ TEST(CountersTest, SummaryStatsCounters) {
 
 TEST(CountersTest, DerivedCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
   RuntimeProfile::Counter* bytes_counter =
-      profile.AddCounter("bytes", TUnit::BYTES);
+      profile->AddCounter("bytes", TUnit::BYTES);
   RuntimeProfile::Counter* ticks_counter =
-      profile.AddCounter("ticks", TUnit::TIME_NS);
+      profile->AddCounter("ticks", TUnit::TIME_NS);
   // set to 1 sec
   ticks_counter->Set(1000L * 1000L * 1000L);
 
   RuntimeProfile::DerivedCounter* throughput_counter =
-      profile.AddDerivedCounter("throughput", TUnit::BYTES,
+      profile->AddDerivedCounter("throughput", TUnit::BYTES,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_counter, ticks_counter));
 
   bytes_counter->Set(10);
@@ -341,11 +341,11 @@ TEST(CountersTest, DerivedCounters) {
 
 TEST(CountersTest, AverageSetCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
   RuntimeProfile::Counter* bytes_1_counter =
-      profile.AddCounter("bytes 1", TUnit::BYTES);
+      profile->AddCounter("bytes 1", TUnit::BYTES);
   RuntimeProfile::Counter* bytes_2_counter =
-      profile.AddCounter("bytes 2", TUnit::BYTES);
+      profile->AddCounter("bytes 2", TUnit::BYTES);
 
   bytes_1_counter->Set(10);
   RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES);
@@ -366,9 +366,9 @@ TEST(CountersTest, AverageSetCounters) {
   EXPECT_EQ(bytes_avg.value(), 25);
 
   RuntimeProfile::Counter* double_1_counter =
-      profile.AddCounter("double 1", TUnit::DOUBLE_VALUE);
+      profile->AddCounter("double 1", TUnit::DOUBLE_VALUE);
   RuntimeProfile::Counter* double_2_counter =
-      profile.AddCounter("double 2", TUnit::DOUBLE_VALUE);
+      profile->AddCounter("double 2", TUnit::DOUBLE_VALUE);
   double_1_counter->Set(1.0f);
   RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE);
   double_avg.UpdateCounter(double_1_counter);
@@ -390,17 +390,17 @@ TEST(CountersTest, AverageSetCounters) {
 
 TEST(CountersTest, InfoStringTest) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
-  EXPECT_TRUE(profile.GetInfoString("Key") == NULL);
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  EXPECT_TRUE(profile->GetInfoString("Key") == NULL);
 
-  profile.AddInfoString("Key", "Value");
-  const string* value = profile.GetInfoString("Key");
+  profile->AddInfoString("Key", "Value");
+  const string* value = profile->GetInfoString("Key");
   EXPECT_TRUE(value != NULL);
   EXPECT_EQ(*value, "Value");
 
   // Convert it to thrift
   TRuntimeProfileTree tprofile;
-  profile.ToThrift(&tprofile);
+  profile->ToThrift(&tprofile);
 
   // Convert it back
   RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(
@@ -410,34 +410,34 @@ TEST(CountersTest, InfoStringTest) {
   EXPECT_EQ(*value, "Value");
 
   // Test update.
-  RuntimeProfile update_dst_profile(&pool, "Profile2");
-  update_dst_profile.Update(tprofile);
-  value = update_dst_profile.GetInfoString("Key");
+  RuntimeProfile* update_dst_profile = RuntimeProfile::Create(&pool, "Profile2");
+  update_dst_profile->Update(tprofile);
+  value = update_dst_profile->GetInfoString("Key");
   EXPECT_TRUE(value != NULL);
   EXPECT_EQ(*value, "Value");
 
   // Update the original profile, convert it to thrift and update from the dst
   // profile
-  profile.AddInfoString("Key", "NewValue");
-  profile.AddInfoString("Foo", "Bar");
-  EXPECT_EQ(*profile.GetInfoString("Key"), "NewValue");
-  EXPECT_EQ(*profile.GetInfoString("Foo"), "Bar");
-  profile.ToThrift(&tprofile);
-
-  update_dst_profile.Update(tprofile);
-  EXPECT_EQ(*update_dst_profile.GetInfoString("Key"), "NewValue");
-  EXPECT_EQ(*update_dst_profile.GetInfoString("Foo"), "Bar");
+  profile->AddInfoString("Key", "NewValue");
+  profile->AddInfoString("Foo", "Bar");
+  EXPECT_EQ(*profile->GetInfoString("Key"), "NewValue");
+  EXPECT_EQ(*profile->GetInfoString("Foo"), "Bar");
+  profile->ToThrift(&tprofile);
+
+  update_dst_profile->Update(tprofile);
+  EXPECT_EQ(*update_dst_profile->GetInfoString("Key"), "NewValue");
+  EXPECT_EQ(*update_dst_profile->GetInfoString("Foo"), "Bar");
 }
 
 TEST(CountersTest, RateCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
 
   RuntimeProfile::Counter* bytes_counter =
-      profile.AddCounter("bytes", TUnit::BYTES);
+      profile->AddCounter("bytes", TUnit::BYTES);
 
   RuntimeProfile::Counter* rate_counter =
-      profile.AddRateCounter("RateCounter", bytes_counter);
+      profile->AddRateCounter("RateCounter", bytes_counter);
   EXPECT_TRUE(rate_counter->unit() == TUnit::BYTES_PER_SECOND);
 
   EXPECT_EQ(rate_counter->value(), 0);
@@ -449,8 +449,8 @@ TEST(CountersTest, RateCounters) {
 
   int64_t rate = rate_counter->value();
 
-  // Remove the counter so it no longer gets updates
-  PeriodicCounterUpdater::StopRateCounter(rate_counter);
+  // Stop the counter so it no longer gets updates
+  profile->StopPeriodicCounters();
 
   // The rate counter is not perfectly accurate.  Currently updated at 500ms intervals,
   // we should have seen somewhere between 1 and 3 updates (33 - 200 MB/s)
@@ -468,44 +468,42 @@ TEST(CountersTest, RateCounters) {
 
 TEST(CountersTest, BucketCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
 
   RuntimeProfile::Counter* unit_counter =
-      profile.AddCounter("unit", TUnit::UNIT);
+      profile->AddCounter("unit", TUnit::UNIT);
 
   // Set the unit to 1 before sampling
   unit_counter->Set(1);
 
   // Create the bucket counters and start sampling
-  vector<RuntimeProfile::Counter*> buckets;
-  buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  profile.RegisterBucketingCounters(unit_counter, &buckets);
+  vector<RuntimeProfile::Counter*>* buckets =
+      profile->AddBucketingCounters(unit_counter, 2);
 
   // Wait two seconds.
   sleep(2);
 
   // Stop sampling
-  PeriodicCounterUpdater::StopBucketingCounters(&buckets, true);
+  profile->StopPeriodicCounters();
 
   // TODO: change the value to double
   // The value of buckets[0] should be zero and buckets[1] should be 1.
-  double val0 = buckets[0]->double_value();
-  double val1 = buckets[1]->double_value();
+  double val0 = (*buckets)[0]->double_value();
+  double val1 = (*buckets)[1]->double_value();
   EXPECT_EQ(0, val0);
   EXPECT_EQ(100, val1);
 
   // Wait another second.  The counter has been removed. So the value should not be
   // changed (much).
   sleep(2);
-  EXPECT_EQ(val0, buckets[0]->double_value());
-  EXPECT_EQ(val1, buckets[1]->double_value());
+  EXPECT_EQ(val0, (*buckets)[0]->double_value());
+  EXPECT_EQ(val1, (*buckets)[1]->double_value());
 }
 
 TEST(CountersTest, EventSequences) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
-  RuntimeProfile::EventSequence* seq = profile.AddEventSequence("event sequence");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence");
   seq->MarkEvent("aaaa");
   seq->MarkEvent("bbbb");
   seq->MarkEvent("cccc");
@@ -524,7 +522,7 @@ TEST(CountersTest, EventSequences) {
   }
 
   TRuntimeProfileTree thrift_profile;
-  profile.ToThrift(&thrift_profile);
+  profile->ToThrift(&thrift_profile);
   EXPECT_TRUE(thrift_profile.nodes[0].__isset.event_sequences);
   EXPECT_EQ(1, thrift_profile.nodes[0].event_sequences.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 12f4e25..4254b9c 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -53,10 +53,14 @@ const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
 const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime";
 const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
 
+RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name,
+    bool is_averaged_profile) {
+  return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile));
+}
+
 RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name,
     bool is_averaged_profile)
   : pool_(pool),
-    own_pool_(false),
     name_(name),
     metadata_(-1),
     is_averaged_profile_(is_averaged_profile),
@@ -78,27 +82,25 @@ RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name,
 }
 
 RuntimeProfile::~RuntimeProfile() {
-  map<string, Counter*>::const_iterator iter;
-  for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
-    PeriodicCounterUpdater::StopRateCounter(iter->second);
-    PeriodicCounterUpdater::StopSamplingCounter(iter->second);
-  }
+  DCHECK(!has_active_periodic_counters_);
+}
 
-  set<vector<Counter*>* >::const_iterator buckets_iter;
-  for (buckets_iter = bucketing_counters_.begin();
-      buckets_iter != bucketing_counters_.end(); ++buckets_iter) {
-    // This is just a clean up. No need to perform conversion. Also, the underlying
-    // counters might be gone already.
-    PeriodicCounterUpdater::StopBucketingCounters(*buckets_iter, false);
+void RuntimeProfile::StopPeriodicCounters() {
+  lock_guard<SpinLock> l(counter_map_lock_);
+  if (!has_active_periodic_counters_) return;
+  for (Counter* sampling_counter : sampling_counters_) {
+    PeriodicCounterUpdater::StopSamplingCounter(sampling_counter);
   }
-
-  TimeSeriesCounterMap::const_iterator time_series_it;
-  for (time_series_it = time_series_counter_map_.begin();
-      time_series_it != time_series_counter_map_.end(); ++time_series_it) {
-    PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_it->second);
+  for (Counter* rate_counter : rate_counters_) {
+    PeriodicCounterUpdater::StopRateCounter(rate_counter);
   }
-
-  if (own_pool_) delete pool_;
+  for (vector<Counter*>* counters : bucketing_counters_) {
+    PeriodicCounterUpdater::StopBucketingCounters(counters);
+  }
+  for (auto& time_series_counter_entry : time_series_counter_map_) {
+    PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second);
+  }
+  has_active_periodic_counters_ = false;
 }
 
 RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
@@ -113,7 +115,7 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
   DCHECK_LT(*idx, nodes.size());
 
   const TRuntimeProfileNode& node = nodes[*idx];
-  RuntimeProfile* profile = pool->Add(new RuntimeProfile(pool, node.name));
+  RuntimeProfile* profile = Create(pool, node.name);
   profile->metadata_ = node.metadata;
   for (int i = 0; i < node.counters.size(); ++i) {
     const TCounter& counter = node.counters[i];
@@ -208,7 +210,7 @@ void RuntimeProfile::UpdateAverage(RuntimeProfile* other) {
       if (j != child_map_.end()) {
         child = j->second;
       } else {
-        child = pool_->Add(new RuntimeProfile(pool_, other_child->name_, true));
+        child = Create(pool_, other_child->name_, true);
         child->metadata_ = other_child->metadata_;
         bool indent_other_child = other->children_[i].second;
         child_map_[child->name_] = child;
@@ -282,7 +284,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
   }
 
   {
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
+    lock_guard<SpinLock> l(counter_map_lock_);
     for (int i = 0; i < node.time_series_counters.size(); ++i) {
       const TTimeSeriesCounter& c = node.time_series_counters[i];
       TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
@@ -322,7 +324,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       if (j != child_map_.end()) {
         child = j->second;
       } else {
-        child = pool_->Add(new RuntimeProfile(pool_, tchild.name));
+        child = Create(pool_, tchild.name);
         child->metadata_ = tchild.metadata;
         child_map_[tchild.name] = child;
         children_.push_back(make_pair(child, tchild.indent));
@@ -439,7 +441,7 @@ RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent,
     bool prepend) {
   lock_guard<SpinLock> l(children_lock_);
   DCHECK(child_map_.find(name) == child_map_.end());
-  RuntimeProfile* child = pool_->Add(new RuntimeProfile(pool_, name));
+  RuntimeProfile* child = Create(pool_, name);
   AddChildLocked(child, indent, prepend ? children_.begin() : children_.end());
   return child;
 }
@@ -504,9 +506,16 @@ void RuntimeProfile::AddCodegenMsg(
 #define ADD_COUNTER_IMPL(NAME, T)                                                \
   RuntimeProfile::T* RuntimeProfile::NAME(                                       \
       const string& name, TUnit::type unit, const string& parent_counter_name) { \
-    DCHECK_EQ(is_averaged_profile_, false);                                      \
     lock_guard<SpinLock> l(counter_map_lock_);                                   \
+    bool dummy;                                                                  \
+    return NAME##Locked(name, unit, parent_counter_name, &dummy);                \
+  }                                                                              \
+  RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name,           \
+      TUnit::type unit, const string& parent_counter_name, bool* created) {      \
+    counter_map_lock_.DCheckLocked();                                            \
+    DCHECK_EQ(is_averaged_profile_, false);                                      \
     if (counter_map_.find(name) != counter_map_.end()) {                         \
+      *created = false;                                                          \
       return reinterpret_cast<T*>(counter_map_[name]);                           \
     }                                                                            \
     DCHECK(parent_counter_name == ROOT_COUNTER                                   \
@@ -516,6 +525,7 @@ void RuntimeProfile::AddCodegenMsg(
     set<string>* child_counters =                                                \
         FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());   \
     child_counters->insert(name);                                                \
+    *created = true;                                                             \
     return counter;                                                              \
   }
 
@@ -668,7 +678,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
     // <Name> (<period>): <val1>, <val2>, <etc>
     SpinLock* lock;
     int num, period;
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
+    lock_guard<SpinLock> l(counter_map_lock_);
     for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
       const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
       if (num > 0) {
@@ -813,7 +823,7 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
   }
 
   {
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
+    lock_guard<SpinLock> l(counter_map_lock_);
     if (time_series_counter_map_.size() != 0) {
       node.__set_time_series_counters(
           vector<TTimeSeriesCounter>(time_series_counter_map_.size()));
@@ -884,44 +894,71 @@ RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
       DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit();
       return NULL;
   }
-  Counter* dst_counter = AddCounter(name, dst_unit);
-  PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
-      PeriodicCounterUpdater::RATE_COUNTER);
-  return dst_counter;
+  {
+    lock_guard<SpinLock> l(counter_map_lock_);
+    bool created;
+    Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+    if (!created) return dst_counter;
+    rate_counters_.push_back(dst_counter);
+    PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
+        PeriodicCounterUpdater::RATE_COUNTER);
+    has_active_periodic_counters_ = true;
+    return dst_counter;
+  }
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
     const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
-  Counter* dst_counter = AddCounter(name, dst_unit);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  bool created;
+  Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+  if (!created) return dst_counter;
+  rate_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter,
       PeriodicCounterUpdater::RATE_COUNTER);
+  has_active_periodic_counters_ = true;
   return dst_counter;
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
     const string& name, Counter* src_counter) {
   DCHECK(src_counter->unit() == TUnit::UNIT);
-  Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  bool created;
+  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+  if (!created) return dst_counter;
+  sampling_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
       PeriodicCounterUpdater::SAMPLING_COUNTER);
+  has_active_periodic_counters_ = true;
   return dst_counter;
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
     const string& name, DerivedCounterFunction sample_fn) {
-  Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  bool created;
+  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+  if (!created) return dst_counter;
+  sampling_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
       PeriodicCounterUpdater::SAMPLING_COUNTER);
+  has_active_periodic_counters_ = true;
   return dst_counter;
 }
 
-void RuntimeProfile::RegisterBucketingCounters(Counter* src_counter,
-    vector<Counter*>* buckets) {
-  {
-    lock_guard<SpinLock> l(counter_map_lock_);
-    bucketing_counters_.insert(buckets);
+vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
+    Counter* src_counter, int num_buckets) {
+  lock_guard<SpinLock> l(counter_map_lock_);
+  vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
+  for (int i = 0; i < num_buckets; ++i) {
+      buckets->push_back(
+          pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
   }
+  bucketing_counters_.insert(buckets);
+  has_active_periodic_counters_ = true;
   PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
+  return buckets;
 }
 
 RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) {
@@ -978,16 +1015,14 @@ RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
 
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
     const string& name, TUnit::type unit, DerivedCounterFunction fn) {
-  DCHECK(fn != NULL);
-  TimeSeriesCounter* counter = NULL;
-  {
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
-    TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
-    if (it != time_series_counter_map_.end()) return it->second;
-    counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
-    time_series_counter_map_[name] = counter;
-  }
+  DCHECK(fn != nullptr);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
+  if (it != time_series_counter_map_.end()) return it->second;
+  TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
+  time_series_counter_map_[name] = counter;
   PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
+  has_active_periodic_counters_ = true;
   return counter;
 }
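The lifecycle rules in the runtime-profile.cc changes above can be sketched in isolation. Profiles are allocated from the pool via Create(). A periodic counter is registered only when the Locked helper reports that it actually created a new counter. StopPeriodicCounters() must run before the profile is destroyed. This is a minimal, single-threaded illustration under stated assumptions, not Impala's implementation. SketchPool, SketchCounter, SketchUpdater and SketchProfile are hypothetical stand-ins for ObjectPool, RuntimeProfile::Counter, PeriodicCounterUpdater and RuntimeProfile. The updater only records registrations instead of sampling on a background thread.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for ObjectPool: owns heap objects until the pool dies.
class SketchPool {
 public:
  template <typename T>
  T* Add(T* obj) {
    // A shared_ptr<void> built from T* still calls T's destructor.
    objects_.emplace_back(obj);
    return obj;
  }

 private:
  std::vector<std::shared_ptr<void>> objects_;
};

struct SketchCounter {
  long value = 0;
};

// Hypothetical stand-in for PeriodicCounterUpdater. It only records which
// counters are registered; a real updater would sample them on a thread.
struct SketchUpdater {
  std::set<SketchCounter*> active;
  void Register(SketchCounter* c) { active.insert(c); }
  void Stop(SketchCounter* c) { active.erase(c); }
};

class SketchProfile {
 public:
  // Factory: the profile itself is owned by 'pool', just like its counters.
  static SketchProfile* Create(SketchPool* pool, SketchUpdater* updater) {
    return pool->Add(new SketchProfile(pool, updater));
  }

  // Destroying a profile that still has registered counters is a bug.
  ~SketchProfile() { assert(!has_active_periodic_counters_); }

  SketchCounter* AddRateCounter(const std::string& name) {
    std::lock_guard<std::mutex> l(lock_);
    bool created;
    SketchCounter* c = AddCounterLocked(name, &created);
    if (!created) return c;  // already registered by an earlier call
    rate_counters_.push_back(c);
    updater_->Register(c);
    has_active_periodic_counters_ = true;
    return c;
  }

  void StopPeriodicCounters() {
    std::lock_guard<std::mutex> l(lock_);
    if (!has_active_periodic_counters_) return;
    for (SketchCounter* c : rate_counters_) updater_->Stop(c);
    has_active_periodic_counters_ = false;
  }

 private:
  SketchProfile(SketchPool* pool, SketchUpdater* updater)
      : pool_(pool), updater_(updater) {}

  // Find-or-create; the caller must hold lock_. Reports through 'created'
  // whether a new counter was made.
  SketchCounter* AddCounterLocked(const std::string& name, bool* created) {
    auto it = counters_.find(name);
    if (it != counters_.end()) {
      *created = false;
      return it->second;
    }
    SketchCounter* c = pool_->Add(new SketchCounter());
    counters_[name] = c;
    *created = true;
    return c;
  }

  SketchPool* pool_;
  SketchUpdater* updater_;
  std::mutex lock_;
  std::map<std::string, SketchCounter*> counters_;
  std::vector<SketchCounter*> rate_counters_;
  bool has_active_periodic_counters_ = false;
};
```

In this sketch, the 'created' check happens under the same lock that guards the counter map. That is what keeps a second AddRateCounter() call with the same name from registering the counter with the updater twice.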
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 298c214..8348161 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -101,13 +101,13 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
 
   typedef boost::function<int64_t ()> DerivedCounterFunction;
 
-  /// Create a runtime profile object with 'name'.  Counters and merged profile are
-  /// allocated from pool.
-  /// If is_averaged_profile is true, the counters in this profile will be derived
+  /// Create a runtime profile object with 'name'. The profile, counters and any other
+  /// structures owned by the profile are allocated from 'pool'.
+  /// If 'is_averaged_profile' is true, the counters in this profile will be derived
   /// averages (of unit AveragedCounter) from other profiles, so the counter map will
   /// be left empty Otherwise, the counter map is initialized with a single entry for
   /// TotalTime.
-  RuntimeProfile(ObjectPool* pool, const std::string& name,
+  static RuntimeProfile* Create(ObjectPool* pool, const std::string& name,
       bool is_averaged_profile = false);
 
   ~RuntimeProfile();
@@ -247,6 +247,12 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// the key does not exist.
   const std::string* GetInfoString(const std::string& key) const;
 
+  /// Stops updating all counters in this profile that are periodically updated by a
+  /// background thread (i.e. sampling, rate, bucketing and time series counters).
+  /// Must be called before the profile is destroyed if any such counters are active.
+  /// Does not stop counters on descendant profiles.
+  void StopPeriodicCounters();
+
   /// Returns the counter for the total elapsed time.
   Counter* total_time_counter() { return counter_map_[TOTAL_TIME_COUNTER_NAME]; }
   Counter* inactive_timer() { return counter_map_[INACTIVE_TIME_COUNTER_NAME]; }
@@ -299,8 +305,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// Add a rate counter to the current profile based on src_counter with name.
   /// The rate counter is updated periodically based on the src counter.
   /// The rate counter has units in src_counter unit per second.
-  /// Rate counters should be stopped (by calling PeriodicCounterUpdater::StopRateCounter)
-  /// as soon as the src_counter stops changing.
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopRateCounter() if 'src_counter' stops changing.
   Counter* AddRateCounter(const std::string& name, Counter* src_counter);
 
   /// Same as 'AddRateCounter' above except values are taken by calling fn.
@@ -312,27 +319,40 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// The sampling counter is updated periodically based on the src counter by averaging
   /// the samples taken from the src counter.
   /// The sampling counter has the same unit as src_counter unit.
-  /// Sampling counters should be stopped (by calling
-  /// PeriodicCounterUpdater::StopSamplingCounter) as soon as the src_counter stops
-  /// changing.
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopSamplingCounter() if 'src_counter' stops changing.
   Counter* AddSamplingCounter(const std::string& name, Counter* src_counter);
 
   /// Same as 'AddSamplingCounter' above except the samples are taken by calling fn.
   Counter* AddSamplingCounter(const std::string& name, DerivedCounterFunction fn);
 
-  /// Register a bucket of counters to store the sampled value of src_counter.
-  /// The src_counter is sampled periodically and the buckets are updated.
-  void RegisterBucketingCounters(Counter* src_counter, std::vector<Counter*>* buckets);
+  /// Create a set of counters, one per bucket, to store the sampled value of src_counter.
+  /// The 'src_counter' is sampled periodically to obtain the index of the bucket to
+  /// increment. E.g. if the value of 'src_counter' is 3, the bucket at index 3 is
+  /// updated. If the index exceeds the index of the last bucket, the last bucket is
+  /// updated.
+  ///
+  /// The created counters do not appear in the profile when serialized or
+  /// pretty-printed. The caller must do its own processing of the counter value
+  /// (e.g. converting it to an info string).
+  /// TODO: make this interface more consistent and sane.
+  ///
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopBucketingCounters() if 'buckets' stops changing.
+  std::vector<Counter*>* AddBucketingCounters(Counter* src_counter, int num_buckets);
 
   /// Create a time series counter. This begins sampling immediately. This counter
   /// contains a number of samples that are collected periodically by calling sample_fn().
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
   /// Note: these counters don't get merged (to make average profiles)
   TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name,
       TUnit::type unit, DerivedCounterFunction sample_fn);
 
-  /// Create a time series counter that samples the source counter. Sampling begins
-  /// immediately.
-  /// Note: these counters don't get merged (to make average profiles)
+  /// Same as above except the samples are collected from 'src_counter'.
   TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, Counter* src_counter);
 
   /// Recursively compute the fraction of the 'total_time' spent in this profile and
@@ -345,9 +365,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// object, but occasionally allocated in the constructor.
   ObjectPool* pool_;
 
-  /// True if we have to delete the pool_ on destruction.
-  bool own_pool_;
-
   /// Name for this runtime profile.
   std::string name_;
 
@@ -369,9 +386,27 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   ChildCounterMap child_counter_map_;
 
   /// A set of bucket counters registered in this runtime profile.
-  std::set<std::vector<Counter*>* > bucketing_counters_;
+  std::set<std::vector<Counter*>*> bucketing_counters_;
+
+  /// Rate counters, which also appear in 'counter_map_'. Tracked separately to enable
+  /// stopping the counters.
+  std::vector<Counter*> rate_counters_;
+
+  /// Sampling counters, which also appear in 'counter_map_'. Tracked separately to enable
+  /// stopping the counters.
+  std::vector<Counter*> sampling_counters_;
+
+  /// Time series counters. These do not appear in 'counter_map_'. Tracked separately
+  /// because they are displayed separately in the profile and need to be stopped.
+  typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap;
+  TimeSeriesCounterMap time_series_counter_map_;
 
-  /// Protects counter_map_, counter_child_map_ and bucketing_counters_.
+  /// True if this profile has active periodic counters, including bucketing, rate,
+  /// sampling and time series counters.
+  bool has_active_periodic_counters_ = false;
+
+  /// Protects counter_map_, child_counter_map_, bucketing_counters_, rate_counters_,
+  /// sampling_counters_, time_series_counter_map_, and has_active_periodic_counters_.
   mutable SpinLock counter_map_lock_;
 
   /// Child profiles.  Does not own memory.
@@ -403,12 +438,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// Protects event_sequence_map_.
   mutable SpinLock event_sequence_lock_;
 
-  typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap;
-  TimeSeriesCounterMap time_series_counter_map_;
-
-  /// Protects time_series_counter_map_.
-  mutable SpinLock time_series_counter_map_lock_;
-
   typedef std::map<std::string, SummaryStatsCounter*> SummaryStatsCounterMap;
   SummaryStatsCounterMap summary_stats_map_;
 
@@ -430,6 +459,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// ComputeTimeInProfile()
   int64_t local_time_ns_;
 
+  /// Constructor used by Create().
+  RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile);
+
   /// Update a subtree of profiles from nodes, rooted at *idx.
   /// On return, *idx points to the node immediately following this subtree.
   void Update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
@@ -454,6 +486,16 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   static RuntimeProfile* CreateFromThrift(
       ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
 
+  /// Internal implementations of the Add*Counter() functions for use when the caller
+  /// holds counter_map_lock_. Also returns 'created', which is true if a new counter was
+  /// created and false if a counter with the given name already existed.
+  Counter* AddCounterLocked(const std::string& name, TUnit::type unit,
+      const std::string& parent_counter_name, bool* created);
+  HighWaterMarkCounter* AddHighWaterMarkCounterLocked(const std::string& name,
+      TUnit::type unit, const std::string& parent_counter_name, bool* created);
+  ConcurrentTimerCounter* AddConcurrentTimerCounterLocked(const std::string& name,
+      TUnit::type unit, const std::string& parent_counter_name, bool* created);
+
   ///  Inserts 'child' before the iterator 'insert_pos' in 'children_'.
   /// 'children_lock_' must be held by the caller.
   void AddChildLocked(
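The AddBucketingCounters() comment above describes a bucket-selection rule: the sampled value of 'src_counter' is used as the bucket index, clamped to the last bucket. A small function can illustrate it. This is a hypothetical sketch of a single sampling tick, with several assumptions. SampleIntoBuckets() is not an Impala function. It keeps raw integer sample counts rather than RuntimeProfile::Counter objects and leaves out any later conversion of those counts. Clamping negative values to bucket 0 is also an assumption, because the header comment does not cover that case.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch of one sampling tick: the current value of the source
// counter is used as the bucket index, clamped to the last bucket.
void SampleIntoBuckets(std::int64_t src_value,
                       std::vector<std::int64_t>* buckets) {
  assert(!buckets->empty());
  const std::int64_t last = static_cast<std::int64_t>(buckets->size()) - 1;
  std::int64_t idx = src_value;
  if (idx > last) idx = last;  // values past the end go to the last bucket
  if (idx < 0) idx = 0;        // assumption: negative samples go to bucket 0
  ++(*buckets)[static_cast<std::size_t>(idx)];
}
```

If the source stays at 1 for the whole sampling period, as in the BucketCounters test, every tick in this sketch lands in bucket 1 and bucket 0 stays empty.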



[2/5] incubator-impala git commit: IMPALA-5932: Improve transitive closure computation performance in FE

Posted by ta...@apache.org.
IMPALA-5932: Improve transitive closure computation performance in FE

This patch implements the Floyd-Warshall algorithm for the transitive
closure computation of the value transfer graph, replacing the existing
O(N^4) brute-force algorithm.
The performance improvement depends on the size and structure of the
value transfer graph. On a random graph with 800 slots and 2800 edges,
the transitive closure computation itself is 43X faster, and the time
reported for the "Equivalence classes computed" event in the runtime
profile is 21X shorter.
This computation is covered by the existing tests, which verify the
equivalence of the new and the old value transfer graphs.

Change-Id: Idb00e3c1f904e60ae25567a52b4bf0809a84c6b3
Reviewed-on: http://gerrit.cloudera.org:8080/8098
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/741b0524
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/741b0524
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/741b0524

Branch: refs/heads/master
Commit: 741b0524a43c93f2d339dad259dea510524cbdbe
Parents: eecbbcb
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Sep 14 12:53:42 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 20 02:45:04 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/Analyzer.java    | 23 ++++++--------------
 1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/741b0524/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index c629bb7..df5cf97 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -2620,7 +2620,7 @@ public class Analyzer {
      *    This step partitions the value transfers into disjoint sets.
      * 4. Compute the transitive closure of each partition from (3) in the new slot
      *    domain separately. Hopefully, the partitions are small enough to afford
-     *    the O(N^3) complexity of the brute-force transitive closure computation.
+     *    the O(N^3) complexity of the Floyd-Warshall transitive closure computation.
      * The condensed graph is not transformed back into the original slot domain because
      * of the potential performance penalty. Instead, hasValueTransfer() consults
      * coalescedSlots_, valueTransfer_, and completeSubGraphs_ which can together
@@ -2695,24 +2695,15 @@ public class Analyzer {
           p[numPartitionSlots++] = slotId;
         }
         // Compute the transitive closure of this graph partition.
-        // TODO: Since we are operating on a DAG the performance can be improved if
-        // necessary (e.g., topological sort + backwards propagation of the transitive
-        // closure).
-        boolean changed = false;
-        do {
-          changed = false;
+        for (int j = 0; j < numPartitionSlots; ++j) {
           for (int i = 0; i < numPartitionSlots; ++i) {
-            for (int j = 0; j < numPartitionSlots; ++j) {
-              for (int k = 0; k < numPartitionSlots; ++k) {
-                if (valueTransfer_[p[i]][p[j]] && valueTransfer_[p[j]][p[k]]
-                    && !valueTransfer_[p[i]][p[k]]) {
-                  valueTransfer_[p[i]][p[k]] = true;
-                  changed = true;
-                }
-              }
+            // Our graphs are typically sparse so this filters out a lot of iterations.
+            if (!valueTransfer_[p[i]][p[j]]) continue;
+            for (int k = 0; k < numPartitionSlots; ++k) {
+              valueTransfer_[p[i]][p[k]] |= valueTransfer_[p[j]][p[k]];
             }
           }
-        } while (changed);
+        }
       }
 
       long end = System.currentTimeMillis();
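The new loop is Warshall's transitive-closure algorithm, the boolean form of Floyd-Warshall. The key change is that the intermediate slot j is the outermost loop, so a single pass suffices. The old code, by contrast, had to repeat its triple loop until nothing changed. The sketch below applies the same loop order to a plain adjacency matrix. TransitiveClosure() is a hypothetical helper: it indexes the matrix directly instead of going through the partition array p[] used in Analyzer.java.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Warshall's transitive closure, in place. reach[i][k] means "there is a
// value transfer from i to k". After the iteration for intermediate node j,
// reach[i][k] holds iff some path from i to k uses only intermediate nodes
// in {0..j}, so one pass over j yields the full closure.
void TransitiveClosure(std::vector<std::vector<bool>>* reach) {
  std::vector<std::vector<bool>>& r = *reach;
  const std::size_t n = r.size();
  for (std::size_t j = 0; j < n; ++j) {
    for (std::size_t i = 0; i < n; ++i) {
      // Sparse graphs: skip rows that cannot reach j, as the patch does.
      if (!r[i][j]) continue;
      for (std::size_t k = 0; k < n; ++k) {
        if (r[j][k]) r[i][k] = true;
      }
    }
  }
}
```

The cost is O(N^3) per partition regardless of edge order. The early `continue` only trims work on sparse rows and does not affect the result.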


[5/5] incubator-impala git commit: IMPALA-5920: addendum - add missing RAT check

Posted by ta...@apache.org.
IMPALA-5920: addendum - add missing RAT check

Change-Id: I44654004bef74b741cfdf4fb07c274a77320b818
Reviewed-on: http://gerrit.cloudera.org:8080/8108
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fc275fab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fc275fab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fc275fab

Branch: refs/heads/master
Commit: fc275fab62c41f3ddc2978fab531b61870817541
Parents: 7866eec
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Sep 19 21:43:08 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 20 10:30:04 2017 +0000

----------------------------------------------------------------------
 bin/rat_exclude_files.txt | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fc275fab/bin/rat_exclude_files.txt
----------------------------------------------------------------------
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index c1a8e04..e356a9e 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -72,6 +72,7 @@ LOGS.md
 README.md
 */README
 */README.dox
+*/README.txt
 testdata/bin/README-BENCHMARK-TEST-GENERATION
 tests/comparison/ORACLE.txt
 bin/distcc/README.md