You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/05/12 00:19:16 UTC
[kafka] branch trunk updated: MINOR: Ensure sensor names are unique
in Kafka Streams (#5009)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9947cd4 MINOR: Ensure sensor names are unique in Kafka Streams (#5009)
9947cd4 is described below
commit 9947cd40c617d4e12690b81a2d28624355580e34
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri May 11 17:19:11 2018 -0700
MINOR: Ensure sensor names are unique in Kafka Streams (#5009)
Reviewer: Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/common/metrics/Metrics.java | 2 +
.../streams/processor/internals/ProcessorNode.java | 5 ++
.../internals/metrics/StreamsMetricsImpl.java | 29 ++++++++++
.../state/internals/InnerMeteredKeyValueStore.java | 61 +++++++++++-----------
.../state/internals/MeteredSessionStore.java | 28 +++++-----
.../state/internals/MeteredWindowStore.java | 24 ++++-----
.../processor/internals/ProcessorNodeTest.java | 28 +++++-----
.../internals/StreamsMetricsImplTest.java | 11 ++--
.../internals/MeteredKeyValueBytesStoreTest.java | 16 +++---
.../state/internals/MeteredSessionStoreTest.java | 14 ++---
.../state/internals/MeteredWindowStoreTest.java | 20 +++----
11 files changed, 139 insertions(+), 99 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 7a8667c..d456fed 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -531,6 +531,7 @@ public class Metrics implements Closeable {
log.error("Error when removing metric from " + reporter.getClass().getName(), e);
}
}
+ log.trace("Removed metric named {}", metricName);
}
return metric;
}
@@ -564,6 +565,7 @@ public class Metrics implements Closeable {
log.error("Error when registering metric on " + reporter.getClass().getName(), e);
}
}
+ log.trace("Registered metric named {}", metricName);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index a0a7041..64ef538 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -186,6 +186,7 @@ public class ProcessorNode<K, V> {
// these are all latency metrics
this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(
+ context.taskId().toString(),
"processor-node",
processorNodeName,
"process",
@@ -193,6 +194,7 @@ public class ProcessorNode<K, V> {
"task-id", context.taskId().toString()
);
this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(
+ context.taskId().toString(),
"processor-node",
processorNodeName,
"punctuate",
@@ -200,6 +202,7 @@ public class ProcessorNode<K, V> {
"task-id", context.taskId().toString()
);
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(
+ context.taskId().toString(),
"processor-node",
processorNodeName,
"create",
@@ -207,6 +210,7 @@ public class ProcessorNode<K, V> {
"task-id", context.taskId().toString()
);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(
+ context.taskId().toString(),
"processor-node",
processorNodeName,
"destroy",
@@ -214,6 +218,7 @@ public class ProcessorNode<K, V> {
"task-id", context.taskId().toString()
);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(
+ context.taskId().toString(),
"processor-node",
processorNodeName,
"forward",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 0251265..bc2e150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -247,6 +247,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return sensor;
}
+ public Sensor addLatencyAndThroughputSensor(final String taskName,
+ final String scopeName,
+ final String entityName,
+ final String operationName,
+ final Sensor.RecordingLevel recordingLevel,
+ final String... tags) {
+ return addLatencyAndThroughputSensor(
+ scopeName,
+ entityName,
+ threadName + "." + taskName + "." + operationName,
+ recordingLevel,
+ tags);
+ }
+
/**
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
@@ -272,6 +286,21 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return sensor;
}
+ public Sensor addThroughputSensor(final String taskName,
+ final String scopeName,
+ final String entityName,
+ final String operationName,
+ final Sensor.RecordingLevel recordingLevel,
+ final String... tags) {
+ return addThroughputSensor(
+ scopeName,
+ entityName,
+ threadName + "." + taskName + "." + operationName,
+ recordingLevel,
+ tags
+ );
+ }
+
private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
sensor.add(
metrics.metricName(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 40e2d43..14464e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -49,7 +49,7 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
private Sensor allTime;
private Sensor rangeTime;
private Sensor flushTime;
- private StreamsMetrics metrics;
+ private StreamsMetricsImpl metrics;
private ProcessorContext context;
private StateStore root;
@@ -89,63 +89,64 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
public void init(ProcessorContext context, StateStore root) {
final String name = name();
final String tagKey = "task-id";
- final String tagValue = context.taskId().toString();
+ final String taskName = context.taskId().toString();
this.context = context;
this.root = root;
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ this.metrics = (StreamsMetricsImpl) context.metrics();
+ this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"put",
Sensor.RecordingLevel.DEBUG,
- tagKey, tagValue);
- this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"put-if-absent",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.getTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"get",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.deleteTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"delete",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.putAllTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"put-all",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.allTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"all",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.rangeTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"range",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"flush",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
- final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+ tagKey, taskName);
+ final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName,
+ metricScope,
name,
"restore",
Sensor.RecordingLevel.DEBUG,
- tagKey,
- tagValue);
+ tagKey, taskName);
// register and possibly restore the state from the logs
if (restoreTime.shouldRecord()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 1a9ac20..5636219 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -20,11 +20,11 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
@@ -38,7 +38,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
private final Serde<V> valueSerde;
private final Time time;
private StateSerdes<K, V> serdes;
- private StreamsMetrics metrics;
+ private StreamsMetricsImpl metrics;
private Sensor putTime;
private Sensor fetchTime;
private Sensor flushTime;
@@ -65,19 +65,19 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
final String tagKey = "task-id";
- final String tagValue = context.taskId().toString();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
- this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
- this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
- this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "remove",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+ final String taskName = context.taskId().toString();
+ this.metrics = (StreamsMetricsImpl) context.metrics();
+ this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "put",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+ this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "fetch",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+ this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "flush",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+ this.removeTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "remove",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+ final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "restore",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index b131db5..2487854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,11 +20,11 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
@@ -37,7 +37,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
private final Time time;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
- private StreamsMetrics metrics;
+ private StreamsMetricsImpl metrics;
private Sensor putTime;
private Sensor fetchTime;
private Sensor flushTime;
@@ -65,16 +65,16 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
final String tagKey = "task-id";
- final String tagValue = context.taskId().toString();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
- this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
- this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
- final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
- Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+ final String taskName = context.taskId().toString();
+ this.metrics = (StreamsMetricsImpl) context.metrics();
+ this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "put",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+ this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "fetch",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+ this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "flush",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+ final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "restore",
+ Sensor.RecordingLevel.DEBUG, tagKey, taskName);
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 0992063..6cab6e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -113,29 +113,29 @@ public class ProcessorNodeTest {
for (final String operation : latencyOperations) {
- assertNotNull(metrics.getSensor(operation));
+ assertNotNull(metrics.getSensor("name-mock.0_0." + operation));
}
- assertNotNull(metrics.getSensor(throughputOperation));
+ assertNotNull(metrics.getSensor("name-mock.0_0." + throughputOperation));
for (final String opName : latencyOperations) {
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
}
- assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
- "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+ assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
+ "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
// test "all"
metricTags.put("processor-node-id", "all");
for (final String opName : latencyOperations) {
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), "mock.0_0." + opName + "-total", groupName, metricTags);
}
- assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
- "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+ assertNotNull(metrics.metrics().get(metrics.metricName("mock.0_0." + throughputOperation + "-rate", groupName,
+ "The average number of occurrence of " + "mock.0_0." + throughputOperation + " operation per second.", metricTags)));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index b065e2c..a72dc79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -40,6 +40,7 @@ public class StreamsMetricsImplTest {
@Test
public void testRemoveSensor() {
final String sensorName = "sensor1";
+ final String taskName = "task";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
@@ -51,10 +52,10 @@ public class StreamsMetricsImplTest {
final Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1);
streamsMetrics.removeSensor(sensor1a);
- final Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor2);
- final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor3 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);
}
@@ -63,11 +64,12 @@ public class StreamsMetricsImplTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
+ final String taskName = "task";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
// 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
@@ -83,11 +85,12 @@ public class StreamsMetricsImplTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
+ final String taskName = "task";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
// 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index a5e0d79..70bd2ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -100,7 +100,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.put(key, value);
- final KafkaMetric metric = metric("put-rate");
+ final KafkaMetric metric = metric("test.0_0.put-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
@@ -114,7 +114,7 @@ public class MeteredKeyValueBytesStoreTest {
assertThat(metered.get(key), equalTo(value));
- final KafkaMetric metric = metric("get-rate");
+ final KafkaMetric metric = metric("test.0_0.get-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -127,7 +127,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.putIfAbsent(key, value);
- final KafkaMetric metric = metric("put-if-absent-rate");
+ final KafkaMetric metric = metric("test.0_0.put-if-absent-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -146,7 +146,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.putAll(Collections.singletonList(KeyValue.pair(key, value)));
- final KafkaMetric metric = metric("put-all-rate");
+ final KafkaMetric metric = metric("test.0_0.put-all-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -159,7 +159,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.delete(key);
- final KafkaMetric metric = metric("delete-rate");
+ final KafkaMetric metric = metric("test.0_0.delete-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -176,7 +176,7 @@ public class MeteredKeyValueBytesStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("range-rate");
+ final KafkaMetric metric = metric("test.0_0.range-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -193,7 +193,7 @@ public class MeteredKeyValueBytesStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags));
+ final KafkaMetric metric = metric(new MetricName("test.0_0.all-rate", "stream-scope-metrics", "", tags));
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -206,7 +206,7 @@ public class MeteredKeyValueBytesStoreTest {
metered.flush();
- final KafkaMetric metric = metric("flush-rate");
+ final KafkaMetric metric = metric("test.0_0.flush-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 3bd190a..7ffa77a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -98,7 +98,7 @@ public class MeteredSessionStoreTest {
metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
- final KafkaMetric metric = metric("put-rate");
+ final KafkaMetric metric = metric("test.0_0.put-rate");
assertTrue(((Double) metric.metricValue()) > 0);
EasyMock.verify(inner);
}
@@ -115,7 +115,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("fetch-rate");
+ final KafkaMetric metric = metric("test.0_0.fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -132,7 +132,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("fetch-rate");
+ final KafkaMetric metric = metric("test.0_0.fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -146,7 +146,7 @@ public class MeteredSessionStoreTest {
metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
- final KafkaMetric metric = metric("remove-rate");
+ final KafkaMetric metric = metric("test.0_0.remove-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -163,7 +163,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("fetch-rate");
+ final KafkaMetric metric = metric("test.0_0.fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -180,7 +180,7 @@ public class MeteredSessionStoreTest {
assertFalse(iterator.hasNext());
iterator.close();
- final KafkaMetric metric = metric("fetch-rate");
+ final KafkaMetric metric = metric("test.0_0.fetch-rate");
assertTrue(metric.value() > 0);
EasyMock.verify(inner);
}
@@ -188,7 +188,7 @@ public class MeteredSessionStoreTest {
@Test
public void shouldRecordRestoreTimeOnInit() {
init();
- final KafkaMetric metric = metric("restore-rate");
+ final KafkaMetric metric = metric("test.0_0.restore-rate");
assertTrue(metric.value() > 0);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 19bd523..7067777 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -89,8 +89,8 @@ public class MeteredWindowStoreTest {
EasyMock.replay(innerStoreMock);
store.init(context, store);
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
}
@Test
@@ -103,8 +103,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.put("a", "a");
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -116,8 +116,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.fetch("a", 1, 1).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -129,8 +129,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.fetch("a", "b", 1, 1).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -144,8 +144,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.flush();
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
- assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "test.0_0.flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.