You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/05/12 00:19:16 UTC

[kafka] branch trunk updated: MINOR: Ensure sensor names are unique in Kafka Streams (#5009)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9947cd4  MINOR: Ensure sensor names are unique in Kafka Streams (#5009)
9947cd4 is described below

commit 9947cd40c617d4e12690b81a2d28624355580e34
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri May 11 17:19:11 2018 -0700

    MINOR: Ensure sensor names are unique in Kafka Streams (#5009)
    
    Reviewer: Guozhang Wang <gu...@confluent.io>
---
 .../org/apache/kafka/common/metrics/Metrics.java   |  2 +
 .../streams/processor/internals/ProcessorNode.java |  5 ++
 .../internals/metrics/StreamsMetricsImpl.java      | 29 ++++++++++
 .../state/internals/InnerMeteredKeyValueStore.java | 61 +++++++++++-----------
 .../state/internals/MeteredSessionStore.java       | 28 +++++-----
 .../state/internals/MeteredWindowStore.java        | 24 ++++-----
 .../processor/internals/ProcessorNodeTest.java     | 28 +++++-----
 .../internals/StreamsMetricsImplTest.java          | 11 ++--
 .../internals/MeteredKeyValueBytesStoreTest.java   | 16 +++---
 .../state/internals/MeteredSessionStoreTest.java   | 14 ++---
 .../state/internals/MeteredWindowStoreTest.java    | 20 +++----
 11 files changed, 139 insertions(+), 99 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 7a8667c..d456fed 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -531,6 +531,7 @@ public class Metrics implements Closeable {
                     log.error("Error when removing metric from " + reporter.getClass().getName(), e);
                 }
             }
+            log.trace("Removed metric named {}", metricName);
         }
         return metric;
     }
@@ -564,6 +565,7 @@ public class Metrics implements Closeable {
                 log.error("Error when registering metric on " + reporter.getClass().getName(), e);
             }
         }
+        log.trace("Registered metric named {}", metricName);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index a0a7041..64ef538 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -186,6 +186,7 @@ public class ProcessorNode<K, V> {
 
             // these are all latency metrics
             this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(
+                context.taskId().toString(),
                 "processor-node",
                 processorNodeName,
                 "process",
@@ -193,6 +194,7 @@ public class ProcessorNode<K, V> {
                 "task-id", context.taskId().toString()
             );
             this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(
+                context.taskId().toString(),
                 "processor-node",
                 processorNodeName,
                 "punctuate",
@@ -200,6 +202,7 @@ public class ProcessorNode<K, V> {
                 "task-id", context.taskId().toString()
             );
             this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(
+                context.taskId().toString(),
                 "processor-node",
                 processorNodeName,
                 "create",
@@ -207,6 +210,7 @@ public class ProcessorNode<K, V> {
                 "task-id", context.taskId().toString()
             );
             this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(
+                context.taskId().toString(),
                 "processor-node",
                 processorNodeName,
                 "destroy",
@@ -214,6 +218,7 @@ public class ProcessorNode<K, V> {
                 "task-id", context.taskId().toString()
             );
             this.sourceNodeForwardSensor = metrics.addThroughputSensor(
+                context.taskId().toString(),
                 "processor-node",
                 processorNodeName,
                 "forward",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 0251265..bc2e150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -247,6 +247,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return sensor;
     }
 
+    public Sensor addLatencyAndThroughputSensor(final String taskName,
+                                                final String scopeName,
+                                                final String entityName,
+                                                final String operationName,
+                                                final Sensor.RecordingLevel recordingLevel,
+                                                final String... tags) {
+        return addLatencyAndThroughputSensor(
+            scopeName,
+            entityName,
+            threadName + "." + taskName + "." + operationName,
+            recordingLevel,
+            tags);
+    }
+
     /**
      * @throws IllegalArgumentException if tags is not constructed in key-value pairs
      */
@@ -272,6 +286,21 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return sensor;
     }
 
+    public Sensor addThroughputSensor(final String taskName,
+                                      final String scopeName,
+                                      final String entityName,
+                                      final String operationName,
+                                      final Sensor.RecordingLevel recordingLevel,
+                                      final String... tags) {
+        return addThroughputSensor(
+            scopeName,
+            entityName,
+            threadName + "." + taskName + "." + operationName,
+            recordingLevel,
+            tags
+        );
+    }
+
     private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
         sensor.add(
             metrics.metricName(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 40e2d43..14464e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -49,7 +49,7 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
     private Sensor allTime;
     private Sensor rangeTime;
     private Sensor flushTime;
-    private StreamsMetrics metrics;
+    private StreamsMetricsImpl metrics;
     private ProcessorContext context;
     private StateStore root;
 
@@ -89,63 +89,64 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
     public void init(ProcessorContext context, StateStore root) {
         final String name = name();
         final String tagKey = "task-id";
-        final String tagValue = context.taskId().toString();
+        final String taskName = context.taskId().toString();
         this.context = context;
         this.root = root;
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                  metricScope,
                                                                   name,
                                                                   "put",
                                                                   Sensor.RecordingLevel.DEBUG,
-                                                                  tagKey, tagValue);
-        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                  tagKey, taskName);
+        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                          metricScope,
                                                                           name,
                                                                           "put-if-absent",
                                                                           Sensor.RecordingLevel.DEBUG,
-                                                                          tagKey,
-                                                                          tagValue);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                          tagKey, taskName);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                  metricScope,
                                                                   name,
                                                                   "get",
                                                                   Sensor.RecordingLevel.DEBUG,
-                                                                  tagKey,
-                                                                  tagValue);
-        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                  tagKey, taskName);
+        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                     metricScope,
                                                                      name,
                                                                      "delete",
                                                                      Sensor.RecordingLevel.DEBUG,
-                                                                     tagKey,
-                                                                     tagValue);
-        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                     tagKey, taskName);
+        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                     metricScope,
                                                                      name,
                                                                      "put-all",
                                                                      Sensor.RecordingLevel.DEBUG,
