You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/09/10 16:27:00 UTC

[impala] 01/04: IMPALA-10883: Do not override existing counters with empty profile

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 108cf8d07c82a179ede6d0b3385ddc4a6709822c
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Aug 25 23:00:23 2021 -0700

    IMPALA-10883: Do not override existing counters with empty profile
    
    Some profile information was missing when the gen_experimental_profile
    flag was enabled. This is because profile aggregation in the coordinator
    did not anticipate a partial update from a backend. From the backend's
    perspective, once a fragment instance has sent its final report, that
    instance does not participate in subsequent profile reports.
    Therefore, the aggregated counters that belong to the finished instance
    are empty in later reports. This patch adds emptiness checks to the
    aggregation of input_profile_names_, TAggTimeSeriesCounter, and
    TAggEventSequence to prevent existing values from being overridden by an
    empty profile.
    
    Testing:
    - Add BE test CountersTest.PartialUpdate
    
    Change-Id: I9bb179bf739ffaa4e5ec8dc911480ac835ae387f
    Reviewed-on: http://gerrit.cloudera.org:8080/17819
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/runtime-profile-test.cc | 134 ++++++++++++++++++++++++++++++++++++
 be/src/util/runtime-profile.cc      |   6 +-
 2 files changed, 139 insertions(+), 1 deletion(-)

diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index e0a9742..bd5f490 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -1465,6 +1465,140 @@ TEST(TimeSeriesCounterTest, AggregateTimeSeries) {
   VerifyThriftTimeSeries(ttree2.nodes[0], OFFSET, NUM_PROFILES + NUM_UNINIT_PROFILES);
 }
 
