You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/01 21:49:00 UTC

[kafka] branch trunk updated: MINOR: Use thread name and task for sensor name (#5111)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb2f024  MINOR: Use thread name and task for sensor name (#5111)
cb2f024 is described below

commit cb2f024f87aa1d4848eb059b15fefd7eca028a73
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Jun 1 17:48:44 2018 -0400

    MINOR: Use thread name and task for sensor name (#5111)
    
    Keep the operation name unchanged in metric names, and make each sensor name unique by prefixing it with the thread name and, when available, the task name.
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../internals/metrics/StreamsMetricsImpl.java      | 74 +++++++++++++---------
 .../processor/internals/ProcessorNodeTest.java     | 27 ++++----
 .../internals/MeteredKeyValueBytesStoreTest.java   | 16 ++---
 .../state/internals/MeteredSessionStoreTest.java   | 14 ++--
 .../state/internals/MeteredWindowStoreTest.java    | 20 +++---
 5 files changed, 83 insertions(+), 68 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index bc2e150..662ded5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -229,16 +229,33 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                                 final String operationName,
                                                 final Sensor.RecordingLevel recordingLevel,
                                                 final String... tags) {
+
+        return addLatencyAndThroughputSensor(null,
+                                             scopeName,
+                                             entityName,
+                                             operationName,
+                                             recordingLevel,
+                                             tags);
+
+    }
+
+    public Sensor addLatencyAndThroughputSensor(final String taskName,
+                                                final String scopeName,
+                                                final String entityName,
+                                                final String operationName,
+                                                final Sensor.RecordingLevel recordingLevel,
+                                                final String... tags) {
+
         final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
         final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
 
         // first add the global operation metrics if not yet, with the global tags only
-        final Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
+        final Sensor parent = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), null), recordingLevel);
         addLatencyMetrics(scopeName, parent, operationName, allTagMap);
         addThroughputMetrics(scopeName, parent, operationName, allTagMap);
 
         // add the operation metrics with additional tags
-        final Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
+        final Sensor sensor = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), entityName), recordingLevel, parent);
         addLatencyMetrics(scopeName, sensor, operationName, tagMap);
         addThroughputMetrics(scopeName, sensor, operationName, tagMap);
 
@@ -247,20 +264,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return sensor;
     }
 
-    public Sensor addLatencyAndThroughputSensor(final String taskName,
-                                                final String scopeName,
-                                                final String entityName,
-                                                final String operationName,
-                                                final Sensor.RecordingLevel recordingLevel,
-                                                final String... tags) {
-        return addLatencyAndThroughputSensor(
-            scopeName,
-            entityName,
-            threadName + "." + taskName + "." + operationName,
-            recordingLevel,
-            tags);
-    }
-
     /**
      * @throws IllegalArgumentException if tags is not constructed in key-value pairs
      */
@@ -270,35 +273,44 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                       final String operationName,
                                       final Sensor.RecordingLevel recordingLevel,
                                       final String... tags) {
+
+        return addThroughputSensor(null,
+                                   scopeName,
+                                   entityName,
+                                   operationName,
+                                   recordingLevel,
+                                   tags);
+
+    }
+
+    public Sensor addThroughputSensor(final String taskName,
+                                      final String scopeName,
+                                      final String entityName,
+                                      final String operationName,
+                                      final Sensor.RecordingLevel recordingLevel,
+                                      final String... tags) {
+
         final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
         final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
 
         // first add the global operation metrics if not yet, with the global tags only
-        final Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
+        final Sensor parent = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), null), recordingLevel);
         addThroughputMetrics(scopeName, parent, operationName, allTagMap);
 
         // add the operation metrics with additional tags
-        final Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
+        final Sensor sensor = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), entityName), recordingLevel, parent);
         addThroughputMetrics(scopeName, sensor, operationName, tagMap);
 
         parentSensors.put(sensor, parent);
 
         return sensor;
+
     }
 
