You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/01 21:49:00 UTC
[kafka] branch trunk updated: MINOR: Use thread name and task for
sensor name (#5111)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb2f024 MINOR: Use thread name and task for sensor name (#5111)
cb2f024 is described below
commit cb2f024f87aa1d4848eb059b15fefd7eca028a73
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Jun 1 17:48:44 2018 -0400
MINOR: Use thread name and task for sensor name (#5111)
Changes to keep the operation name as is and make the sensor name unique.
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../internals/metrics/StreamsMetricsImpl.java | 74 +++++++++++++---------
.../processor/internals/ProcessorNodeTest.java | 27 ++++----
.../internals/MeteredKeyValueBytesStoreTest.java | 16 ++---
.../state/internals/MeteredSessionStoreTest.java | 14 ++--
.../state/internals/MeteredWindowStoreTest.java | 20 +++---
5 files changed, 83 insertions(+), 68 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index bc2e150..662ded5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -229,16 +229,33 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String operationName,
final Sensor.RecordingLevel recordingLevel,
final String... tags) {
+
+ return addLatencyAndThroughputSensor(null,
+ scopeName,
+ entityName,
+ operationName,
+ recordingLevel,
+ tags);
+
+ }
+
+ public Sensor addLatencyAndThroughputSensor(final String taskName,
+ final String scopeName,
+ final String entityName,
+ final String operationName,
+ final Sensor.RecordingLevel recordingLevel,
+ final String... tags) {
+
final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
// first add the global operation metrics if not yet, with the global tags only
- final Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
+ final Sensor parent = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), null), recordingLevel);
addLatencyMetrics(scopeName, parent, operationName, allTagMap);
addThroughputMetrics(scopeName, parent, operationName, allTagMap);
// add the operation metrics with additional tags
- final Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
+ final Sensor sensor = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), entityName), recordingLevel, parent);
addLatencyMetrics(scopeName, sensor, operationName, tagMap);
addThroughputMetrics(scopeName, sensor, operationName, tagMap);
@@ -247,20 +264,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return sensor;
}
- public Sensor addLatencyAndThroughputSensor(final String taskName,
- final String scopeName,
- final String entityName,
- final String operationName,
- final Sensor.RecordingLevel recordingLevel,
- final String... tags) {
- return addLatencyAndThroughputSensor(
- scopeName,
- entityName,
- threadName + "." + taskName + "." + operationName,
- recordingLevel,
- tags);
- }
-
/**
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
@@ -270,35 +273,44 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String operationName,
final Sensor.RecordingLevel recordingLevel,
final String... tags) {
+
+ return addThroughputSensor(null,
+ scopeName,
+ entityName,
+ operationName,
+ recordingLevel,
+ tags);
+
+ }
+
+ public Sensor addThroughputSensor(final String taskName,
+ final String scopeName,
+ final String entityName,
+ final String operationName,
+ final Sensor.RecordingLevel recordingLevel,
+ final String... tags) {
+
final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
// first add the global operation metrics if not yet, with the global tags only
- final Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
+ final Sensor parent = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), null), recordingLevel);
addThroughputMetrics(scopeName, parent, operationName, allTagMap);
// add the operation metrics with additional tags
- final Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
+ final Sensor sensor = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), entityName), recordingLevel, parent);
addThroughputMetrics(scopeName, sensor, operationName, tagMap);
parentSensors.put(sensor, parent);
return sensor;
+
}
- public Sensor addThroughputSensor(final String taskName,
- final String scopeName,
- final String entityName,
- final String operationName,
- final Sensor.RecordingLevel recordingLevel,
- final String... tags) {
- return addThroughputSensor(
- scopeName,
- entityName,
- threadName + "." + taskName + "." + operationName,
- recordingLevel,
- tags
- );
+
+ private String buildUniqueSensorName(String operationName, String taskName) {
+ String task = taskName == null ? "" : taskName + ".";
+ return threadName + "." + task + operationName;
}
private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 6cab6e3..65dd022 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -118,24 +118,27 @@ public class ProcessorNodeTest {
assertNotNull(metrics.getSensor("name-mock.0_0." + throughputOperation));
for (final String opName : latencyOperations) {
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
}
- assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
- "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
+ assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
+ "The average number of occurrence of " + throughputOperation + " operation per second.",
+ metricTags)));
// test "all"
metricTags.put("processor-node-id", "all");
for (final String opName : latencyOperations) {
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
}
- assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
- "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
+ assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate",
+ groupName,
+ "The average number of occurrence of " + throughputOperation + " operation per second.",
+ metricTags)));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index 70bd2ec..a5e0d79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -100,7 +100,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.put(key, value);
- final KafkaMetric metric = metric("test.0_0.put-rate");
+ final KafkaMetric metric = metric("put-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
@@ -114,7 +114,7 @@ public class MeteredKeyValueBytesStoreTest {
assertThat(metered.get(key), equalTo(value));
- final KafkaMetric metric = metric("test.0_0.get-rate");
+ final KafkaMetric metric = metric("get-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -127,7 +127,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.putIfAbsent(key, value);
- final KafkaMetric metric = metric("test.0_0.put-if-absent-rate");
+ final KafkaMetric metric = metric("put-if-absent-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -146,7 +146,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.putAll(Collections.singletonList(KeyValue.pair(key, value)));
- final KafkaMetric metric = metric("test.0_0.put-all-rate");
+ final KafkaMetric metric = metric("put-all-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -159,7 +159,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.delete(key);
- final KafkaMetric metric = metric("test.0_0.delete-rate");
+ final KafkaMetric metric = metric("delete-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -176,7 +176,7 @@ public class MeteredKeyValueBytesStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("test.0_0.range-rate");
+ final KafkaMetric metric = metric("range-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -193,7 +193,7 @@ public class MeteredKeyValueBytesStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric(new MetricName("test.0_0.all-rate", "stream-scope-metrics", "", tags));
+ final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags));
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -206,7 +206,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.flush();
- final KafkaMetric metric = metric("test.0_0.flush-rate");
+ final KafkaMetric metric = metric("flush-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 7ffa77a..3bd190a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -98,7 +98,7 @@ public class MeteredSessionStoreTest {
metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
- final KafkaMetric metric = metric("test.0_0.put-rate");
+ final KafkaMetric metric = metric("put-rate");
assertTrue(((Double) metric.metricValue()) > 0);
EasyMock.verify(inner);
}
@@ -115,7 +115,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("test.0_0.fetch-rate");
+ final KafkaMetric metric = metric("fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -132,7 +132,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("test.0_0.fetch-rate");
+ final KafkaMetric metric = metric("fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -146,7 +146,7 @@ public class MeteredSessionStoreTest {
metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
- final KafkaMetric metric = metric("test.0_0.remove-rate");
+ final KafkaMetric metric = metric("remove-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -163,7 +163,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("test.0_0.fetch-rate");
+ final KafkaMetric metric = metric("fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -180,7 +180,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("test.0_0.fetch-rate");
+ final KafkaMetric metric = metric("fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -188,7 +188,7 @@ public class MeteredSessionStoreTest {
@Test
public void shouldRecordRestoreTimeOnInit() {
init();
- final KafkaMetric metric = metric("test.0_0.restore-rate");
+ final KafkaMetric metric = metric("restore-rate");
assertTrue(metric.value() > 0);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 7067777..19bd523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -89,8 +89,8 @@ public class MeteredWindowStoreTest {
EasyMock.replay(innerStoreMock);
store.init(context, store);
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
}
@Test
@@ -103,8 +103,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.put("a", "a");
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -116,8 +116,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.fetch("a", 1, 1).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -129,8 +129,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.fetch("a", "b", 1, 1).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -144,8 +144,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.flush();
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.