You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/17 03:56:28 UTC

[impala] branch master updated: IMPALA-8375: Add metrics for spill disk usage

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a80ebb7  IMPALA-8375: Add metrics for spill disk usage
a80ebb7 is described below

commit a80ebb7a729c1ffd25db262527d4261e01763646
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Sun Apr 7 09:15:15 2019 -0700

    IMPALA-8375: Add metrics for spill disk usage
    
    Added two new metrics, tmp-file-mgr.scratch-space-bytes-used-high-water-mark
    and tmp-file-mgr.scratch-space-bytes-used, which track the high water mark
    (HWM) and the current value of spilled bytes, respectively.
    
    A new class, AtomicHighWaterMarkGauge, was added to track the HWM value. It
    also wraps a separate metric object that tracks the current value of the
    spilled bytes.
    
    The current value is incremented every time a new range is allocated from
    a temporary file, and decremented when a temporary file group is closed and
    its scratch files are removed. The new metrics are not updated when ranges
    are recycled from a file; a separate metric could be added in the future to
    track the bytes actually spilled. The HWM value is updated whenever the
    current value exceeds it.
    
    Testing:
    - Added new unit tests to the metrics-test test case.
    - E2E testing of both metrics: ran concurrent spilling queries, monitored
      the metrics on the Impala daemon's metrics page, and verified that both
      the current-value metric and the HWM metric behaved as expected.
    
    Change-Id: Ia1b3dd604c7234a8d8af34d70ca731544a46d298
    Reviewed-on: http://gerrit.cloudera.org:8080/12956
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/CMakeLists.txt              |  1 +
 be/src/runtime/tmp-file-mgr.cc | 16 ++++++++--
 be/src/runtime/tmp-file-mgr.h  |  3 ++
 be/src/util/metrics-test.cc    | 30 +++++++++++++++++++
 be/src/util/metrics.h          | 66 ++++++++++++++++++++++++++++++++++++++++++
 common/thrift/metrics.json     | 22 +++++++++++++-
 6 files changed, 135 insertions(+), 3 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index c671daf..837e4af 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -316,6 +316,7 @@ set(CLANG_INCLUDE_FLAGS
   "-I${SQUEASEL_INCLUDE_DIR}"
   "-I${GLOG_INCLUDE_DIR}"
   "-I${GFLAGS_INCLUDE_DIR}"
+  "-I${GTEST_INCLUDE_DIR}"
   "-I${RAPIDJSON_INCLUDE_DIR}"
   "-I${AVRO_INCLUDE_DIR}"
   "-I${ORC_INCLUDE_DIR}"
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 5c76870..32c1509 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -66,11 +66,16 @@ const uint64_t AVAILABLE_SPACE_THRESHOLD_MB = 1024;
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dirs";
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST =
     "tmp-file-mgr.active-scratch-dirs.list";
+const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK =
+    "tmp-file-mgr.scratch-space-bytes-used-high-water-mark";
+const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED =
+    "tmp-file-mgr.scratch-space-bytes-used";
 
 TmpFileMgr::TmpFileMgr()
   : initialized_(false),
     num_active_scratch_dirs_metric_(nullptr),
-    active_scratch_dirs_metric_(nullptr) {}
+    active_scratch_dirs_metric_(nullptr),
+    scratch_bytes_used_metric_(nullptr) {}  // Set in InitCustom().
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
   string tmp_dirs_spec = FLAGS_scratch_dirs;
@@ -140,6 +145,9 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
     active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
   }
+  scratch_bytes_used_metric_ =
+      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
+          TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0);
 
   initialized_ = true;
 
@@ -275,7 +283,10 @@ void TmpFileMgr::FileGroup::Close() {
   if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_.get());
   for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
     Status status = file->Remove();