-    public Sensor addThroughputSensor(final String taskName,
-                                      final String scopeName,
-                                      final String entityName,
-                                      final String operationName,
-                                      final Sensor.RecordingLevel recordingLevel,
-                                      final String... tags) {
-        return addThroughputSensor(
-            scopeName,
-            entityName,
-            threadName + "." + taskName + "." + operationName,
-            recordingLevel,
-            tags
-        );
+
+    private String buildUniqueSensorName(String operationName, String taskName) {
+        String task = taskName == null ? "" : taskName + ".";
+        return threadName + "." + task + operationName;
     }
 
     private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 6cab6e3..65dd022 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -118,24 +118,27 @@ public class ProcessorNodeTest {
         assertNotNull(metrics.getSensor("name-mock.0_0." + throughputOperation));
 
         for (final String opName : latencyOperations) {
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-total", groupName, metricTags);
         }
-        assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
-            "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
+                                                               "The average number of occurrence of " + throughputOperation + " operation per second.",
+                                                               metricTags)));
 
         // test "all"
         metricTags.put("processor-node-id", "all");
         for (final String opName : latencyOperations) {
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-total", groupName, metricTags);
         }
-        assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
-            "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate",
+                                                               groupName,
+                                                               "The average number of occurrence of " + throughputOperation + " operation per second.",
+                                                               metricTags)));
 
 
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index 70bd2ec..a5e0d79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -100,7 +100,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.put(key, value);
 
-        final KafkaMetric metric = metric("test.0_0.put-rate");
+        final KafkaMetric metric = metric("put-rate");
 
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
@@ -114,7 +114,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         assertThat(metered.get(key), equalTo(value));
 
-        final KafkaMetric metric = metric("test.0_0.get-rate");
+        final KafkaMetric metric = metric("get-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -127,7 +127,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.putIfAbsent(key, value);
 
-        final KafkaMetric metric = metric("test.0_0.put-if-absent-rate");
+        final KafkaMetric metric = metric("put-if-absent-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -146,7 +146,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.putAll(Collections.singletonList(KeyValue.pair(key, value)));
 
-        final KafkaMetric metric = metric("test.0_0.put-all-rate");
+        final KafkaMetric metric = metric("put-all-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -159,7 +159,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.delete(key);
 
-        final KafkaMetric metric = metric("test.0_0.delete-rate");
+        final KafkaMetric metric = metric("delete-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -176,7 +176,7 @@ public class MeteredKeyValueBytesStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("test.0_0.range-rate");
+        final KafkaMetric metric = metric("range-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -193,7 +193,7 @@ public class MeteredKeyValueBytesStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric(new MetricName("test.0_0.all-rate", "stream-scope-metrics", "", tags));
+        final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags));
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -206,7 +206,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.flush();
 
-        final KafkaMetric metric = metric("test.0_0.flush-rate");
+        final KafkaMetric metric = metric("flush-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 7ffa77a..3bd190a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -98,7 +98,7 @@ public class MeteredSessionStoreTest {
 
         metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
 
-        final KafkaMetric metric = metric("test.0_0.put-rate");
+        final KafkaMetric metric = metric("put-rate");
         assertTrue(((Double) metric.metricValue()) > 0);
         EasyMock.verify(inner);
     }
@@ -115,7 +115,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("test.0_0.fetch-rate");
+        final KafkaMetric metric = metric("fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -132,7 +132,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("test.0_0.fetch-rate");
+        final KafkaMetric metric = metric("fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -146,7 +146,7 @@ public class MeteredSessionStoreTest {
 
         metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
 
-        final KafkaMetric metric = metric("test.0_0.remove-rate");
+        final KafkaMetric metric = metric("remove-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -163,7 +163,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("test.0_0.fetch-rate");
+        final KafkaMetric metric = metric("fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -180,7 +180,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("test.0_0.fetch-rate");
+        final KafkaMetric metric = metric("fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -188,7 +188,7 @@ public class MeteredSessionStoreTest {
     @Test
     public void shouldRecordRestoreTimeOnInit() {
         init();
-        final KafkaMetric metric = metric("test.0_0.restore-rate");
+        final KafkaMetric metric = metric("restore-rate");
         assertTrue(metric.value() > 0);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 7067777..19bd523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -89,8 +89,8 @@ public class MeteredWindowStoreTest {
         EasyMock.replay(innerStoreMock);
         store.init(context, store);
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
     }
 
     @Test
@@ -103,8 +103,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.put("a", "a");
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -116,8 +116,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.fetch("a", 1, 1).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -129,8 +129,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.fetch("a", "b", 1, 1).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -144,8 +144,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.flush();
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.