You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/13 22:20:20 UTC

[5/6] impala git commit: IMPALA-6576: Add metrics for data stream service memory usage

IMPALA-6576: Add metrics for data stream service memory usage

This change adds metrics for the data stream service memory usage. Both
current and peak usage are exposed.

It adds a test to test_krpc_metrics.py to make sure that the expected
metrics are present and that the peak usage shows a non-zero value after
running a query.

Change-Id: I5033b8dfda0b23d4230535ba13c3e050a35d01a3
Reviewed-on: http://gerrit.cloudera.org:8080/9562
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2ff96233
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2ff96233
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2ff96233

Branch: refs/heads/2.x
Commit: 2ff96233483726c667bea13042e5d3637f68b689
Parents: f90e937
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu Mar 8 17:55:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Mar 13 05:33:26 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/exec-env.cc                |  2 +-
 be/src/service/data-stream-service.cc     |  4 ++-
 be/src/service/data-stream-service.h      |  4 +--
 be/src/util/memory-metrics.cc             | 29 ++++++++++++++++++++
 be/src/util/memory-metrics.h              | 25 +++++++++++++++++
 common/thrift/metrics.json                | 20 ++++++++++++++
 tests/custom_cluster/test_krpc_metrics.py | 37 +++++++++++++++++---------
 7 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 66d7f3c..47bb0bd 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -319,7 +319,7 @@ Status ExecEnv::Init() {
     // Initialization needs to happen in the following order due to dependencies:
     // - RPC manager, DataStreamService and DataStreamManager.
     RETURN_IF_ERROR(rpc_mgr_->Init());
-    data_svc_.reset(new DataStreamService());
+    data_svc_.reset(new DataStreamService(rpc_metrics_));
     RETURN_IF_ERROR(data_svc_->Init());
     RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
     // Bump thread cache to 1GB to reduce contention for TCMalloc central

http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index b04105b..d94837b 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -27,6 +27,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/row-batch.h"
+#include "util/memory-metrics.h"
 #include "util/parse-util.h"
 #include "testutil/fault-injection-util.h"
 
@@ -45,7 +46,7 @@ DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for proce
 
 namespace impala {
 
-DataStreamService::DataStreamService()
+DataStreamService::DataStreamService(MetricGroup* metric_group)
   : DataStreamServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
         ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
   MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
@@ -54,6 +55,7 @@ DataStreamService::DataStreamService()
       &is_percent, process_mem_tracker->limit());
   mem_tracker_.reset(new MemTracker(
       bytes_limit, "Data Stream Service Queue", process_mem_tracker));
+  MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService");
 }
 
 Status DataStreamService::Init() {

http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index cba27ae..812fb2c 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -37,10 +37,10 @@ class RpcMgr;
 /// instances. The client for this service is implemented in KrpcDataStreamSender.
 /// The processing of incoming requests is implemented in KrpcDataStreamRecvr.
 /// KrpcDataStreamMgr is responsible for routing the incoming requests to the
-/// appropriate receivers.
+/// appropriate receivers. Metrics exposed by the service will be added to 'metric_group'.
 class DataStreamService : public DataStreamServiceIf {
  public:
-  DataStreamService();
+  explicit DataStreamService(MetricGroup* metric_group);
 
   /// Initializes the service by registering it with the singleton RPC manager.
   /// This mustn't be called until RPC manager has been initialized.

http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index fd78343..d37fb3a 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -22,6 +22,7 @@
 
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/mem-tracker.h"
 #include "util/jni-util.h"
 #include "util/mem-info.h"
 #include "util/time.h"
@@ -300,3 +301,31 @@ int64_t BufferPoolMetric::GetValue() {
   }
   return 0;
 }
+
+void MemTrackerMetric::CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker,
+    const string& name) {  // 'name' fills the $0 placeholder in both metric keys.
+  metrics->RegisterMetric(  // Gauge for the tracker's current consumption.
+      new MemTrackerMetric(MetricDefs::Get("mem-tracker.$0.current_usage_bytes", name),
+      MemTrackerMetricType::CURRENT, mem_tracker));
+  metrics->RegisterMetric(  // Gauge for the tracker's peak consumption.
+      new MemTrackerMetric(MetricDefs::Get("mem-tracker.$0.peak_usage_bytes", name),
+      MemTrackerMetricType::PEAK, mem_tracker));
+}
+
+MemTrackerMetric::MemTrackerMetric(const TMetricDef& def, MemTrackerMetricType type,
+    MemTracker* mem_tracker)
+  : IntGauge(def, 0),  // GetValue() reads 'mem_tracker_' instead.
+    type_(type),
+    mem_tracker_(mem_tracker) {}  // 'mem_tracker' is not owned.
+
+int64_t MemTrackerMetric::GetValue() {
+  switch (type_) {  // Queries the tracker on every call; nothing is cached here.
+    case MemTrackerMetricType::CURRENT:
+      return mem_tracker_->consumption();
+    case MemTrackerMetricType::PEAK:
+      return mem_tracker_->peak_consumption();
+    default:
+      DCHECK(false) << "Unknown MemTrackerMetricType: " << static_cast<int>(type_);
+  }
+  return 0;  // Only reached for an invalid 'type_'.
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 0ac04bf..ed0e889 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -33,6 +33,7 @@
 namespace impala {
 
 class BufferPool;
+class MemTracker;
 class ReservationTracker;
 class Thread;
 
@@ -240,6 +241,30 @@ class BufferPoolMetric : public IntGauge {
   BufferPool* buffer_pool_;
 };
 
+/// Gauge that reports either the current or the peak consumption of a MemTracker.
+class MemTrackerMetric : public IntGauge {
+ public:
+  /// Creates and registers in 'metrics' two gauges reporting the current and peak
+  /// consumption of 'mem_tracker'; 'name' is substituted for $0 in the metric keys.
+  /// The caller must make sure that 'mem_tracker' is not destructed before 'metrics'.
+  static void CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker,
+      const std::string& name);
+
+  virtual int64_t GetValue() override;  // Current or peak usage, depending on 'type_'.
+
+ private:
+  enum class MemTrackerMetricType {
+    CURRENT, // Current usage of the MemTracker
+    PEAK, // Peak usage of the MemTracker
+  };
+
+  MemTrackerMetric(const TMetricDef& def, MemTrackerMetricType type,
+      MemTracker* mem_tracker);
+
+  const MemTrackerMetricType type_;  // Which value of 'mem_tracker_' is reported.
+  const MemTracker* mem_tracker_;  // Not owned; see CreateMetrics().
+};
+
 /// Registers common tcmalloc memory metrics. If 'register_jvm_metrics' is true, the JVM
 /// memory metrics are also registered. If 'global_reservations' and 'buffer_pool' are
 /// not NULL, also register buffer pool metrics.

http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index cce34d7..9498479 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1655,5 +1655,25 @@
     "units": "UNIT",
     "kind": "COUNTER",
     "key": "rpc.$0.rpcs_queue_overflow"
+  },
+  {
+    "description": "Memtracker $0 Current Usage Bytes",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Memtracker $0 Current Usage Bytes",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "mem-tracker.$0.current_usage_bytes"
+  },
+  {
+    "description": "Memtracker $0 Peak Usage Bytes",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Memtracker $0 Peak Usage Bytes",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "mem-tracker.$0.peak_usage_bytes"
   }
 ]