-    if (!status.ok()) {
+    if (status.ok()) {
+      tmp_file_mgr_->scratch_bytes_used_metric_->Increment(
+          -1 * scratch_space_bytes_used_counter_->value());
+    } else {
       LOG(WARNING) << "Error removing scratch file '" << file->path()
                    << "': " << status.msg().msg();
     }
@@ -310,6 +321,7 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
     if ((*tmp_file)->is_blacklisted()) continue;
     (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset);
     scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
+    tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
     current_bytes_allocated_ += num_bytes;
     return Status::OK();
   }
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 86ef74e..a22a792 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -424,6 +424,9 @@ class TmpFileMgr {
   /// Metrics to track active scratch directories.
   IntGauge* num_active_scratch_dirs_metric_;
   SetMetric<std::string>* active_scratch_dirs_metric_;
+
+  /// Tracks scratch space bytes used: the current value and its high water mark.
+  AtomicHighWaterMarkGauge* scratch_bytes_used_metric_;
 };
 
 }
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index d61b70d..accbdcd 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -106,6 +106,36 @@ TEST_F(MetricsTest, GaugeMetrics) {
   AssertValue(int_gauge_with_units, 10, "10s000ms");
 }
 
+TEST_F(MetricsTest, AtomicHighWaterMarkGauge) {
+  MetricGroup metrics("IntHWMGauge");
+  AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
+  AddMetricDef("hwm_gauge", TMetricKind::GAUGE, TUnit::NONE);
+  AtomicHighWaterMarkGauge* int_hwm_gauge = metrics.AddHWMGauge("hwm_gauge", "gauge", 0);
+  IntGauge* int_gauge = int_hwm_gauge->current_value_;  // Visible via FRIEND_TEST.
+  AssertValue(int_hwm_gauge, 0, "0");
+  AssertValue(int_gauge, 0, "0");
+  int_hwm_gauge->Increment(-1);  // Current value drops below the HWM.
+  AssertValue(int_hwm_gauge, 0, "0");  // HWM is never lowered.
+  AssertValue(int_gauge, -1, "-1");
+  int_hwm_gauge->Increment(10);  // -1 + 10 = 9 exceeds the HWM of 0.
+  AssertValue(int_hwm_gauge, 9, "9");
+  AssertValue(int_gauge, 9, "9");
+  int_hwm_gauge->SetValue(3456);  // SetValue() raises the HWM too.
+  AssertValue(int_hwm_gauge, 3456, "3456");
+  AssertValue(int_gauge, 3456, "3456");
+  int_hwm_gauge->SetValue(100);  // Lowering the current value keeps HWM at 3456.
+  AssertValue(int_hwm_gauge, 3456, "3456");
+  AssertValue(int_gauge, 100, "100");
+
+  AddMetricDef("hwm_gauge_with_units", TMetricKind::GAUGE, TUnit::BYTES);
+  AddMetricDef("gauge_with_units", TMetricKind::GAUGE, TUnit::BYTES);
+  AtomicHighWaterMarkGauge* int_hwm_gauge_with_units =
+      metrics.AddHWMGauge("hwm_gauge_with_units", "gauge_with_units", 10);
+  IntGauge* int_gauge_with_units = int_hwm_gauge_with_units->current_value_;
+  AssertValue(int_hwm_gauge_with_units, 10, "10.00 B");  // Both start at 10 bytes.
+  AssertValue(int_gauge_with_units, 10, "10.00 B");
+}
+
 TEST_F(MetricsTest, SumGauge) {
   MetricGroup metrics("SumGauge");
   AddMetricDef("gauge1", TMetricKind::GAUGE, TUnit::NONE);
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 3d3c2dc..80b863b 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -26,6 +26,7 @@
 #include <boost/function.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
 
 #include "common/atomic.h"
 #include "common/logging.h"
@@ -248,6 +249,62 @@ class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> {
 typedef class AtomicMetric<TMetricKind::GAUGE> IntGauge;
 typedef class AtomicMetric<TMetricKind::COUNTER> IntCounter;
 
+/// A gauge whose reported value (GetValue()) is the highest current value seen (the
+/// HWM). It also updates a separate IntGauge that holds the current value.
+///
+/// Implementation notes: 'hwm_value_' holds the HWM and only ever increases;
+/// 'current_value_' points to an IntGauge, not owned by this class, that holds the
+/// current value and must start equal to the HWM's initial value. Because two separate
+/// atomics are used, the current value and the HWM may be out of sync for a short
+/// duration, which is acceptable for the current use case. Always update both through
+/// SetValue() or Increment(); changing 'current_value_' directly leaves the HWM stale.
+class AtomicHighWaterMarkGauge : public ScalarMetric<int64_t, TMetricKind::GAUGE> {
+ public:
+  AtomicHighWaterMarkGauge(
+      const TMetricDef& metric_def, int64_t initial_value, IntGauge* current_value)
+    : ScalarMetric<int64_t, TMetricKind::GAUGE>(metric_def),
+      hwm_value_(initial_value),
+      current_value_(current_value) {
+    DCHECK(current_value_ != NULL && initial_value == current_value->GetValue());
+  }
+
+  ~AtomicHighWaterMarkGauge() {}
+
+  /// Returns the high water mark, not the current value.
+  int64_t GetValue() override { return hwm_value_.Load(); }
+
+  /// Sets the current value to 'value', then raises the HWM to 'value' if it is higher.
+  void SetValue(const int64_t& value) {
+    current_value_->SetValue(value);
+    UpdateMax(value);
+  }
+
+  /// Atomically adds 'delta' (which may be negative) to the current value, then raises
+  /// the HWM to the resulting value if it is higher.
+  void Increment(int64_t delta) {
+    const int64_t new_val = current_value_->Increment(delta);
+    UpdateMax(new_val);
+  }
+
+ private:
+  FRIEND_TEST(MetricsTest, AtomicHighWaterMarkGauge);  // Test reads current_value_.
+  /// Raises 'hwm_value_' to 'v' if 'v' is larger. Retries the compare-and-swap until it
+  /// succeeds or the HWM is observed to already be >= 'v' (e.g. raised concurrently).
+  void UpdateMax(int64_t v) {
+    while (true) {
+      int64_t old_max = hwm_value_.Load();
+      int64_t new_max = std::max(old_max, v);
+      if (new_max == old_max) break; // HWM is already >= v: skip the CAS.
+      if (LIKELY(hwm_value_.CompareAndSwap(old_max, new_max))) break;
+    }
+  }
+
+  /// The high water mark. Only raised, never lowered.
+  AtomicInt64 hwm_value_;
+  /// The gauge holding the current value. Not owned by this class.
+  IntGauge* current_value_;
+};
+
 /// Gauge metric that computes the sum of several gauges.
 class SumGauge : public IntGauge {
  public:
@@ -341,6 +398,15 @@ class MetricGroup {
     return RegisterMetric(new IntCounter(MetricDefs::Get(key, metric_def_arg), value));
   }
 
+  AtomicHighWaterMarkGauge* AddHWMGauge(const std::string& key_hwm,
+      const std::string& key_curent_value, const int64_t value,
+      const std::string& metric_def_arg = "") {  // Used for both metric defs.
+    IntGauge* current_value_metric = RegisterMetric(  // Current value, starts at 'value'.
+        new IntGauge(MetricDefs::Get(key_curent_value, metric_def_arg), value));
+    return RegisterMetric(new AtomicHighWaterMarkGauge(  // HWM, also starts at 'value'.
+        MetricDefs::Get(key_hwm, metric_def_arg), value, current_value_metric));
+  }
+
   /// Returns a metric by key. All MetricGroups reachable from this group are searched in
   /// depth-first order, starting with the root group.  Returns NULL if there is no metric
   /// with that key. This is not a very cheap operation; the result should be cached where
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 1cdca7b..fa04749 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1820,6 +1820,26 @@
     "key": "tmp-file-mgr.active-scratch-dirs.list"
   },
   {
+    "description": "The current total spilled bytes across all scratch directories.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Spilled bytes for scratch directories",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-space-bytes-used"
+  },
+  {
+    "description": "The high water mark for spilled bytes across all scratch directories.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Spilled bytes HWM for scratch directories",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-space-bytes-used-high-water-mark"
+  },
+  {
     "description": "Number of senders waiting for receiving fragment to initialize",
     "contexts": [
       "IMPALAD"
@@ -2111,4 +2131,4 @@
     "kind": "GAUGE",
     "key": "events-processor.events-received-15min-rate"
   }
-]
\ No newline at end of file
+]