You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/20 20:20:13 UTC
[3/5] incubator-impala git commit: IMPALA-5895: clean up runtime profile lifecycle
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index fe9f3ae..1bc7911 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -34,17 +34,17 @@ namespace impala {
TEST(CountersTest, Basic) {
ObjectPool pool;
- RuntimeProfile profile_a(&pool, "ProfileA");
- RuntimeProfile profile_a1(&pool, "ProfileA1");
- RuntimeProfile profile_a2(&pool, "ProfileAb");
+ RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA");
+ RuntimeProfile* profile_a1 = RuntimeProfile::Create(&pool, "ProfileA1");
+ RuntimeProfile* profile_a2 = RuntimeProfile::Create(&pool, "ProfileAb");
TRuntimeProfileTree thrift_profile;
- profile_a.AddChild(&profile_a1);
- profile_a.AddChild(&profile_a2);
+ profile_a->AddChild(profile_a1);
+ profile_a->AddChild(profile_a2);
// Test Empty
- profile_a.ToThrift(&thrift_profile.nodes);
+ profile_a->ToThrift(&thrift_profile.nodes);
EXPECT_EQ(thrift_profile.nodes.size(), 3);
thrift_profile.nodes.clear();
@@ -53,7 +53,7 @@ TEST(CountersTest, Basic) {
RuntimeProfile::Counter* counter_merged;
// Updating/setting counter
- counter_a = profile_a.AddCounter("A", TUnit::UNIT);
+ counter_a = profile_a->AddCounter("A", TUnit::UNIT);
EXPECT_TRUE(counter_a != NULL);
counter_a->Add(10);
counter_a->Add(-5);
@@ -61,40 +61,40 @@ TEST(CountersTest, Basic) {
counter_a->Set(1);
EXPECT_EQ(counter_a->value(), 1);
- counter_b = profile_a2.AddCounter("B", TUnit::BYTES);
+ counter_b = profile_a2->AddCounter("B", TUnit::BYTES);
EXPECT_TRUE(counter_b != NULL);
// Serialize/deserialize
- profile_a.ToThrift(&thrift_profile.nodes);
+ profile_a->ToThrift(&thrift_profile.nodes);
RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
counter_merged = from_thrift->GetCounter("A");
EXPECT_EQ(counter_merged->value(), 1);
EXPECT_TRUE(from_thrift->GetCounter("Not there") == NULL);
// Averaged
- RuntimeProfile averaged_profile(&pool, "Merged", true);
- averaged_profile.UpdateAverage(from_thrift);
- counter_merged = averaged_profile.GetCounter("A");
+ RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true);
+ averaged_profile->UpdateAverage(from_thrift);
+ counter_merged = averaged_profile->GetCounter("A");
EXPECT_EQ(counter_merged->value(), 1);
// UpdateAverage again, there should be no change.
- averaged_profile.UpdateAverage(from_thrift);
+ averaged_profile->UpdateAverage(from_thrift);
EXPECT_EQ(counter_merged->value(), 1);
- counter_a = profile_a2.AddCounter("A", TUnit::UNIT);
+ counter_a = profile_a2->AddCounter("A", TUnit::UNIT);
counter_a->Set(3);
- averaged_profile.UpdateAverage(&profile_a2);
+ averaged_profile->UpdateAverage(profile_a2);
EXPECT_EQ(counter_merged->value(), 2);
// Update
- RuntimeProfile updated_profile(&pool, "Updated");
- updated_profile.Update(thrift_profile);
- RuntimeProfile::Counter* counter_updated = updated_profile.GetCounter("A");
+ RuntimeProfile* updated_profile = RuntimeProfile::Create(&pool, "Updated");
+ updated_profile->Update(thrift_profile);
+ RuntimeProfile::Counter* counter_updated = updated_profile->GetCounter("A");
EXPECT_EQ(counter_updated->value(), 1);
// Update 2 more times, counters should stay the same
- updated_profile.Update(thrift_profile);
- updated_profile.Update(thrift_profile);
+ updated_profile->Update(thrift_profile);
+ updated_profile->Update(thrift_profile);
EXPECT_EQ(counter_updated->value(), 1);
}
@@ -110,27 +110,27 @@ TEST(CountersTest, MergeAndUpdate) {
// children, with the counters from the shared child aggregated.
ObjectPool pool;
- RuntimeProfile profile1(&pool, "Parent1");
- RuntimeProfile p1_child1(&pool, "Child1");
- RuntimeProfile p1_child2(&pool, "Child2");
- profile1.AddChild(&p1_child1);
- profile1.AddChild(&p1_child2);
-
- RuntimeProfile profile2(&pool, "Parent2");
- RuntimeProfile p2_child1(&pool, "Child1");
- RuntimeProfile p2_child3(&pool, "Child3");
- profile2.AddChild(&p2_child1);
- profile2.AddChild(&p2_child3);
+ RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent1");
+ RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1");
+ RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2");
+ profile1->AddChild(p1_child1);
+ profile1->AddChild(p1_child2);
+
+ RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Parent2");
+ RuntimeProfile* p2_child1 = RuntimeProfile::Create(&pool, "Child1");
+ RuntimeProfile* p2_child3 = RuntimeProfile::Create(&pool, "Child3");
+ profile2->AddChild(p2_child1);
+ profile2->AddChild(p2_child3);
// Create parent level counters
RuntimeProfile::Counter* parent1_shared =
- profile1.AddCounter("Parent Shared", TUnit::UNIT);
+ profile1->AddCounter("Parent Shared", TUnit::UNIT);
RuntimeProfile::Counter* parent2_shared =
- profile2.AddCounter("Parent Shared", TUnit::UNIT);
+ profile2->AddCounter("Parent Shared", TUnit::UNIT);
RuntimeProfile::Counter* parent1_only =
- profile1.AddCounter("Parent 1 Only", TUnit::UNIT);
+ profile1->AddCounter("Parent 1 Only", TUnit::UNIT);
RuntimeProfile::Counter* parent2_only =
- profile2.AddCounter("Parent 2 Only", TUnit::UNIT);
+ profile2->AddCounter("Parent 2 Only", TUnit::UNIT);
parent1_shared->Add(1);
parent2_shared->Add(3);
parent1_only->Add(2);
@@ -138,17 +138,17 @@ TEST(CountersTest, MergeAndUpdate) {
// Create child level counters
RuntimeProfile::Counter* p1_c1_shared =
- p1_child1.AddCounter("Child1 Shared", TUnit::UNIT);
+ p1_child1->AddCounter("Child1 Shared", TUnit::UNIT);
RuntimeProfile::Counter* p1_c1_only =
- p1_child1.AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
+ p1_child1->AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
RuntimeProfile::Counter* p1_c2 =
- p1_child2.AddCounter("Child2", TUnit::UNIT);
+ p1_child2->AddCounter("Child2", TUnit::UNIT);
RuntimeProfile::Counter* p2_c1_shared =
- p2_child1.AddCounter("Child1 Shared", TUnit::UNIT);
+ p2_child1->AddCounter("Child1 Shared", TUnit::UNIT);
RuntimeProfile::Counter* p2_c1_only =
- p1_child1.AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
+ p1_child1->AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
RuntimeProfile::Counter* p2_c3 =
- p2_child3.AddCounter("Child3", TUnit::UNIT);
+ p2_child3->AddCounter("Child3", TUnit::UNIT);
p1_c1_shared->Add(10);
p1_c1_only->Add(50);
p2_c1_shared->Add(20);
@@ -158,17 +158,17 @@ TEST(CountersTest, MergeAndUpdate) {
// Merge the two and validate
TRuntimeProfileTree tprofile1;
- profile1.ToThrift(&tprofile1);
- RuntimeProfile averaged_profile(&pool, "merged", true);
- averaged_profile.UpdateAverage(&profile1);
- averaged_profile.UpdateAverage(&profile2);
- EXPECT_EQ(5, averaged_profile.num_counters());
- ValidateCounter(&averaged_profile, "Parent Shared", 2);
- ValidateCounter(&averaged_profile, "Parent 1 Only", 2);
- ValidateCounter(&averaged_profile, "Parent 2 Only", 5);
+ profile1->ToThrift(&tprofile1);
+ RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true);
+ averaged_profile->UpdateAverage(profile1);
+ averaged_profile->UpdateAverage(profile2);
+ EXPECT_EQ(5, averaged_profile->num_counters());
+ ValidateCounter(averaged_profile, "Parent Shared", 2);
+ ValidateCounter(averaged_profile, "Parent 1 Only", 2);
+ ValidateCounter(averaged_profile, "Parent 2 Only", 5);
vector<RuntimeProfile*> children;
- averaged_profile.GetChildren(&children);
+ averaged_profile->GetChildren(&children);
EXPECT_EQ(children.size(), 3);
for (int i = 0; i < 3; ++i) {
@@ -191,16 +191,16 @@ TEST(CountersTest, MergeAndUpdate) {
// make sure we can print
stringstream dummy;
- averaged_profile.PrettyPrint(&dummy);
+ averaged_profile->PrettyPrint(&dummy);
// Update profile2 w/ profile1 and validate
- profile2.Update(tprofile1);
- EXPECT_EQ(5, profile2.num_counters());
- ValidateCounter(&profile2, "Parent Shared", 1);
- ValidateCounter(&profile2, "Parent 1 Only", 2);
- ValidateCounter(&profile2, "Parent 2 Only", 5);
+ profile2->Update(tprofile1);
+ EXPECT_EQ(5, profile2->num_counters());
+ ValidateCounter(profile2, "Parent Shared", 1);
+ ValidateCounter(profile2, "Parent 1 Only", 2);
+ ValidateCounter(profile2, "Parent 2 Only", 5);
- profile2.GetChildren(&children);
+ profile2->GetChildren(&children);
EXPECT_EQ(children.size(), 3);
for (int i = 0; i < 3; ++i) {
@@ -222,14 +222,14 @@ TEST(CountersTest, MergeAndUpdate) {
}
// make sure we can print
- profile2.PrettyPrint(&dummy);
+ profile2->PrettyPrint(&dummy);
}
TEST(CountersTest, HighWaterMarkCounters) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
RuntimeProfile::HighWaterMarkCounter* bytes_counter =
- profile.AddHighWaterMarkCounter("bytes", TUnit::BYTES);
+ profile->AddHighWaterMarkCounter("bytes", TUnit::BYTES);
bytes_counter->Set(10);
EXPECT_EQ(bytes_counter->current_value(), 10);
@@ -260,9 +260,9 @@ TEST(CountersTest, HighWaterMarkCounters) {
TEST(CountersTest, SummaryStatsCounters) {
ObjectPool pool;
- RuntimeProfile profile1(&pool, "Profile 1");
+ RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Profile 1");
RuntimeProfile::SummaryStatsCounter* summary_stats_counter_1 =
- profile1.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+ profile1->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
EXPECT_EQ(summary_stats_counter_1->value(), 0);
EXPECT_EQ(summary_stats_counter_1->MinValue(), numeric_limits<int64_t>::max());
@@ -297,9 +297,9 @@ TEST(CountersTest, SummaryStatsCounters) {
EXPECT_EQ(summary_stats_counter_1->MinValue(), -40);
EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);
- RuntimeProfile profile2(&pool, "Profile 2");
+ RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Profile 2");
RuntimeProfile::SummaryStatsCounter* summary_stats_counter_2 =
- profile2.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+ profile2->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
summary_stats_counter_2->UpdateCounter(100);
EXPECT_EQ(summary_stats_counter_2->value(), 100);
@@ -307,10 +307,10 @@ TEST(CountersTest, SummaryStatsCounters) {
EXPECT_EQ(summary_stats_counter_2->MaxValue(), 100);
TRuntimeProfileTree tprofile1;
- profile1.ToThrift(&tprofile1);
+ profile1->ToThrift(&tprofile1);
// Merge profile1 and profile2 and check that profile2 is overwritten.
- profile2.Update(tprofile1);
+ profile2->Update(tprofile1);
EXPECT_EQ(summary_stats_counter_2->value(), 4);
EXPECT_EQ(summary_stats_counter_2->MinValue(), -40);
EXPECT_EQ(summary_stats_counter_2->MaxValue(), 40);
@@ -319,16 +319,16 @@ TEST(CountersTest, SummaryStatsCounters) {
TEST(CountersTest, DerivedCounters) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
RuntimeProfile::Counter* bytes_counter =
- profile.AddCounter("bytes", TUnit::BYTES);
+ profile->AddCounter("bytes", TUnit::BYTES);
RuntimeProfile::Counter* ticks_counter =
- profile.AddCounter("ticks", TUnit::TIME_NS);
+ profile->AddCounter("ticks", TUnit::TIME_NS);
// set to 1 sec
ticks_counter->Set(1000L * 1000L * 1000L);
RuntimeProfile::DerivedCounter* throughput_counter =
- profile.AddDerivedCounter("throughput", TUnit::BYTES,
+ profile->AddDerivedCounter("throughput", TUnit::BYTES,
bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_counter, ticks_counter));
bytes_counter->Set(10);
@@ -341,11 +341,11 @@ TEST(CountersTest, DerivedCounters) {
TEST(CountersTest, AverageSetCounters) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
RuntimeProfile::Counter* bytes_1_counter =
- profile.AddCounter("bytes 1", TUnit::BYTES);
+ profile->AddCounter("bytes 1", TUnit::BYTES);
RuntimeProfile::Counter* bytes_2_counter =
- profile.AddCounter("bytes 2", TUnit::BYTES);
+ profile->AddCounter("bytes 2", TUnit::BYTES);
bytes_1_counter->Set(10);
RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES);
@@ -366,9 +366,9 @@ TEST(CountersTest, AverageSetCounters) {
EXPECT_EQ(bytes_avg.value(), 25);
RuntimeProfile::Counter* double_1_counter =
- profile.AddCounter("double 1", TUnit::DOUBLE_VALUE);
+ profile->AddCounter("double 1", TUnit::DOUBLE_VALUE);
RuntimeProfile::Counter* double_2_counter =
- profile.AddCounter("double 2", TUnit::DOUBLE_VALUE);
+ profile->AddCounter("double 2", TUnit::DOUBLE_VALUE);
double_1_counter->Set(1.0f);
RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE);
double_avg.UpdateCounter(double_1_counter);
@@ -390,17 +390,17 @@ TEST(CountersTest, AverageSetCounters) {
TEST(CountersTest, InfoStringTest) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
- EXPECT_TRUE(profile.GetInfoString("Key") == NULL);
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+ EXPECT_TRUE(profile->GetInfoString("Key") == NULL);
- profile.AddInfoString("Key", "Value");
- const string* value = profile.GetInfoString("Key");
+ profile->AddInfoString("Key", "Value");
+ const string* value = profile->GetInfoString("Key");
EXPECT_TRUE(value != NULL);
EXPECT_EQ(*value, "Value");
// Convert it to thrift
TRuntimeProfileTree tprofile;
- profile.ToThrift(&tprofile);
+ profile->ToThrift(&tprofile);
// Convert it back
RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(
@@ -410,34 +410,34 @@ TEST(CountersTest, InfoStringTest) {
EXPECT_EQ(*value, "Value");
// Test update.
- RuntimeProfile update_dst_profile(&pool, "Profile2");
- update_dst_profile.Update(tprofile);
- value = update_dst_profile.GetInfoString("Key");
+ RuntimeProfile* update_dst_profile = RuntimeProfile::Create(&pool, "Profile2");
+ update_dst_profile->Update(tprofile);
+ value = update_dst_profile->GetInfoString("Key");
EXPECT_TRUE(value != NULL);
EXPECT_EQ(*value, "Value");
// Update the original profile, convert it to thrift and update from the dst
// profile
- profile.AddInfoString("Key", "NewValue");
- profile.AddInfoString("Foo", "Bar");
- EXPECT_EQ(*profile.GetInfoString("Key"), "NewValue");
- EXPECT_EQ(*profile.GetInfoString("Foo"), "Bar");
- profile.ToThrift(&tprofile);
-
- update_dst_profile.Update(tprofile);
- EXPECT_EQ(*update_dst_profile.GetInfoString("Key"), "NewValue");
- EXPECT_EQ(*update_dst_profile.GetInfoString("Foo"), "Bar");
+ profile->AddInfoString("Key", "NewValue");
+ profile->AddInfoString("Foo", "Bar");
+ EXPECT_EQ(*profile->GetInfoString("Key"), "NewValue");
+ EXPECT_EQ(*profile->GetInfoString("Foo"), "Bar");
+ profile->ToThrift(&tprofile);
+
+ update_dst_profile->Update(tprofile);
+ EXPECT_EQ(*update_dst_profile->GetInfoString("Key"), "NewValue");
+ EXPECT_EQ(*update_dst_profile->GetInfoString("Foo"), "Bar");
}
TEST(CountersTest, RateCounters) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
RuntimeProfile::Counter* bytes_counter =
- profile.AddCounter("bytes", TUnit::BYTES);
+ profile->AddCounter("bytes", TUnit::BYTES);
RuntimeProfile::Counter* rate_counter =
- profile.AddRateCounter("RateCounter", bytes_counter);
+ profile->AddRateCounter("RateCounter", bytes_counter);
EXPECT_TRUE(rate_counter->unit() == TUnit::BYTES_PER_SECOND);
EXPECT_EQ(rate_counter->value(), 0);
@@ -449,8 +449,8 @@ TEST(CountersTest, RateCounters) {
int64_t rate = rate_counter->value();
- // Remove the counter so it no longer gets updates
- PeriodicCounterUpdater::StopRateCounter(rate_counter);
+ // Stop the counter so it no longer gets updates
+ profile->StopPeriodicCounters();
// The rate counter is not perfectly accurate. Currently updated at 500ms intervals,
// we should have seen somewhere between 1 and 3 updates (33 - 200 MB/s)
@@ -468,44 +468,42 @@ TEST(CountersTest, RateCounters) {
TEST(CountersTest, BucketCounters) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
RuntimeProfile::Counter* unit_counter =
- profile.AddCounter("unit", TUnit::UNIT);
+ profile->AddCounter("unit", TUnit::UNIT);
// Set the unit to 1 before sampling
unit_counter->Set(1);
// Create the bucket counters and start sampling
- vector<RuntimeProfile::Counter*> buckets;
- buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
- buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
- profile.RegisterBucketingCounters(unit_counter, &buckets);
+ vector<RuntimeProfile::Counter*>* buckets =
+ profile->AddBucketingCounters(unit_counter, 2);
// Wait two seconds.
sleep(2);
// Stop sampling
- PeriodicCounterUpdater::StopBucketingCounters(&buckets, true);
+ profile->StopPeriodicCounters();
// TODO: change the value to double
// The value of (*buckets)[0] should be zero and (*buckets)[1] should be 100.
- double val0 = buckets[0]->double_value();
- double val1 = buckets[1]->double_value();
+ double val0 = (*buckets)[0]->double_value();
+ double val1 = (*buckets)[1]->double_value();
EXPECT_EQ(0, val0);
EXPECT_EQ(100, val1);
// Wait another two seconds. Sampling has been stopped, so the values should not be
// changed (much).
sleep(2);
- EXPECT_EQ(val0, buckets[0]->double_value());
- EXPECT_EQ(val1, buckets[1]->double_value());
+ EXPECT_EQ(val0, (*buckets)[0]->double_value());
+ EXPECT_EQ(val1, (*buckets)[1]->double_value());
}
TEST(CountersTest, EventSequences) {
ObjectPool pool;
- RuntimeProfile profile(&pool, "Profile");
- RuntimeProfile::EventSequence* seq = profile.AddEventSequence("event sequence");
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+ RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence");
seq->MarkEvent("aaaa");
seq->MarkEvent("bbbb");
seq->MarkEvent("cccc");
@@ -524,7 +522,7 @@ TEST(CountersTest, EventSequences) {
}
TRuntimeProfileTree thrift_profile;
- profile.ToThrift(&thrift_profile);
+ profile->ToThrift(&thrift_profile);
EXPECT_TRUE(thrift_profile.nodes[0].__isset.event_sequences);
EXPECT_EQ(1, thrift_profile.nodes[0].event_sequences.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 12f4e25..4254b9c 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -53,10 +53,14 @@ const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime";
const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
+RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name,
+ bool is_averaged_profile) {
+ return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile));
+}
+
RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name,
bool is_averaged_profile)
: pool_(pool),
- own_pool_(false),
name_(name),
metadata_(-1),
is_averaged_profile_(is_averaged_profile),
@@ -78,27 +82,25 @@ RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name,
}
RuntimeProfile::~RuntimeProfile() {
- map<string, Counter*>::const_iterator iter;
- for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
- PeriodicCounterUpdater::StopRateCounter(iter->second);
- PeriodicCounterUpdater::StopSamplingCounter(iter->second);
- }
+ DCHECK(!has_active_periodic_counters_);
+}
- set<vector<Counter*>* >::const_iterator buckets_iter;
- for (buckets_iter = bucketing_counters_.begin();
- buckets_iter != bucketing_counters_.end(); ++buckets_iter) {
- // This is just a clean up. No need to perform conversion. Also, the underlying
- // counters might be gone already.
- PeriodicCounterUpdater::StopBucketingCounters(*buckets_iter, false);
+void RuntimeProfile::StopPeriodicCounters() {
+ lock_guard<SpinLock> l(counter_map_lock_);
+ if (!has_active_periodic_counters_) return;
+ for (Counter* sampling_counter : sampling_counters_) {
+ PeriodicCounterUpdater::StopSamplingCounter(sampling_counter);
}
-
- TimeSeriesCounterMap::const_iterator time_series_it;
- for (time_series_it = time_series_counter_map_.begin();
- time_series_it != time_series_counter_map_.end(); ++time_series_it) {
- PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_it->second);
+ for (Counter* rate_counter : rate_counters_) {
+ PeriodicCounterUpdater::StopRateCounter(rate_counter);
}
-
- if (own_pool_) delete pool_;
+ for (vector<Counter*>* counters : bucketing_counters_) {
+ PeriodicCounterUpdater::StopBucketingCounters(counters);
+ }
+ for (auto& time_series_counter_entry : time_series_counter_map_) {
+ PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second);
+ }
+ has_active_periodic_counters_ = false;
}
RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
@@ -113,7 +115,7 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
DCHECK_LT(*idx, nodes.size());
const TRuntimeProfileNode& node = nodes[*idx];
- RuntimeProfile* profile = pool->Add(new RuntimeProfile(pool, node.name));
+ RuntimeProfile* profile = Create(pool, node.name);
profile->metadata_ = node.metadata;
for (int i = 0; i < node.counters.size(); ++i) {
const TCounter& counter = node.counters[i];
@@ -208,7 +210,7 @@ void RuntimeProfile::UpdateAverage(RuntimeProfile* other) {
if (j != child_map_.end()) {
child = j->second;
} else {
- child = pool_->Add(new RuntimeProfile(pool_, other_child->name_, true));
+ child = Create(pool_, other_child->name_, true);
child->metadata_ = other_child->metadata_;
bool indent_other_child = other->children_[i].second;
child_map_[child->name_] = child;
@@ -282,7 +284,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
}
{
- lock_guard<SpinLock> l(time_series_counter_map_lock_);
+ lock_guard<SpinLock> l(counter_map_lock_);
for (int i = 0; i < node.time_series_counters.size(); ++i) {
const TTimeSeriesCounter& c = node.time_series_counters[i];
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
@@ -322,7 +324,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
if (j != child_map_.end()) {
child = j->second;
} else {
- child = pool_->Add(new RuntimeProfile(pool_, tchild.name));
+ child = Create(pool_, tchild.name);
child->metadata_ = tchild.metadata;
child_map_[tchild.name] = child;
children_.push_back(make_pair(child, tchild.indent));
@@ -439,7 +441,7 @@ RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent,
bool prepend) {
lock_guard<SpinLock> l(children_lock_);
DCHECK(child_map_.find(name) == child_map_.end());
- RuntimeProfile* child = pool_->Add(new RuntimeProfile(pool_, name));
+ RuntimeProfile* child = Create(pool_, name);
AddChildLocked(child, indent, prepend ? children_.begin() : children_.end());
return child;
}
@@ -504,9 +506,16 @@ void RuntimeProfile::AddCodegenMsg(
#define ADD_COUNTER_IMPL(NAME, T) \
RuntimeProfile::T* RuntimeProfile::NAME( \
const string& name, TUnit::type unit, const string& parent_counter_name) { \
- DCHECK_EQ(is_averaged_profile_, false); \
lock_guard<SpinLock> l(counter_map_lock_); \
+ bool dummy; \
+ return NAME##Locked(name, unit, parent_counter_name, &dummy); \
+ } \
+ RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name, \
+ TUnit::type unit, const string& parent_counter_name, bool* created) { \
+ counter_map_lock_.DCheckLocked(); \
+ DCHECK_EQ(is_averaged_profile_, false); \
if (counter_map_.find(name) != counter_map_.end()) { \
+ *created = false; \
return reinterpret_cast<T*>(counter_map_[name]); \
} \
DCHECK(parent_counter_name == ROOT_COUNTER \
@@ -516,6 +525,7 @@ void RuntimeProfile::AddCodegenMsg(
set<string>* child_counters = \
FindOrInsert(&child_counter_map_, parent_counter_name, set<string>()); \
child_counters->insert(name); \
+ *created = true; \
return counter; \
}
@@ -668,7 +678,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
// <Name> (<period>): <val1>, <val2>, <etc>
SpinLock* lock;
int num, period;
- lock_guard<SpinLock> l(time_series_counter_map_lock_);
+ lock_guard<SpinLock> l(counter_map_lock_);
for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
if (num > 0) {
@@ -813,7 +823,7 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
}
{
- lock_guard<SpinLock> l(time_series_counter_map_lock_);
+ lock_guard<SpinLock> l(counter_map_lock_);
if (time_series_counter_map_.size() != 0) {
node.__set_time_series_counters(
vector<TTimeSeriesCounter>(time_series_counter_map_.size()));
@@ -884,44 +894,71 @@ RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit();
return NULL;
}
- Counter* dst_counter = AddCounter(name, dst_unit);
- PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
- PeriodicCounterUpdater::RATE_COUNTER);
- return dst_counter;
+ {
+ lock_guard<SpinLock> l(counter_map_lock_);
+ bool created;
+ Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+ if (!created) return dst_counter;
+ rate_counters_.push_back(dst_counter);
+ PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
+ PeriodicCounterUpdater::RATE_COUNTER);
+ has_active_periodic_counters_ = true;
+ return dst_counter;
+ }
}
RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
- Counter* dst_counter = AddCounter(name, dst_unit);
+ lock_guard<SpinLock> l(counter_map_lock_);
+ bool created;
+ Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+ if (!created) return dst_counter;
+ rate_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter,
PeriodicCounterUpdater::RATE_COUNTER);
+ has_active_periodic_counters_ = true;
return dst_counter;
}
RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, Counter* src_counter) {
DCHECK(src_counter->unit() == TUnit::UNIT);
- Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
+ lock_guard<SpinLock> l(counter_map_lock_);
+ bool created;
+ Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+ if (!created) return dst_counter;
+ sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
+ has_active_periodic_counters_ = true;
return dst_counter;
}
RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, DerivedCounterFunction sample_fn) {
- Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
+ lock_guard<SpinLock> l(counter_map_lock_);
+ bool created;
+ Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+ if (!created) return dst_counter;
+ sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
+ has_active_periodic_counters_ = true;
return dst_counter;
}
-void RuntimeProfile::RegisterBucketingCounters(Counter* src_counter,
- vector<Counter*>* buckets) {
- {
- lock_guard<SpinLock> l(counter_map_lock_);
- bucketing_counters_.insert(buckets);
+vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
+ Counter* src_counter, int num_buckets) {
+ lock_guard<SpinLock> l(counter_map_lock_);
+ vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
+ for (int i = 0; i < num_buckets; ++i) {
+ buckets->push_back(
+ pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
}
+ bucketing_counters_.insert(buckets);
+ has_active_periodic_counters_ = true;
PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
+ return buckets;
}
RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) {
@@ -978,16 +1015,14 @@ RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
const string& name, TUnit::type unit, DerivedCounterFunction fn) {
- DCHECK(fn != NULL);
- TimeSeriesCounter* counter = NULL;
- {
- lock_guard<SpinLock> l(time_series_counter_map_lock_);
- TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
- if (it != time_series_counter_map_.end()) return it->second;
- counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
- time_series_counter_map_[name] = counter;
- }
+ DCHECK(fn != nullptr);
+ lock_guard<SpinLock> l(counter_map_lock_);
+ TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
+ if (it != time_series_counter_map_.end()) return it->second;
+ TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
+ time_series_counter_map_[name] = counter;
PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
+ has_active_periodic_counters_ = true;
return counter;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 298c214..8348161 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -101,13 +101,13 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
typedef boost::function<int64_t ()> DerivedCounterFunction;
- /// Create a runtime profile object with 'name'. Counters and merged profile are
- /// allocated from pool.
- /// If is_averaged_profile is true, the counters in this profile will be derived
+ /// Create a runtime profile object with 'name'. The profile, counters and any other
+ /// structures owned by the profile are allocated from 'pool'.
+ /// If 'is_averaged_profile' is true, the counters in this profile will be derived
/// averages (of unit AveragedCounter) from other profiles, so the counter map will
/// be left empty. Otherwise, the counter map is initialized with a single entry for
/// TotalTime.
- RuntimeProfile(ObjectPool* pool, const std::string& name,
+ static RuntimeProfile* Create(ObjectPool* pool, const std::string& name,
bool is_averaged_profile = false);
~RuntimeProfile();
@@ -247,6 +247,12 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// the key does not exist.
const std::string* GetInfoString(const std::string& key) const;
+ /// Stops updating all counters in this profile that are periodically updated by a
+ /// background thread (i.e. sampling, rate, bucketing and time series counters).
+ /// Must be called before the profile is destroyed if any such counters are active.
+ /// Does not stop counters on descendant profiles.
+ void StopPeriodicCounters();
+
/// Returns the counter for the total elapsed time.
Counter* total_time_counter() { return counter_map_[TOTAL_TIME_COUNTER_NAME]; }
Counter* inactive_timer() { return counter_map_[INACTIVE_TIME_COUNTER_NAME]; }
@@ -299,8 +305,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// Add a rate counter to the current profile based on src_counter with name.
/// The rate counter is updated periodically based on the src counter.
/// The rate counter has units in src_counter unit per second.
- /// Rate counters should be stopped (by calling PeriodicCounterUpdater::StopRateCounter)
- /// as soon as the src_counter stops changing.
+ /// StopPeriodicCounters() must be called to stop the periodic updating before this
+ /// profile is destroyed. The periodic updating can be stopped earlier by calling
+ /// PeriodicCounterUpdater::StopRateCounter() if 'src_counter' stops changing.
Counter* AddRateCounter(const std::string& name, Counter* src_counter);
/// Same as 'AddRateCounter' above except values are taken by calling fn.
@@ -312,27 +319,40 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// The sampling counter is updated periodically based on the src counter by averaging
/// the samples taken from the src counter.
/// The sampling counter has the same unit as src_counter unit.
- /// Sampling counters should be stopped (by calling
- /// PeriodicCounterUpdater::StopSamplingCounter) as soon as the src_counter stops
- /// changing.
+ /// StopPeriodicCounters() must be called to stop the periodic updating before this
+ /// profile is destroyed. The periodic updating can be stopped earlier by calling
+ /// PeriodicCounterUpdater::StopSamplingCounter() if 'src_counter' stops changing.
Counter* AddSamplingCounter(const std::string& name, Counter* src_counter);
/// Same as 'AddSamplingCounter' above except the samples are taken by calling fn.
Counter* AddSamplingCounter(const std::string& name, DerivedCounterFunction fn);
- /// Register a bucket of counters to store the sampled value of src_counter.
- /// The src_counter is sampled periodically and the buckets are updated.
- void RegisterBucketingCounters(Counter* src_counter, std::vector<Counter*>* buckets);
+ /// Create a set of counters, one per bucket, to store the sampled value of src_counter.
+ /// The 'src_counter' is sampled periodically to obtain the index of the bucket to
+ /// increment. E.g. if the value of 'src_counter' is 3, the bucket at index 3 is
+ /// updated. If the index exceeds the index of the last bucket, the last bucket is
+ /// updated.
+ ///
+ /// The created counters do not appear in the profile when serialized or
+ /// pretty-printed. The caller must do its own processing of the counter value
+ /// (e.g. converting it to an info string).
+ /// TODO: make this interface more consistent and sane.
+ ///
+ /// StopPeriodicCounters() must be called to stop the periodic updating before this
+ /// profile is destroyed. The periodic updating can be stopped earlier by calling
+ /// PeriodicCounterUpdater::StopBucketingCounters() if 'src_counter' stops changing.
+ std::vector<Counter*>* AddBucketingCounters(Counter* src_counter, int num_buckets);
/// Create a time series counter. This begins sampling immediately. This counter
/// contains a number of samples that are collected periodically by calling sample_fn().
+ /// StopPeriodicCounters() must be called to stop the periodic updating before this
+ /// profile is destroyed. The periodic updating can be stopped earlier by calling
+ /// PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
/// Note: these counters don't get merged (to make average profiles)
TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name,
TUnit::type unit, DerivedCounterFunction sample_fn);
- /// Create a time series counter that samples the source counter. Sampling begins
- /// immediately.
- /// Note: these counters don't get merged (to make average profiles)
+ /// Same as above except the samples are collected from 'src_counter'.
TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, Counter* src_counter);
/// Recursively compute the fraction of the 'total_time' spent in this profile and
@@ -345,9 +365,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// object, but occasionally allocated in the constructor.
ObjectPool* pool_;
- /// True if we have to delete the pool_ on destruction.
- bool own_pool_;
-
/// Name for this runtime profile.
std::string name_;
@@ -369,9 +386,27 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
ChildCounterMap child_counter_map_;
/// A set of bucket counters registered in this runtime profile.
- std::set<std::vector<Counter*>* > bucketing_counters_;
+ std::set<std::vector<Counter*>*> bucketing_counters_;
+
+ /// Rate counters, which also appear in 'counter_map_'. Tracked separately to enable
+ /// stopping the counters.
+ std::vector<Counter*> rate_counters_;
+
+ /// Sampling counters, which also appear in 'counter_map_'. Tracked separately to enable
+ /// stopping the counters.
+ std::vector<Counter*> sampling_counters_;
+
+ /// Time series counters. These do not appear in 'counter_map_'. Tracked separately
+ /// because they are displayed separately in the profile and need to be stopped.
+ typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap;
+ TimeSeriesCounterMap time_series_counter_map_;
- /// Protects counter_map_, counter_child_map_ and bucketing_counters_.
+ /// True if this profile has active periodic counters, including bucketing, rate,
+ /// sampling and time series counters.
+ bool has_active_periodic_counters_ = false;
+
+ /// Protects counter_map_, child_counter_map_, bucketing_counters_, rate_counters_,
+ /// sampling_counters_, time_series_counter_map_, and has_active_periodic_counters_.
mutable SpinLock counter_map_lock_;
/// Child profiles. Does not own memory.
@@ -403,12 +438,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// Protects event_sequence_map_.
mutable SpinLock event_sequence_lock_;
- typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap;
- TimeSeriesCounterMap time_series_counter_map_;
-
- /// Protects time_series_counter_map_.
- mutable SpinLock time_series_counter_map_lock_;
-
typedef std::map<std::string, SummaryStatsCounter*> SummaryStatsCounterMap;
SummaryStatsCounterMap summary_stats_map_;
@@ -430,6 +459,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// ComputeTimeInProfile()
int64_t local_time_ns_;
+ /// Constructor used by Create().
+ RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile);
+
/// Update a subtree of profiles from nodes, rooted at *idx.
/// On return, *idx points to the node immediately following this subtree.
void Update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
@@ -454,6 +486,16 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
static RuntimeProfile* CreateFromThrift(
ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
+ /// Internal implementations of the Add*Counter() functions for use when the caller
+ /// holds counter_map_lock_. Also returns 'created', which is true if a new counter was
+ /// created and false if a counter with the given name already existed.
+ Counter* AddCounterLocked(const std::string& name, TUnit::type unit,
+ const std::string& parent_counter_name, bool* created);
+ HighWaterMarkCounter* AddHighWaterMarkCounterLocked(const std::string& name,
+ TUnit::type unit, const std::string& parent_counter_name, bool* created);
+ ConcurrentTimerCounter* AddConcurrentTimerCounterLocked(const std::string& name,
+ TUnit::type unit, const std::string& parent_counter_name, bool* created);
+
/// Inserts 'child' before the iterator 'insert_pos' in 'children_'.
/// 'children_lock_' must be held by the caller.
void AddChildLocked(