-                                                                     tagKey,
-                                                                     tagValue);
-        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                     tagKey, taskName);
+        this.allTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                  metricScope,
                                                                   name,
                                                                   "all",
                                                                   Sensor.RecordingLevel.DEBUG,
-                                                                  tagKey,
-                                                                  tagValue);
-        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                  tagKey, taskName);
+        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                    metricScope,
                                                                     name,
                                                                     "range",
                                                                     Sensor.RecordingLevel.DEBUG,
-                                                                    tagKey,
-                                                                    tagValue);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                    tagKey, taskName);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                    metricScope,
                                                                     name,
                                                                     "flush",
                                                                     Sensor.RecordingLevel.DEBUG,
-                                                                    tagKey,
-                                                                    tagValue);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                    tagKey, taskName);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+                                                                              metricScope,
                                                                               name,
                                                                               "restore",
                                                                               Sensor.RecordingLevel.DEBUG,
-                                                                              tagKey,
-                                                                              tagValue);
+                                                                              tagKey, taskName);
 
         // register and possibly restore the state from the logs
         if (restoreTime.shouldRecord()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 1a9ac20..5636219 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -20,11 +20,11 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -38,7 +38,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
     private final Serde<V> valueSerde;
     private final Time time;
     private StateSerdes<K, V> serdes;
-    private StreamsMetrics metrics;
+    private StreamsMetricsImpl metrics;
     private Sensor putTime;
     private Sensor fetchTime;
     private Sensor flushTime;
@@ -65,19 +65,19 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
         final String tagKey = "task-id";
-        final String tagValue = context.taskId().toString();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
-                                                                  Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "remove",
-                                                                     Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        final String taskName = context.taskId().toString();
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "put",
+                                                                  Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "fetch",
+                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "flush",
+                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+        this.removeTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "remove",
+                                                                     Sensor.RecordingLevel.DEBUG, tagKey, taskName);
 
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
-                                                                              Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "restore",
+                                                                              Sensor.RecordingLevel.DEBUG, tagKey, taskName);
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index b131db5..2487854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,11 +20,11 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -37,7 +37,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     private final Time time;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
-    private StreamsMetrics metrics;
+    private StreamsMetricsImpl metrics;
     private Sensor putTime;
     private Sensor fetchTime;
     private Sensor flushTime;
@@ -65,16 +65,16 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
         final String tagKey = "task-id";
-        final String tagValue = context.taskId().toString();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
-                                                                  Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
-                                                                              Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        final String taskName = context.taskId().toString();
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "put",
+                                                                  Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "fetch",
+                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "flush",
+                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "restore",
+                                                                              Sensor.RecordingLevel.DEBUG, tagKey, taskName);
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 0992063..6cab6e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -113,29 +113,29 @@ public class ProcessorNodeTest {
 
 
         for (final String operation : latencyOperations) {
-            assertNotNull(metrics.getSensor(operation));
+            assertNotNull(metrics.getSensor("name-mock.0_0." + operation));
         }
-        assertNotNull(metrics.getSensor(throughputOperation));
+        assertNotNull(metrics.getSensor("name-mock.0_0." + throughputOperation));
 
         for (final String opName : latencyOperations) {
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
         }
-        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
-            "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
+            "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
 
         // test "all"
         metricTags.put("processor-node-id", "all");
         for (final String opName : latencyOperations) {
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
         }
-        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
-            "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
+            "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
 
 
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index b065e2c..a72dc79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -40,6 +40,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void testRemoveSensor() {
         final String sensorName = "sensor1";
+        final String taskName = "task";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
@@ -51,10 +52,10 @@ public class StreamsMetricsImplTest {
         final Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1);
         streamsMetrics.removeSensor(sensor1a);
 
-        final Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor2);
 
-        final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor3 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
     }
 
