You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/13 22:20:20 UTC
[5/6] impala git commit: IMPALA-6576: Add metrics for data stream
service memory usage
IMPALA-6576: Add metrics for data stream service memory usage
This change adds metrics for the data stream service memory usage. Both
current and peak usage are exposed.
It adds a test to test_krpc_metrics.py to make sure that the expected
metrics are present and that the peak usage shows a non-zero value after
running a query.
Change-Id: I5033b8dfda0b23d4230535ba13c3e050a35d01a3
Reviewed-on: http://gerrit.cloudera.org:8080/9562
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2ff96233
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2ff96233
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2ff96233
Branch: refs/heads/2.x
Commit: 2ff96233483726c667bea13042e5d3637f68b689
Parents: f90e937
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu Mar 8 17:55:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Mar 13 05:33:26 2018 +0000
----------------------------------------------------------------------
be/src/runtime/exec-env.cc | 2 +-
be/src/service/data-stream-service.cc | 4 ++-
be/src/service/data-stream-service.h | 4 +--
be/src/util/memory-metrics.cc | 29 ++++++++++++++++++++
be/src/util/memory-metrics.h | 25 +++++++++++++++++
common/thrift/metrics.json | 20 ++++++++++++++
tests/custom_cluster/test_krpc_metrics.py | 37 +++++++++++++++++---------
7 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 66d7f3c..47bb0bd 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -319,7 +319,7 @@ Status ExecEnv::Init() {
// Initialization needs to happen in the following order due to dependencies:
// - RPC manager, DataStreamService and DataStreamManager.
RETURN_IF_ERROR(rpc_mgr_->Init());
- data_svc_.reset(new DataStreamService());
+ data_svc_.reset(new DataStreamService(rpc_metrics_));
RETURN_IF_ERROR(data_svc_->Init());
RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
// Bump thread cache to 1GB to reduce contention for TCMalloc central
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index b04105b..d94837b 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -27,6 +27,7 @@
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/row-batch.h"
+#include "util/memory-metrics.h"
#include "util/parse-util.h"
#include "testutil/fault-injection-util.h"
@@ -45,7 +46,7 @@ DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for proce
namespace impala {
-DataStreamService::DataStreamService()
+DataStreamService::DataStreamService(MetricGroup* metric_group)
: DataStreamServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
@@ -54,6 +55,7 @@ DataStreamService::DataStreamService()
&is_percent, process_mem_tracker->limit());
mem_tracker_.reset(new MemTracker(
bytes_limit, "Data Stream Service Queue", process_mem_tracker));
+ MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService");
}
Status DataStreamService::Init() {
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index cba27ae..812fb2c 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -37,10 +37,10 @@ class RpcMgr;
/// instances. The client for this service is implemented in KrpcDataStreamSender.
/// The processing of incoming requests is implemented in KrpcDataStreamRecvr.
/// KrpcDataStreamMgr is responsible for routing the incoming requests to the
-/// appropriate receivers.
+/// appropriate receivers. Metrics exposed by the service will be added to 'metric_group'.
class DataStreamService : public DataStreamServiceIf {
public:
- DataStreamService();
+ DataStreamService(MetricGroup* metric_group);
/// Initializes the service by registering it with the singleton RPC manager.
/// This mustn't be called until RPC manager has been initialized.
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index fd78343..d37fb3a 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -22,6 +22,7 @@
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/mem-tracker.h"
#include "util/jni-util.h"
#include "util/mem-info.h"
#include "util/time.h"
@@ -300,3 +301,31 @@ int64_t BufferPoolMetric::GetValue() {
}
return 0;
}
+
+/// Registers in 'metrics' one gauge for the current and one for the peak consumption
+/// of 'mem_tracker'. 'name' fills the '$0' placeholder of each metric key.
+void MemTrackerMetric::CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker,
+    const string& name) {
+  // Local helper: looks up the metric definition for 'key_template' instantiated with
+  // 'name' and registers a new gauge of the given type with 'metrics'.
+  auto register_gauge = [&](const char* key_template, MemTrackerMetricType type) {
+    metrics->RegisterMetric(
+        new MemTrackerMetric(MetricDefs::Get(key_template, name), type, mem_tracker));
+  };
+  register_gauge("mem-tracker.$0.current_usage_bytes", MemTrackerMetricType::CURRENT);
+  register_gauge("mem-tracker.$0.peak_usage_bytes", MemTrackerMetricType::PEAK);
+}
+
+/// Builds a gauge described by 'def' that reads from 'mem_tracker'; 'type' selects
+/// whether GetValue() reports current or peak consumption. The IntGauge base is
+/// initialized with a value of 0.
+MemTrackerMetric::MemTrackerMetric(
+    const TMetricDef& def, MemTrackerMetricType type, MemTracker* mem_tracker)
+  : IntGauge(def, 0), type_(type), mem_tracker_(mem_tracker) {}
+
+/// Returns the tracked MemTracker's current consumption (CURRENT) or its peak
+/// consumption (PEAK), as selected by 'type_'.
+int64_t MemTrackerMetric::GetValue() {
+  // Deliberately no 'default' label: every enumerator is handled explicitly, so
+  // -Wswitch flags any MemTrackerMetricType value added later that is not handled here.
+  switch (type_) {
+    case MemTrackerMetricType::CURRENT:
+      return mem_tracker_->consumption();
+    case MemTrackerMetricType::PEAK:
+      return mem_tracker_->peak_consumption();
+  }
+  // Only reached if 'type_' holds a value that is not one of the enumerators.
+  DCHECK(false) << "Unknown MemTrackerMetricType: " << static_cast<int>(type_);
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 0ac04bf..ed0e889 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -33,6 +33,7 @@
namespace impala {
class BufferPool;
+class MemTracker;
class ReservationTracker;
class Thread;
@@ -240,6 +241,30 @@ class BufferPoolMetric : public IntGauge {
BufferPool* buffer_pool_;
};
+/// Gauge metric that reports either the current or the peak memory usage of a
+/// MemTracker. The constructor is private; instances are created in pairs by
+/// CreateMetrics().
+class MemTrackerMetric : public IntGauge {
+ public:
+  /// Creates two new metrics tracking the current and peak usages of 'mem_tracker' in
+  /// the metrics group 'metrics'. 'name' is substituted for '$0' in the metric keys
+  /// "mem-tracker.$0.current_usage_bytes" and "mem-tracker.$0.peak_usage_bytes". The
+  /// caller must make sure that 'mem_tracker' is not destructed before 'metrics'.
+  static void CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker,
+      const std::string& name);
+
+  /// Returns the current or the peak consumption of the tracked MemTracker, depending
+  /// on the type this metric was created with.
+  virtual int64_t GetValue() override;
+
+ private:
+  /// Selects which usage figure of the MemTracker a metric reports.
+  enum class MemTrackerMetricType {
+    CURRENT, ///< Current usage of the MemTracker
+    PEAK, ///< Peak usage of the MemTracker
+  };
+
+  MemTrackerMetric(const TMetricDef& def, MemTrackerMetricType type,
+      MemTracker* mem_tracker);
+
+  /// Which usage figure GetValue() reports.
+  const MemTrackerMetricType type_;
+
+  /// Tracker being reported on. Not owned and never reseated; see CreateMetrics() for
+  /// the lifetime requirement.
+  const MemTracker* const mem_tracker_;
+};
+
/// Registers common tcmalloc memory metrics. If 'register_jvm_metrics' is true, the JVM
/// memory metrics are also registered. If 'global_reservations' and 'buffer_pool' are
/// not NULL, also register buffer pool metrics.
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index cce34d7..9498479 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1655,5 +1655,25 @@
"units": "UNIT",
"kind": "COUNTER",
"key": "rpc.$0.rpcs_queue_overflow"
+ },
+ {
+ "description": "Memtracker $0 Current Usage Bytes",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Memtracker $0 Current Usage Bytes",
+ "units": "BYTES",
+ "kind": "GAUGE",
+ "key": "mem-tracker.$0.current_usage_bytes"
+ },
+ {
+ "description": "Memtracker $0 Peak Usage Bytes",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Memtracker $0 Peak Usage Bytes",
+ "units": "BYTES",
+ "kind": "GAUGE",
+ "key": "mem-tracker.$0.peak_usage_bytes"
}
]
http://git-wip-us.apache.org/repos/asf/impala/blob/2ff96233/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
index ec56f41..de16ccc 100644
--- a/tests/custom_cluster/test_krpc_metrics.py
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -69,12 +69,8 @@ class TestKrpcMetrics(CustomClusterTestSuite):
assert before < after
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
- -datastream_service_num_svc_threads=1')
- def test_krpc_queue_overflow_metrics(self, vector):
- """Test that rejected RPCs show up on the /metrics debug web page.
- """
+ def get_metric(self, name):
+ """Finds the metric with name 'name' and returns its value as an int."""
def iter_metrics(group):
for m in group['metrics']:
yield m
@@ -82,16 +78,31 @@ class TestKrpcMetrics(CustomClusterTestSuite):
for m in iter_metrics(c):
yield m
- def get_metric(name):
- metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
- for m in iter_metrics(metrics):
- if m['name'] == name:
- return int(m['value'])
+ metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
+ for m in iter_metrics(metrics):
+ if m['name'] == name:
+ return int(m['value'])
+ assert False, "Could not find metric: %s" % name
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
+ -datastream_service_num_svc_threads=1')
+ def test_krpc_queue_overflow_metrics(self, vector):
+ """Test that rejected RPCs show up on the /metrics debug web page.
+ """
metric_name = 'rpc.impala.DataStreamService.rpcs_queue_overflow'
- before = get_metric(metric_name)
+ before = self.get_metric(metric_name)
assert before == 0
self.client.execute(self.TEST_QUERY)
- after = get_metric(metric_name)
+ after = self.get_metric(metric_name)
assert before < after
+
+ @pytest.mark.execute_serially
+ def test_krpc_service_queue_metrics(self, vector):
+ """Test that memory usage metrics for the data stream service queue show up on the
+ /metrics debug web page.
+ """
+ self.client.execute(self.TEST_QUERY)
+ assert self.get_metric('mem-tracker.DataStreamService.current_usage_bytes') >= 0
+ assert self.get_metric('mem-tracker.DataStreamService.peak_usage_bytes') > 0