You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/20 20:20:13 UTC

[3/5] incubator-impala git commit: IMPALA-5895: clean up runtime profile lifecycle

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index fe9f3ae..1bc7911 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -34,17 +34,17 @@ namespace impala {
 
 TEST(CountersTest, Basic) {
   ObjectPool pool;
-  RuntimeProfile profile_a(&pool, "ProfileA");
-  RuntimeProfile profile_a1(&pool, "ProfileA1");
-  RuntimeProfile profile_a2(&pool, "ProfileAb");
+  RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA");
+  RuntimeProfile* profile_a1 = RuntimeProfile::Create(&pool, "ProfileA1");
+  RuntimeProfile* profile_a2 = RuntimeProfile::Create(&pool, "ProfileAb");
 
   TRuntimeProfileTree thrift_profile;
 
-  profile_a.AddChild(&profile_a1);
-  profile_a.AddChild(&profile_a2);
+  profile_a->AddChild(profile_a1);
+  profile_a->AddChild(profile_a2);
 
   // Test Empty
-  profile_a.ToThrift(&thrift_profile.nodes);
+  profile_a->ToThrift(&thrift_profile.nodes);
   EXPECT_EQ(thrift_profile.nodes.size(), 3);
   thrift_profile.nodes.clear();
 
@@ -53,7 +53,7 @@ TEST(CountersTest, Basic) {
   RuntimeProfile::Counter* counter_merged;
 
   // Updating/setting counter
-  counter_a = profile_a.AddCounter("A", TUnit::UNIT);
+  counter_a = profile_a->AddCounter("A", TUnit::UNIT);
   EXPECT_TRUE(counter_a != NULL);
   counter_a->Add(10);
   counter_a->Add(-5);
@@ -61,40 +61,40 @@ TEST(CountersTest, Basic) {
   counter_a->Set(1);
   EXPECT_EQ(counter_a->value(), 1);
 
-  counter_b = profile_a2.AddCounter("B", TUnit::BYTES);
+  counter_b = profile_a2->AddCounter("B", TUnit::BYTES);
   EXPECT_TRUE(counter_b != NULL);
 
   // Serialize/deserialize
-  profile_a.ToThrift(&thrift_profile.nodes);
+  profile_a->ToThrift(&thrift_profile.nodes);
   RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
   counter_merged = from_thrift->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
   EXPECT_TRUE(from_thrift->GetCounter("Not there") ==  NULL);
 
   // Averaged
-  RuntimeProfile averaged_profile(&pool, "Merged", true);
-  averaged_profile.UpdateAverage(from_thrift);
-  counter_merged = averaged_profile.GetCounter("A");
+  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true);
+  averaged_profile->UpdateAverage(from_thrift);
+  counter_merged = averaged_profile->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
 
   // UpdateAverage again, there should be no change.
-  averaged_profile.UpdateAverage(from_thrift);
+  averaged_profile->UpdateAverage(from_thrift);
   EXPECT_EQ(counter_merged->value(), 1);
 
-  counter_a = profile_a2.AddCounter("A", TUnit::UNIT);
+  counter_a = profile_a2->AddCounter("A", TUnit::UNIT);
   counter_a->Set(3);
-  averaged_profile.UpdateAverage(&profile_a2);
+  averaged_profile->UpdateAverage(profile_a2);
   EXPECT_EQ(counter_merged->value(), 2);
 
   // Update
-  RuntimeProfile updated_profile(&pool, "Updated");
-  updated_profile.Update(thrift_profile);
-  RuntimeProfile::Counter* counter_updated = updated_profile.GetCounter("A");
+  RuntimeProfile* updated_profile = RuntimeProfile::Create(&pool, "Updated");
+  updated_profile->Update(thrift_profile);
+  RuntimeProfile::Counter* counter_updated = updated_profile->GetCounter("A");
   EXPECT_EQ(counter_updated->value(), 1);
 
   // Update 2 more times, counters should stay the same
-  updated_profile.Update(thrift_profile);
-  updated_profile.Update(thrift_profile);
+  updated_profile->Update(thrift_profile);
+  updated_profile->Update(thrift_profile);
   EXPECT_EQ(counter_updated->value(), 1);
 }
 
@@ -110,27 +110,27 @@ TEST(CountersTest, MergeAndUpdate) {
   // children, with the counters from the shared child aggregated.
 
   ObjectPool pool;
-  RuntimeProfile profile1(&pool, "Parent1");
-  RuntimeProfile p1_child1(&pool, "Child1");
-  RuntimeProfile p1_child2(&pool, "Child2");
-  profile1.AddChild(&p1_child1);
-  profile1.AddChild(&p1_child2);
-
-  RuntimeProfile profile2(&pool, "Parent2");
-  RuntimeProfile p2_child1(&pool, "Child1");
-  RuntimeProfile p2_child3(&pool, "Child3");
-  profile2.AddChild(&p2_child1);
-  profile2.AddChild(&p2_child3);
+  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent1");
+  RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1");
+  RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2");
+  profile1->AddChild(p1_child1);
+  profile1->AddChild(p1_child2);
+
+  RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Parent2");
+  RuntimeProfile* p2_child1 = RuntimeProfile::Create(&pool, "Child1");
+  RuntimeProfile* p2_child3 = RuntimeProfile::Create(&pool, "Child3");
+  profile2->AddChild(p2_child1);
+  profile2->AddChild(p2_child3);
 
   // Create parent level counters
   RuntimeProfile::Counter* parent1_shared =
-      profile1.AddCounter("Parent Shared", TUnit::UNIT);
+      profile1->AddCounter("Parent Shared", TUnit::UNIT);
   RuntimeProfile::Counter* parent2_shared =
-      profile2.AddCounter("Parent Shared", TUnit::UNIT);
+      profile2->AddCounter("Parent Shared", TUnit::UNIT);
   RuntimeProfile::Counter* parent1_only =
-      profile1.AddCounter("Parent 1 Only", TUnit::UNIT);
+      profile1->AddCounter("Parent 1 Only", TUnit::UNIT);
   RuntimeProfile::Counter* parent2_only =
-      profile2.AddCounter("Parent 2 Only", TUnit::UNIT);
+      profile2->AddCounter("Parent 2 Only", TUnit::UNIT);
   parent1_shared->Add(1);
   parent2_shared->Add(3);
   parent1_only->Add(2);
@@ -138,17 +138,17 @@ TEST(CountersTest, MergeAndUpdate) {
 
   // Create child level counters
   RuntimeProfile::Counter* p1_c1_shared =
-    p1_child1.AddCounter("Child1 Shared", TUnit::UNIT);
+    p1_child1->AddCounter("Child1 Shared", TUnit::UNIT);
   RuntimeProfile::Counter* p1_c1_only =
-    p1_child1.AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
+    p1_child1->AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
   RuntimeProfile::Counter* p1_c2 =
-    p1_child2.AddCounter("Child2", TUnit::UNIT);
+    p1_child2->AddCounter("Child2", TUnit::UNIT);
   RuntimeProfile::Counter* p2_c1_shared =
-    p2_child1.AddCounter("Child1 Shared", TUnit::UNIT);
+    p2_child1->AddCounter("Child1 Shared", TUnit::UNIT);
   RuntimeProfile::Counter* p2_c1_only =
-    p1_child1.AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
+    p1_child1->AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
   RuntimeProfile::Counter* p2_c3 =
-    p2_child3.AddCounter("Child3", TUnit::UNIT);
+    p2_child3->AddCounter("Child3", TUnit::UNIT);
   p1_c1_shared->Add(10);
   p1_c1_only->Add(50);
   p2_c1_shared->Add(20);
@@ -158,17 +158,17 @@ TEST(CountersTest, MergeAndUpdate) {
 
   // Merge the two and validate
   TRuntimeProfileTree tprofile1;
-  profile1.ToThrift(&tprofile1);
-  RuntimeProfile averaged_profile(&pool, "merged", true);
-  averaged_profile.UpdateAverage(&profile1);
-  averaged_profile.UpdateAverage(&profile2);
-  EXPECT_EQ(5, averaged_profile.num_counters());
-  ValidateCounter(&averaged_profile, "Parent Shared", 2);
-  ValidateCounter(&averaged_profile, "Parent 1 Only", 2);
-  ValidateCounter(&averaged_profile, "Parent 2 Only", 5);
+  profile1->ToThrift(&tprofile1);
+  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true);
+  averaged_profile->UpdateAverage(profile1);
+  averaged_profile->UpdateAverage(profile2);
+  EXPECT_EQ(5, averaged_profile->num_counters());
+  ValidateCounter(averaged_profile, "Parent Shared", 2);
+  ValidateCounter(averaged_profile, "Parent 1 Only", 2);
+  ValidateCounter(averaged_profile, "Parent 2 Only", 5);
 
   vector<RuntimeProfile*> children;
-  averaged_profile.GetChildren(&children);
+  averaged_profile->GetChildren(&children);
   EXPECT_EQ(children.size(), 3);
 
   for (int i = 0; i < 3; ++i) {
@@ -191,16 +191,16 @@ TEST(CountersTest, MergeAndUpdate) {
 
   // make sure we can print
   stringstream dummy;
-  averaged_profile.PrettyPrint(&dummy);
+  averaged_profile->PrettyPrint(&dummy);
 
   // Update profile2 w/ profile1 and validate
-  profile2.Update(tprofile1);
-  EXPECT_EQ(5, profile2.num_counters());
-  ValidateCounter(&profile2, "Parent Shared", 1);
-  ValidateCounter(&profile2, "Parent 1 Only", 2);
-  ValidateCounter(&profile2, "Parent 2 Only", 5);
+  profile2->Update(tprofile1);
+  EXPECT_EQ(5, profile2->num_counters());
+  ValidateCounter(profile2, "Parent Shared", 1);
+  ValidateCounter(profile2, "Parent 1 Only", 2);
+  ValidateCounter(profile2, "Parent 2 Only", 5);
 
-  profile2.GetChildren(&children);
+  profile2->GetChildren(&children);
   EXPECT_EQ(children.size(), 3);
 
   for (int i = 0; i < 3; ++i) {
@@ -222,14 +222,14 @@ TEST(CountersTest, MergeAndUpdate) {
   }
 
   // make sure we can print
-  profile2.PrettyPrint(&dummy);
+  profile2->PrettyPrint(&dummy);
 }
 
 TEST(CountersTest, HighWaterMarkCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
   RuntimeProfile::HighWaterMarkCounter* bytes_counter =
-      profile.AddHighWaterMarkCounter("bytes", TUnit::BYTES);
+      profile->AddHighWaterMarkCounter("bytes", TUnit::BYTES);
 
   bytes_counter->Set(10);
   EXPECT_EQ(bytes_counter->current_value(), 10);
@@ -260,9 +260,9 @@ TEST(CountersTest, HighWaterMarkCounters) {
 
 TEST(CountersTest, SummaryStatsCounters) {
   ObjectPool pool;
-  RuntimeProfile profile1(&pool, "Profile 1");
+  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Profile 1");
   RuntimeProfile::SummaryStatsCounter* summary_stats_counter_1 =
-    profile1.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+    profile1->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
 
   EXPECT_EQ(summary_stats_counter_1->value(), 0);
   EXPECT_EQ(summary_stats_counter_1->MinValue(), numeric_limits<int64_t>::max());
@@ -297,9 +297,9 @@ TEST(CountersTest, SummaryStatsCounters) {
   EXPECT_EQ(summary_stats_counter_1->MinValue(), -40);
   EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);
 
-  RuntimeProfile profile2(&pool, "Profile 2");
+  RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Profile 2");
   RuntimeProfile::SummaryStatsCounter* summary_stats_counter_2 =
-    profile2.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+    profile2->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
 
   summary_stats_counter_2->UpdateCounter(100);
   EXPECT_EQ(summary_stats_counter_2->value(), 100);
@@ -307,10 +307,10 @@ TEST(CountersTest, SummaryStatsCounters) {
   EXPECT_EQ(summary_stats_counter_2->MaxValue(), 100);
 
   TRuntimeProfileTree tprofile1;
-  profile1.ToThrift(&tprofile1);
+  profile1->ToThrift(&tprofile1);
 
   // Merge profile1 and profile2 and check that profile2 is overwritten.
-  profile2.Update(tprofile1);
+  profile2->Update(tprofile1);
   EXPECT_EQ(summary_stats_counter_2->value(), 4);
   EXPECT_EQ(summary_stats_counter_2->MinValue(), -40);
   EXPECT_EQ(summary_stats_counter_2->MaxValue(), 40);
@@ -319,16 +319,16 @@ TEST(CountersTest, SummaryStatsCounters) {
 
 TEST(CountersTest, DerivedCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
   RuntimeProfile::Counter* bytes_counter =
-      profile.AddCounter("bytes", TUnit::BYTES);
+      profile->AddCounter("bytes", TUnit::BYTES);
   RuntimeProfile::Counter* ticks_counter =
-      profile.AddCounter("ticks", TUnit::TIME_NS);
+      profile->AddCounter("ticks", TUnit::TIME_NS);
   // set to 1 sec
   ticks_counter->Set(1000L * 1000L * 1000L);
 
   RuntimeProfile::DerivedCounter* throughput_counter =
-      profile.AddDerivedCounter("throughput", TUnit::BYTES,
+      profile->AddDerivedCounter("throughput", TUnit::BYTES,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_counter, ticks_counter));
 
   bytes_counter->Set(10);
@@ -341,11 +341,11 @@ TEST(CountersTest, DerivedCounters) {
 
 TEST(CountersTest, AverageSetCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
   RuntimeProfile::Counter* bytes_1_counter =
-      profile.AddCounter("bytes 1", TUnit::BYTES);
+      profile->AddCounter("bytes 1", TUnit::BYTES);
   RuntimeProfile::Counter* bytes_2_counter =
-      profile.AddCounter("bytes 2", TUnit::BYTES);
+      profile->AddCounter("bytes 2", TUnit::BYTES);
 
   bytes_1_counter->Set(10);
   RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES);
@@ -366,9 +366,9 @@ TEST(CountersTest, AverageSetCounters) {
   EXPECT_EQ(bytes_avg.value(), 25);
 
   RuntimeProfile::Counter* double_1_counter =
-      profile.AddCounter("double 1", TUnit::DOUBLE_VALUE);
+      profile->AddCounter("double 1", TUnit::DOUBLE_VALUE);
   RuntimeProfile::Counter* double_2_counter =
-      profile.AddCounter("double 2", TUnit::DOUBLE_VALUE);
+      profile->AddCounter("double 2", TUnit::DOUBLE_VALUE);
   double_1_counter->Set(1.0f);
   RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE);
   double_avg.UpdateCounter(double_1_counter);
@@ -390,17 +390,17 @@ TEST(CountersTest, AverageSetCounters) {
 
 TEST(CountersTest, InfoStringTest) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
-  EXPECT_TRUE(profile.GetInfoString("Key") == NULL);
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  EXPECT_TRUE(profile->GetInfoString("Key") == NULL);
 
-  profile.AddInfoString("Key", "Value");
-  const string* value = profile.GetInfoString("Key");
+  profile->AddInfoString("Key", "Value");
+  const string* value = profile->GetInfoString("Key");
   EXPECT_TRUE(value != NULL);
   EXPECT_EQ(*value, "Value");
 
   // Convert it to thrift
   TRuntimeProfileTree tprofile;
-  profile.ToThrift(&tprofile);
+  profile->ToThrift(&tprofile);
 
   // Convert it back
   RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(
@@ -410,34 +410,34 @@ TEST(CountersTest, InfoStringTest) {
   EXPECT_EQ(*value, "Value");
 
   // Test update.
-  RuntimeProfile update_dst_profile(&pool, "Profile2");
-  update_dst_profile.Update(tprofile);
-  value = update_dst_profile.GetInfoString("Key");
+  RuntimeProfile* update_dst_profile = RuntimeProfile::Create(&pool, "Profile2");
+  update_dst_profile->Update(tprofile);
+  value = update_dst_profile->GetInfoString("Key");
   EXPECT_TRUE(value != NULL);
   EXPECT_EQ(*value, "Value");
 
   // Update the original profile, convert it to thrift and update from the dst
   // profile
-  profile.AddInfoString("Key", "NewValue");
-  profile.AddInfoString("Foo", "Bar");
-  EXPECT_EQ(*profile.GetInfoString("Key"), "NewValue");
-  EXPECT_EQ(*profile.GetInfoString("Foo"), "Bar");
-  profile.ToThrift(&tprofile);
-
-  update_dst_profile.Update(tprofile);
-  EXPECT_EQ(*update_dst_profile.GetInfoString("Key"), "NewValue");
-  EXPECT_EQ(*update_dst_profile.GetInfoString("Foo"), "Bar");
+  profile->AddInfoString("Key", "NewValue");
+  profile->AddInfoString("Foo", "Bar");
+  EXPECT_EQ(*profile->GetInfoString("Key"), "NewValue");
+  EXPECT_EQ(*profile->GetInfoString("Foo"), "Bar");
+  profile->ToThrift(&tprofile);
+
+  update_dst_profile->Update(tprofile);
+  EXPECT_EQ(*update_dst_profile->GetInfoString("Key"), "NewValue");
+  EXPECT_EQ(*update_dst_profile->GetInfoString("Foo"), "Bar");
 }
 
 TEST(CountersTest, RateCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
 
   RuntimeProfile::Counter* bytes_counter =
-      profile.AddCounter("bytes", TUnit::BYTES);
+      profile->AddCounter("bytes", TUnit::BYTES);
 
   RuntimeProfile::Counter* rate_counter =
-      profile.AddRateCounter("RateCounter", bytes_counter);
+      profile->AddRateCounter("RateCounter", bytes_counter);
   EXPECT_TRUE(rate_counter->unit() == TUnit::BYTES_PER_SECOND);
 
   EXPECT_EQ(rate_counter->value(), 0);
@@ -449,8 +449,8 @@ TEST(CountersTest, RateCounters) {
 
   int64_t rate = rate_counter->value();
 
-  // Remove the counter so it no longer gets updates
-  PeriodicCounterUpdater::StopRateCounter(rate_counter);
+  // Stop the counter so it no longer gets updates
+  profile->StopPeriodicCounters();
 
   // The rate counter is not perfectly accurate.  Currently updated at 500ms intervals,
   // we should have seen somewhere between 1 and 3 updates (33 - 200 MB/s)
@@ -468,44 +468,42 @@ TEST(CountersTest, RateCounters) {
 
 TEST(CountersTest, BucketCounters) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
 
   RuntimeProfile::Counter* unit_counter =
-      profile.AddCounter("unit", TUnit::UNIT);
+      profile->AddCounter("unit", TUnit::UNIT);
 
   // Set the unit to 1 before sampling
   unit_counter->Set(1);
 
   // Create the bucket counters and start sampling
-  vector<RuntimeProfile::Counter*> buckets;
-  buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  profile.RegisterBucketingCounters(unit_counter, &buckets);
+  vector<RuntimeProfile::Counter*>* buckets =
+      profile->AddBucketingCounters(unit_counter, 2);
 
   // Wait two seconds.
   sleep(2);
 
   // Stop sampling
-  PeriodicCounterUpdater::StopBucketingCounters(&buckets, true);
+  profile->StopPeriodicCounters();
 
   // TODO: change the value to double
   // The value of buckets[0] should be zero and buckets[1] should be 1.
-  double val0 = buckets[0]->double_value();
-  double val1 = buckets[1]->double_value();
+  double val0 = (*buckets)[0]->double_value();
+  double val1 = (*buckets)[1]->double_value();
   EXPECT_EQ(0, val0);
   EXPECT_EQ(100, val1);
 
   // Wait another second.  The counter has been removed. So the value should not be
   // changed (much).
   sleep(2);
-  EXPECT_EQ(val0, buckets[0]->double_value());
-  EXPECT_EQ(val1, buckets[1]->double_value());
+  EXPECT_EQ(val0, (*buckets)[0]->double_value());
+  EXPECT_EQ(val1, (*buckets)[1]->double_value());
 }
 
 TEST(CountersTest, EventSequences) {
   ObjectPool pool;
-  RuntimeProfile profile(&pool, "Profile");
-  RuntimeProfile::EventSequence* seq = profile.AddEventSequence("event sequence");
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence");
   seq->MarkEvent("aaaa");
   seq->MarkEvent("bbbb");
   seq->MarkEvent("cccc");
@@ -524,7 +522,7 @@ TEST(CountersTest, EventSequences) {
   }
 
   TRuntimeProfileTree thrift_profile;
-  profile.ToThrift(&thrift_profile);
+  profile->ToThrift(&thrift_profile);
   EXPECT_TRUE(thrift_profile.nodes[0].__isset.event_sequences);
   EXPECT_EQ(1, thrift_profile.nodes[0].event_sequences.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 12f4e25..4254b9c 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -53,10 +53,14 @@ const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
 const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime";
 const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
 
+RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name,
+    bool is_averaged_profile) {
+  return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile));
+}
+
 RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name,
     bool is_averaged_profile)
   : pool_(pool),
-    own_pool_(false),
     name_(name),
     metadata_(-1),
     is_averaged_profile_(is_averaged_profile),
@@ -78,27 +82,25 @@ RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name,
 }
 
 RuntimeProfile::~RuntimeProfile() {
-  map<string, Counter*>::const_iterator iter;
-  for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
-    PeriodicCounterUpdater::StopRateCounter(iter->second);
-    PeriodicCounterUpdater::StopSamplingCounter(iter->second);
-  }
+  DCHECK(!has_active_periodic_counters_);
+}
 
-  set<vector<Counter*>* >::const_iterator buckets_iter;
-  for (buckets_iter = bucketing_counters_.begin();
-      buckets_iter != bucketing_counters_.end(); ++buckets_iter) {
-    // This is just a clean up. No need to perform conversion. Also, the underlying
-    // counters might be gone already.
-    PeriodicCounterUpdater::StopBucketingCounters(*buckets_iter, false);
+void RuntimeProfile::StopPeriodicCounters() {
+  lock_guard<SpinLock> l(counter_map_lock_);
+  if (!has_active_periodic_counters_) return;
+  for (Counter* sampling_counter : sampling_counters_) {
+    PeriodicCounterUpdater::StopSamplingCounter(sampling_counter);
   }
-
-  TimeSeriesCounterMap::const_iterator time_series_it;
-  for (time_series_it = time_series_counter_map_.begin();
-      time_series_it != time_series_counter_map_.end(); ++time_series_it) {
-    PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_it->second);
+  for (Counter* rate_counter : rate_counters_) {
+    PeriodicCounterUpdater::StopRateCounter(rate_counter);
   }
-
-  if (own_pool_) delete pool_;
+  for (vector<Counter*>* counters : bucketing_counters_) {
+    PeriodicCounterUpdater::StopBucketingCounters(counters);
+  }
+  for (auto& time_series_counter_entry : time_series_counter_map_) {
+    PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second);
+  }
+  has_active_periodic_counters_ = false;
 }
 
 RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
@@ -113,7 +115,7 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
   DCHECK_LT(*idx, nodes.size());
 
   const TRuntimeProfileNode& node = nodes[*idx];
-  RuntimeProfile* profile = pool->Add(new RuntimeProfile(pool, node.name));
+  RuntimeProfile* profile = Create(pool, node.name);
   profile->metadata_ = node.metadata;
   for (int i = 0; i < node.counters.size(); ++i) {
     const TCounter& counter = node.counters[i];
@@ -208,7 +210,7 @@ void RuntimeProfile::UpdateAverage(RuntimeProfile* other) {
       if (j != child_map_.end()) {
         child = j->second;
       } else {
-        child = pool_->Add(new RuntimeProfile(pool_, other_child->name_, true));
+        child = Create(pool_, other_child->name_, true);
         child->metadata_ = other_child->metadata_;
         bool indent_other_child = other->children_[i].second;
         child_map_[child->name_] = child;
@@ -282,7 +284,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
   }
 
   {
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
+    lock_guard<SpinLock> l(counter_map_lock_);
     for (int i = 0; i < node.time_series_counters.size(); ++i) {
       const TTimeSeriesCounter& c = node.time_series_counters[i];
       TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
@@ -322,7 +324,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       if (j != child_map_.end()) {
         child = j->second;
       } else {
-        child = pool_->Add(new RuntimeProfile(pool_, tchild.name));
+        child = Create(pool_, tchild.name);
         child->metadata_ = tchild.metadata;
         child_map_[tchild.name] = child;
         children_.push_back(make_pair(child, tchild.indent));
@@ -439,7 +441,7 @@ RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent,
     bool prepend) {
   lock_guard<SpinLock> l(children_lock_);
   DCHECK(child_map_.find(name) == child_map_.end());
-  RuntimeProfile* child = pool_->Add(new RuntimeProfile(pool_, name));
+  RuntimeProfile* child = Create(pool_, name);
   AddChildLocked(child, indent, prepend ? children_.begin() : children_.end());
   return child;
 }
@@ -504,9 +506,16 @@ void RuntimeProfile::AddCodegenMsg(
 #define ADD_COUNTER_IMPL(NAME, T)                                                \
   RuntimeProfile::T* RuntimeProfile::NAME(                                       \
       const string& name, TUnit::type unit, const string& parent_counter_name) { \
-    DCHECK_EQ(is_averaged_profile_, false);                                      \
     lock_guard<SpinLock> l(counter_map_lock_);                                   \
+    bool dummy;                                                                  \
+    return NAME##Locked(name, unit, parent_counter_name, &dummy);                \
+  }                                                                              \
+  RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name,           \
+      TUnit::type unit, const string& parent_counter_name, bool* created) {      \
+    counter_map_lock_.DCheckLocked();                                            \
+    DCHECK_EQ(is_averaged_profile_, false);                                      \
     if (counter_map_.find(name) != counter_map_.end()) {                         \
+      *created = false;                                                          \
       return reinterpret_cast<T*>(counter_map_[name]);                           \
     }                                                                            \
     DCHECK(parent_counter_name == ROOT_COUNTER                                   \
@@ -516,6 +525,7 @@ void RuntimeProfile::AddCodegenMsg(
     set<string>* child_counters =                                                \
         FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());   \
     child_counters->insert(name);                                                \
+    *created = true;                                                             \
     return counter;                                                              \
   }
 
@@ -668,7 +678,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
     // <Name> (<period>): <val1>, <val2>, <etc>
     SpinLock* lock;
     int num, period;
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
+    lock_guard<SpinLock> l(counter_map_lock_);
     for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
       const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
       if (num > 0) {
@@ -813,7 +823,7 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
   }
 
   {
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
+    lock_guard<SpinLock> l(counter_map_lock_);
     if (time_series_counter_map_.size() != 0) {
       node.__set_time_series_counters(
           vector<TTimeSeriesCounter>(time_series_counter_map_.size()));
@@ -884,44 +894,71 @@ RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
       DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit();
       return NULL;
   }
-  Counter* dst_counter = AddCounter(name, dst_unit);
-  PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
-      PeriodicCounterUpdater::RATE_COUNTER);
-  return dst_counter;
+  {
+    lock_guard<SpinLock> l(counter_map_lock_);
+    bool created;
+    Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+    if (!created) return dst_counter;
+    rate_counters_.push_back(dst_counter);
+    PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
+        PeriodicCounterUpdater::RATE_COUNTER);
+    has_active_periodic_counters_ = true;
+    return dst_counter;
+  }
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
     const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
-  Counter* dst_counter = AddCounter(name, dst_unit);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  bool created;
+  Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+  if (!created) return dst_counter;
+  rate_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter,
       PeriodicCounterUpdater::RATE_COUNTER);
+  has_active_periodic_counters_ = true;
   return dst_counter;
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
     const string& name, Counter* src_counter) {
   DCHECK(src_counter->unit() == TUnit::UNIT);
-  Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  bool created;
+  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+  if (!created) return dst_counter;
+  sampling_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
       PeriodicCounterUpdater::SAMPLING_COUNTER);
+  has_active_periodic_counters_ = true;
   return dst_counter;
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
     const string& name, DerivedCounterFunction sample_fn) {
-  Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  bool created;
+  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+  if (!created) return dst_counter;
+  sampling_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
       PeriodicCounterUpdater::SAMPLING_COUNTER);
+  has_active_periodic_counters_ = true;
   return dst_counter;
 }
 
-void RuntimeProfile::RegisterBucketingCounters(Counter* src_counter,
-    vector<Counter*>* buckets) {
-  {
-    lock_guard<SpinLock> l(counter_map_lock_);
-    bucketing_counters_.insert(buckets);
+vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
+    Counter* src_counter, int num_buckets) {
+  lock_guard<SpinLock> l(counter_map_lock_);
+  vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
+  for (int i = 0; i < num_buckets; ++i) {
+      buckets->push_back(
+          pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
   }
+  bucketing_counters_.insert(buckets);
+  has_active_periodic_counters_ = true;
   PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
+  return buckets;
 }
 
 RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) {
@@ -978,16 +1015,14 @@ RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
 
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
     const string& name, TUnit::type unit, DerivedCounterFunction fn) {
-  DCHECK(fn != NULL);
-  TimeSeriesCounter* counter = NULL;
-  {
-    lock_guard<SpinLock> l(time_series_counter_map_lock_);
-    TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
-    if (it != time_series_counter_map_.end()) return it->second;
-    counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
-    time_series_counter_map_[name] = counter;
-  }
+  DCHECK(fn != nullptr);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
+  if (it != time_series_counter_map_.end()) return it->second;
+  TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
+  time_series_counter_map_[name] = counter;
   PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
+  has_active_periodic_counters_ = true;
   return counter;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 298c214..8348161 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -101,13 +101,13 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
 
   typedef boost::function<int64_t ()> DerivedCounterFunction;
 
-  /// Create a runtime profile object with 'name'.  Counters and merged profile are
-  /// allocated from pool.
-  /// If is_averaged_profile is true, the counters in this profile will be derived
+  /// Create a runtime profile object with 'name'. The profile, counters and any other
+  /// structures owned by the profile are allocated from 'pool'.
+  /// If 'is_averaged_profile' is true, the counters in this profile will be derived
   /// averages (of unit AveragedCounter) from other profiles, so the counter map will
   /// be left empty Otherwise, the counter map is initialized with a single entry for
   /// TotalTime.
-  RuntimeProfile(ObjectPool* pool, const std::string& name,
+  static RuntimeProfile* Create(ObjectPool* pool, const std::string& name,
       bool is_averaged_profile = false);
 
   ~RuntimeProfile();
@@ -247,6 +247,12 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// the key does not exist.
   const std::string* GetInfoString(const std::string& key) const;
 
+  /// Stops updating all counters in this profile that are periodically updated by a
+  /// background thread (i.e. sampling, rate, bucketing and time series counters).
+  /// Must be called before the profile is destroyed if any such counters are active.
+  /// Does not stop counters on descendant profiles.
+  void StopPeriodicCounters();
+
   /// Returns the counter for the total elapsed time.
   Counter* total_time_counter() { return counter_map_[TOTAL_TIME_COUNTER_NAME]; }
   Counter* inactive_timer() { return counter_map_[INACTIVE_TIME_COUNTER_NAME]; }
@@ -299,8 +305,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// Add a rate counter to the current profile based on src_counter with name.
   /// The rate counter is updated periodically based on the src counter.
   /// The rate counter has units in src_counter unit per second.
-  /// Rate counters should be stopped (by calling PeriodicCounterUpdater::StopRateCounter)
-  /// as soon as the src_counter stops changing.
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopRateCounter() if 'src_counter' stops changing.
   Counter* AddRateCounter(const std::string& name, Counter* src_counter);
 
   /// Same as 'AddRateCounter' above except values are taken by calling fn.
@@ -312,27 +319,40 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// The sampling counter is updated periodically based on the src counter by averaging
   /// the samples taken from the src counter.
   /// The sampling counter has the same unit as src_counter unit.
-  /// Sampling counters should be stopped (by calling
-  /// PeriodicCounterUpdater::StopSamplingCounter) as soon as the src_counter stops
-  /// changing.
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopSamplingCounter() if 'src_counter' stops changing.
   Counter* AddSamplingCounter(const std::string& name, Counter* src_counter);
 
   /// Same as 'AddSamplingCounter' above except the samples are taken by calling fn.
   Counter* AddSamplingCounter(const std::string& name, DerivedCounterFunction fn);
 
-  /// Register a bucket of counters to store the sampled value of src_counter.
-  /// The src_counter is sampled periodically and the buckets are updated.
-  void RegisterBucketingCounters(Counter* src_counter, std::vector<Counter*>* buckets);
+  /// Create a set of counters, one per bucket, to store the sampled value of src_counter.
+  /// The 'src_counter' is sampled periodically to obtain the index of the bucket to
+  /// increment. E.g. if the value of 'src_counter' is 3, the bucket at index 3 is
+  /// updated. If the index exceeds the index of the last bucket, the last bucket is
+  /// updated.
+  ///
+  /// The created counters do not appear in the profile when serialized or
+  /// pretty-printed. The caller must do its own processing of the counter value
+  /// (e.g. converting it to an info string).
+  /// TODO: make this interface more consistent and sane.
+  ///
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopBucketingCounters() if 'buckets' stops changing.
+  std::vector<Counter*>* AddBucketingCounters(Counter* src_counter, int num_buckets);
 
   /// Create a time series counter. This begins sampling immediately. This counter
   /// contains a number of samples that are collected periodically by calling sample_fn().
+  /// StopPeriodicCounters() must be called to stop the periodic updating before this
+  /// profile is destroyed. The periodic updating can be stopped earlier by calling
+  /// PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
   /// Note: these counters don't get merged (to make average profiles)
   TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name,
       TUnit::type unit, DerivedCounterFunction sample_fn);
 
-  /// Create a time series counter that samples the source counter. Sampling begins
-  /// immediately.
-  /// Note: these counters don't get merged (to make average profiles)
+  /// Same as above except the samples are collected from 'src_counter'.
   TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, Counter* src_counter);
 
   /// Recursively compute the fraction of the 'total_time' spent in this profile and
@@ -345,9 +365,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// object, but occasionally allocated in the constructor.
   ObjectPool* pool_;
 
-  /// True if we have to delete the pool_ on destruction.
-  bool own_pool_;
-
   /// Name for this runtime profile.
   std::string name_;
 
@@ -369,9 +386,27 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   ChildCounterMap child_counter_map_;
 
   /// A set of bucket counters registered in this runtime profile.
-  std::set<std::vector<Counter*>* > bucketing_counters_;
+  std::set<std::vector<Counter*>*> bucketing_counters_;
+
+  /// Rate counters, which also appear in 'counter_map_'. Tracked separately to enable
+  /// stopping the counters.
+  std::vector<Counter*> rate_counters_;
+
+  /// Sampling counters, which also appear in 'counter_map_'. Tracked separately to enable
+  /// stopping the counters.
+  std::vector<Counter*> sampling_counters_;
+
+  /// Time series counters. These do not appear in 'counter_map_'. Tracked separately
+  /// because they are displayed separately in the profile and need to be stopped.
+  typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap;
+  TimeSeriesCounterMap time_series_counter_map_;
 
-  /// Protects counter_map_, counter_child_map_ and bucketing_counters_.
+  /// True if this profile has active periodic counters, including bucketing, rate,
+  /// sampling and time series counters.
+  bool has_active_periodic_counters_ = false;
+
+  /// Protects counter_map_, child_counter_map_, bucketing_counters_, rate_counters_,
+  /// sampling_counters_, time_series_counter_map_, and has_active_periodic_counters_.
   mutable SpinLock counter_map_lock_;
 
   /// Child profiles.  Does not own memory.
@@ -403,12 +438,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// Protects event_sequence_map_.
   mutable SpinLock event_sequence_lock_;
 
-  typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap;
-  TimeSeriesCounterMap time_series_counter_map_;
-
-  /// Protects time_series_counter_map_.
-  mutable SpinLock time_series_counter_map_lock_;
-
   typedef std::map<std::string, SummaryStatsCounter*> SummaryStatsCounterMap;
   SummaryStatsCounterMap summary_stats_map_;
 
@@ -430,6 +459,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// ComputeTimeInProfile()
   int64_t local_time_ns_;
 
+  /// Constructor used by Create().
+  RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile);
+
   /// Update a subtree of profiles from nodes, rooted at *idx.
   /// On return, *idx points to the node immediately following this subtree.
   void Update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
@@ -454,6 +486,16 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   static RuntimeProfile* CreateFromThrift(
       ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
 
+  /// Internal implementations of the Add*Counter() functions for use when the caller
+  /// holds counter_map_lock_. Also returns 'created', which is true if a new counter was
+  /// created and false if a counter with the given name already existed.
+  Counter* AddCounterLocked(const std::string& name, TUnit::type unit,
+      const std::string& parent_counter_name, bool* created);
+  HighWaterMarkCounter* AddHighWaterMarkCounterLocked(const std::string& name,
+      TUnit::type unit, const std::string& parent_counter_name, bool* created);
+  ConcurrentTimerCounter* AddConcurrentTimerCounterLocked(const std::string& name,
+      TUnit::type unit, const std::string& parent_counter_name, bool* created);
+
   ///  Inserts 'child' before the iterator 'insert_pos' in 'children_'.
   /// 'children_lock_' must be held by the caller.
   void AddChildLocked(