@@ -63,11 +64,12 @@ public class StreamsMetricsImplTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
         final int defaultMetrics = streamsMetrics.metrics().size();
 
+        final String taskName = "task";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
 
-        final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         // 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
         final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
@@ -83,11 +85,12 @@ public class StreamsMetricsImplTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
         final int defaultMetrics = streamsMetrics.metrics().size();
 
+        final String taskName = "task";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
 
-        final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName,  scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
         // 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index a5e0d79..70bd2ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -100,7 +100,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.put(key, value);
 
-        final KafkaMetric metric = metric("put-rate");
+        final KafkaMetric metric = metric("test.0_0.put-rate");
 
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
@@ -114,7 +114,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         assertThat(metered.get(key), equalTo(value));
 
-        final KafkaMetric metric = metric("get-rate");
+        final KafkaMetric metric = metric("test.0_0.get-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -127,7 +127,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.putIfAbsent(key, value);
 
-        final KafkaMetric metric = metric("put-if-absent-rate");
+        final KafkaMetric metric = metric("test.0_0.put-if-absent-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -146,7 +146,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.putAll(Collections.singletonList(KeyValue.pair(key, value)));
 
-        final KafkaMetric metric = metric("put-all-rate");
+        final KafkaMetric metric = metric("test.0_0.put-all-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -159,7 +159,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.delete(key);
 
-        final KafkaMetric metric = metric("delete-rate");
+        final KafkaMetric metric = metric("test.0_0.delete-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -176,7 +176,7 @@ public class MeteredKeyValueBytesStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("range-rate");
+        final KafkaMetric metric = metric("test.0_0.range-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -193,7 +193,7 @@ public class MeteredKeyValueBytesStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags));
+        final KafkaMetric metric = metric(new MetricName("test.0_0.all-rate", "stream-scope-metrics", "", tags));
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -206,7 +206,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         metered.flush();
 
-        final KafkaMetric metric = metric("flush-rate");
+        final KafkaMetric metric = metric("test.0_0.flush-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 3bd190a..7ffa77a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -98,7 +98,7 @@ public class MeteredSessionStoreTest {
 
         metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
 
-        final KafkaMetric metric = metric("put-rate");
+        final KafkaMetric metric = metric("test.0_0.put-rate");
         assertTrue(((Double) metric.metricValue()) > 0);
         EasyMock.verify(inner);
     }
@@ -115,7 +115,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("fetch-rate");
+        final KafkaMetric metric = metric("test.0_0.fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -132,7 +132,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("fetch-rate");
+        final KafkaMetric metric = metric("test.0_0.fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -146,7 +146,7 @@ public class MeteredSessionStoreTest {
 
         metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
 
-        final KafkaMetric metric = metric("remove-rate");
+        final KafkaMetric metric = metric("test.0_0.remove-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -163,7 +163,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("fetch-rate");
+        final KafkaMetric metric = metric("test.0_0.fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -180,7 +180,7 @@ public class MeteredSessionStoreTest {
         assertFalse(iterator.hasNext());
         iterator.close();
 
-        final KafkaMetric metric = metric("fetch-rate");
+        final KafkaMetric metric = metric("test.0_0.fetch-rate");
         assertTrue(metric.value() > 0);
         EasyMock.verify(inner);
     }
@@ -188,7 +188,7 @@ public class MeteredSessionStoreTest {
     @Test
     public void shouldRecordRestoreTimeOnInit() {
         init();
-        final KafkaMetric metric = metric("restore-rate");
+        final KafkaMetric metric = metric("test.0_0.restore-rate");
         assertTrue(metric.value() > 0);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 19bd523..7067777 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -89,8 +89,8 @@ public class MeteredWindowStoreTest {
         EasyMock.replay(innerStoreMock);
         store.init(context, store);
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
     }
 
     @Test
@@ -103,8 +103,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.put("a", "a");
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -116,8 +116,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.fetch("a", 1, 1).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -129,8 +129,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.fetch("a", "b", 1, 1).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -144,8 +144,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.flush();
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
-        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.