+// Verifies the thrift-encoded "simple_counter" TAggCounter in 'tnode': instances in
+// [offset, offset + 3) must hold 11, 22, 33; every other instance must have no value.
+static void VerifyThriftCounters(
+    const TRuntimeProfileNode& tnode, int offset, int total_instances) {
+  ASSERT_TRUE(tnode.__isset.aggregated);
+  const int NUM_VALID_INSTANCES = 3;
+  DCHECK_LE(offset + NUM_VALID_INSTANCES, total_instances);
+
+  const TAggregatedRuntimeProfileNode& agg_node = tnode.aggregated;
+  ASSERT_TRUE(agg_node.__isset.counters);  // Guard the field that is read below.
+  ASSERT_GT(agg_node.counters.size(), 2u);  // counters[2] must exist before indexing.
+  const TAggCounter& tcounter = agg_node.counters[2];
+  EXPECT_EQ("simple_counter", tcounter.name);
+  EXPECT_EQ(TUnit::BYTES, tcounter.unit);
+  for (int i = 0; i < total_instances; ++i) {
+    if (i < offset || i >= offset + NUM_VALID_INSTANCES) {  // Outside the valid window.
+      EXPECT_EQ(false, tcounter.has_value[i]);
+      EXPECT_EQ(0, tcounter.values[i]);
+      continue;
+    }
+    EXPECT_EQ(true, tcounter.has_value[i]);
+    EXPECT_EQ((i - offset + 1) * 11, tcounter.values[i]);
+  }
+}
+
+// Tests aggregation of two profile updates, where the second profile update is a
+// partial update.
+TEST(CountersTest, PartialUpdate) {
+  auto cert = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true);
+  const int NUM_PROFILES = 3;
+  // Storage for the per-instance profiles that the sections below populate.
+  ObjectPool pool;
+  RuntimeProfile* profiles[NUM_PROFILES];
+
+  // Create Profiles and Counters: instance 0 is set to 5, instances 1 and 2 to 22, 33.
+  RuntimeProfile::Counter* counters[NUM_PROFILES];
+  for (int i = 0; i < NUM_PROFILES; ++i) {
+    profiles[i] = RuntimeProfile::Create(&pool, strings::Substitute("Profile $0", i));
+    counters[i] = profiles[i]->AddCounter("simple_counter", TUnit::BYTES);
+    counters[i]->Set((i + 1) * (i > 0 ? 11 : 5));
+  }
+
+  // Create SummaryStatsCounters.
+  RuntimeProfile::SummaryStatsCounter* ss_counters[NUM_PROFILES];
+  for (int i = 0; i < NUM_PROFILES; ++i) {
+    ss_counters[i] = profiles[i]->AddSummaryStatsCounter("test ss", TUnit::UNIT);
+    ss_counters[i]->UpdateCounter(i);
+    if (i > 0) ss_counters[i]->UpdateCounter(i + 1);
+  }
+
+  // Create InfoStrings.
+  for (int i = 0; i < NUM_PROFILES; ++i) {
+    profiles[i]->AddInfoString("shared", "same value");
+    if (i > 0) profiles[i]->AddInfoString("distinct", Substitute("val$0", i));
+  }
+
+  // Create EventSequences.
+  RuntimeProfile::EventSequence* seqs[NUM_PROFILES];
+  for (int i = 0; i < NUM_PROFILES; ++i) {
+    seqs[i] = profiles[i]->AddEventSequence("event sequence");
+    seqs[i]->MarkEvent("aaaa");
+  }
+  seqs[0]->MarkEvent("bbbb");
+  seqs[1]->MarkEvent("cccc");
+  seqs[2]->MarkEvent("dddd");
+  seqs[2]->MarkEvent("bbbb");
+
+  // Create TimeSeriesCounters.
+  RuntimeProfile::TimeSeriesCounter* ts_counters[NUM_PROFILES];
+  const int test_period = FLAGS_periodic_counter_update_period_ms;
+  // Add a counter with a sample function that counts up, starting from 0.
+  int ts_value = 0;
+  for (int i = NUM_PROFILES - 1; i >= 0; --i) {
+    auto sample_fn = [&ts_value]() { return ts_value++; };
+    ts_counters[i] =
+        profiles[i]->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);
+
+    // Stop counter updates from interfering with the rest of the test.
+    StopAndClearCounter(profiles[i], ts_counters[i]);
+
+    // Reset value after previous values have been retrieved.
+    ts_value = 0;
+
+    for (int j = 0; j < (i == 0 ? 9 : 10); ++j) ts_counters[i]->AddSample(test_period);
+  }
+
+  // Update 1 has instances #1 and #2 reporting their final update, while instance #0 is
+  // reporting its first update.
+  AggregatedRuntimeProfile* aggregated_profile_1 =
+      AggregatedRuntimeProfile::Create(&pool, "Update 1", NUM_PROFILES, true);
+  for (int i = 0; i < NUM_PROFILES; ++i) {
+    aggregated_profile_1->UpdateAggregatedFromInstance(profiles[i], i);
+  }
+  TRuntimeProfileTree ttree1;
+  aggregated_profile_1->ToThrift(&ttree1);
+
+  // Update 2 has only instance #0 reporting its final update, which has a new counter
+  // value plus one more summary stat sample, info string, event, and time series sample.
+  counters[0]->Set(11);
+  ss_counters[0]->UpdateCounter(1);
+  profiles[0]->AddInfoString("distinct", "val0");
+  seqs[0]->MarkEvent("cccc");
+  ts_counters[0]->AddSample(test_period);
+  AggregatedRuntimeProfile* aggregated_profile_2 =
+      AggregatedRuntimeProfile::Create(&pool, "Update 2", NUM_PROFILES, true);
+  aggregated_profile_2->UpdateAggregatedFromInstance(profiles[0], 0);
+  TRuntimeProfileTree ttree2;
+  aggregated_profile_2->ToThrift(&ttree2);
+
+  // Test merging both updates into a larger aggregated profile (size 9) at offset 3.
+  const int MERGE_SIZE = NUM_PROFILES * 3;
+  const int OFFSET = NUM_PROFILES;
+  AggregatedRuntimeProfile* merged_profile =
+      AggregatedRuntimeProfile::Create(&pool, "Merged", MERGE_SIZE, true);
+  merged_profile->UpdateAggregatedFromInstances(ttree1, OFFSET);
+  merged_profile->UpdateAggregatedFromInstances(ttree2, OFFSET);
+  TRuntimeProfileTree ttree_merged;
+  merged_profile->ToThrift(&ttree_merged);
+
+  // Verify merged Counters, SummaryStats, InfoStrings, EventSequences, and TimeSeries.
+  VerifyThriftCounters(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
+  VerifyThriftSummaryStats(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
+  VerifyThriftInfoStrings(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
+  VerifyThriftEventSequences(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
+  VerifyThriftTimeSeries(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
+
+  // Verify that all profile names match.
+  ASSERT_EQ(ttree_merged.nodes[0].aggregated.num_instances, MERGE_SIZE);
+  for (int i = 0; i < NUM_PROFILES; ++i) {
+    ASSERT_STREQ(profiles[i]->name().c_str(),
+        ttree_merged.nodes[0].aggregated.input_profiles[i + OFFSET].c_str());
+  }
+}
+
 /// Test parameter class that helps to test time series resampling during profile pretty
 /// printing with a varying number of test samples.
 struct TimeSeriesTestParam {
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 1585f8b..1995ebc 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -540,7 +540,9 @@ void AggregatedRuntimeProfile::UpdateAggregatedFromInstances(
         << "Update() can only be called on root of averaged profile tree";
     DCHECK_EQ(agg_root.input_profiles.size(), agg_root.num_instances);
     for (int i = 0; i < agg_root.num_instances; ++i) {
-      input_profile_names_[start_idx + i] = agg_root.input_profiles[i];
+      if (LIKELY(!agg_root.input_profiles[i].empty())) {
+        input_profile_names_[start_idx + i] = agg_root.input_profiles[i];
+      }
     }
   }
 
@@ -623,6 +625,7 @@ void AggregatedRuntimeProfile::UpdateCountersFromInstances(
     DCHECK_EQ(tcounter.values.size(), tcounter.start_index.size()) << tcounter.name;
     for (int i = 0; i < tcounter.values.size(); ++i) {
       int idx = start_idx + i;
+      if (UNLIKELY(tcounter.values[i].empty() && !dst.values[idx].empty())) continue;
       dst.period_ms[idx] = tcounter.period_ms[i];
       dst.values[idx] = tcounter.values[i];
       dst.start_index[idx] = tcounter.start_index[i];
@@ -705,6 +708,7 @@ void AggregatedRuntimeProfile::UpdateEventSequencesFromInstances(
     DCHECK_LE(start_idx + tseq.timestamps.size(), num_input_profiles_);
     DCHECK_EQ(tseq.timestamps.size(), tseq.label_idxs.size());
     for (int i = 0; i < tseq.timestamps.size(); ++i) {
+      if (UNLIKELY(tseq.timestamps[i].empty())) continue;
       int idx = start_idx + i;
       std::vector<int32_t>& label_idxs = seq.label_idxs[idx];
       std::vector<int64_t>& timestamps = seq.timestamps[idx];