http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
index ec56f41..de16ccc 100644
--- a/tests/custom_cluster/test_krpc_metrics.py
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -69,12 +69,8 @@ class TestKrpcMetrics(CustomClusterTestSuite):
 
     assert before < after
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
-                                     -datastream_service_num_svc_threads=1')
-  def test_krpc_queue_overflow_metrics(self, vector):
-    """Test that rejected RPCs show up on the /metrics debug web page.
-    """
+  def get_metric(self, name):
+    """Finds the metric with name 'name' and returns its value as an int."""
     def iter_metrics(group):
       for m in group['metrics']:
         yield m
@@ -82,16 +78,31 @@ class TestKrpcMetrics(CustomClusterTestSuite):
         for m in iter_metrics(c):
           yield m
 
-    def get_metric(name):
-      metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
-      for m in iter_metrics(metrics):
-        if m['name'] == name:
-          return int(m['value'])
+    metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
+    for m in iter_metrics(metrics):
+      if m['name'] == name:
+        return int(m['value'])
+    assert False, "Could not find metric: %s" % name
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
+                                     -datastream_service_num_svc_threads=1')
+  def test_krpc_queue_overflow_metrics(self, vector):
+    """Test that RPCs rejected by the (1 byte) service queue limit are counted in the
+    rpcs_queue_overflow metric on the /metrics debug web page."""
     metric_name = 'rpc.impala.DataStreamService.rpcs_queue_overflow'
-    before = get_metric(metric_name)
+    before = self.get_metric(metric_name)
     assert before == 0

     self.client.execute(self.TEST_QUERY)
-    after = get_metric(metric_name)
+    after = self.get_metric(metric_name)
     assert before < after
+
+  @pytest.mark.execute_serially
+  def test_krpc_service_queue_metrics(self, vector):
+    """Test that the current and peak memory usage metrics for the data stream service
+    queue show up on the /metrics debug web page, and that the peak usage is non-zero
+    after running a query."""
+    self.client.execute(self.TEST_QUERY)
+    assert self.get_metric('mem-tracker.DataStreamService.current_usage_bytes') >= 0
+    assert self.get_metric('mem-tracker.DataStreamService.peak_usage_bytes') > 0