You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/27 20:57:13 UTC
[kafka] branch trunk updated: KAFKA-7223: Suppression Buffer
Metrics (#5795)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55c77eb KAFKA-7223: Suppression Buffer Metrics (#5795)
55c77eb is described below
commit 55c77ebf01ea8662b98f73f6f6c17d05163a85b8
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Nov 27 14:57:04 2018 -0600
KAFKA-7223: Suppression Buffer Metrics (#5795)
Add the final batch of metrics from KIP-328
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/kstream/internals/metrics/Sensors.java | 47 ++++-
.../suppress/KTableSuppressProcessor.java | 8 +-
.../streams/processor/internals/ProcessorNode.java | 21 +--
.../internals/metrics/StreamsMetricsImpl.java | 3 +
.../InMemoryTimeOrderedKeyValueBuffer.java | 26 ++-
.../streams/state/internals/metrics/Sensors.java | 69 ++++++-
.../KTableSuppressProcessorMetricsTest.java | 203 +++++++++++++++++++++
.../suppress/KTableSuppressProcessorTest.java | 13 +-
.../streams/processor/MockProcessorContext.java | 9 +-
9 files changed, 375 insertions(+), 24 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 5b0d8b5..12c4813 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -20,10 +20,17 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
public class Sensors {
private Sensors() {}
@@ -38,8 +45,8 @@ public class Sensors {
);
StreamsMetricsImpl.addInvocationRateAndCount(
sensor,
- "stream-processor-node-metrics",
- metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", context.currentNode().name()),
+ PROCESSOR_NODE_METRICS_GROUP,
+ metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
"late-record-drop"
);
return sensor;
@@ -75,4 +82,40 @@ public class Sensors {
);
return sensor;
}
+
+ public static Sensor suppressionEmitSensor(final InternalProcessorContext context) {
+ final StreamsMetricsImpl metrics = context.metrics();
+
+ final Sensor sensor = metrics.nodeLevelSensor(
+ context.taskId().toString(),
+ context.currentNode().name(),
+ "suppression-emit",
+ Sensor.RecordingLevel.DEBUG
+ );
+
+ final Map<String, String> tags = metrics.tagMap(
+ "task-id", context.taskId().toString(),
+ PROCESSOR_NODE_ID_TAG, context.currentNode().name()
+ );
+
+ sensor.add(
+ new MetricName(
+ "suppression-emit-rate",
+ PROCESSOR_NODE_METRICS_GROUP,
+ "The average number of occurrence of suppression-emit operation per second.",
+ tags
+ ),
+ new Rate(TimeUnit.SECONDS, new Sum())
+ );
+ sensor.add(
+ new MetricName(
+ "suppression-emit-total",
+ PROCESSOR_NODE_METRICS_GROUP,
+ "The total number of occurrence of suppression-emit operations.",
+ tags
+ ),
+ new Total()
+ );
+ return sensor;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 50e74a3..06d5004 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,12 +16,14 @@
*/
package org.apache.kafka.streams.kstream.internals.suppress;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -42,9 +44,10 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
private final BufferFullStrategy bufferFullStrategy;
private final boolean shouldSuppressTombstones;
private final String storeName;
+
private TimeOrderedKeyValueBuffer buffer;
private InternalProcessorContext internalProcessorContext;
-
+ private Sensor suppressionEmitSensor;
private Serde<K> keySerde;
private FullChangeSerde<V> valueSerde;
@@ -68,6 +71,8 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
@Override
public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
+ suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
+
keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
@@ -123,6 +128,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
try {
final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
internalProcessorContext.forward(key, value);
+ suppressionEmitSensor.record();
} finally {
internalProcessorContext.setRecordContext(prevRecordContext);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8dc6417..8483791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
@@ -165,15 +167,13 @@ public class ProcessorNode<K, V> {
private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName, final ProcessorContext context) {
this.metrics = metrics;
- final String group = "stream-processor-node-metrics";
final String taskName = context.taskId().toString();
- final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", processorNodeName);
- final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", "all");
+ final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, processorNodeName);
+ final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, "all");
nodeProcessTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
"process",
metrics,
- group,
taskName,
processorNodeName,
allTagMap,
@@ -183,7 +183,6 @@ public class ProcessorNode<K, V> {
nodePunctuateTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
"punctuate",
metrics,
- group,
taskName,
processorNodeName,
allTagMap,
@@ -193,7 +192,6 @@ public class ProcessorNode<K, V> {
nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors(
"create",
metrics,
- group,
taskName,
processorNodeName,
allTagMap,
@@ -204,7 +202,6 @@ public class ProcessorNode<K, V> {
nodeDestructionSensor = createTaskAndNodeLatencyAndThroughputSensors(
"destroy",
metrics,
- group,
taskName,
processorNodeName,
allTagMap,
@@ -214,7 +211,6 @@ public class ProcessorNode<K, V> {
sourceNodeForwardSensor = createTaskAndNodeLatencyAndThroughputSensors(
"forward",
metrics,
- group,
taskName,
processorNodeName,
allTagMap,
@@ -231,17 +227,16 @@ public class ProcessorNode<K, V> {
private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String operation,
final StreamsMetricsImpl metrics,
- final String group,
final String taskName,
final String processorNodeName,
final Map<String, String> taskTags,
final Map<String, String> nodeTags) {
final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
- addAvgMaxLatency(parent, group, taskTags, operation);
- addInvocationRateAndCount(parent, group, taskTags, operation);
+ addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
- addAvgMaxLatency(sensor, group, nodeTags, operation);
- addInvocationRateAndCount(sensor, group, nodeTags, operation);
+ addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
return sensor;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 1703112..8ec2711 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -51,6 +51,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private static final String SENSOR_PREFIX_DELIMITER = ".";
private static final String SENSOR_NAME_DELIMITER = ".s.";
+ public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
+ public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
+
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.threadName = threadName;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index d94f671..234ea05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -17,17 +17,20 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.metrics.Sensors;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -57,6 +60,8 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
private long minTimestamp = Long.MAX_VALUE;
private RecordCollector collector;
private String changelogTopic;
+ private Sensor bufferSizeSensor;
+ private Sensor bufferCountSensor;
private volatile boolean open;
@@ -174,11 +179,16 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
@Override
public void init(final ProcessorContext context, final StateStore root) {
+ final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+ bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
+ bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
+
context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
if (loggingEnabled) {
collector = ((RecordCollector.Supplier) context).recordCollector();
changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}
+ updateBufferMetrics();
open = true;
}
@@ -189,12 +199,13 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
@Override
public void close() {
+ open = false;
index.clear();
sortedMap.clear();
dirtyKeys.clear();
memBufferSize = 0;
minTimestamp = Long.MAX_VALUE;
- open = false;
+ updateBufferMetrics();
}
@Override
@@ -265,6 +276,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
);
}
}
+ updateBufferMetrics();
}
@@ -272,6 +284,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+ int evictions = 0;
if (predicate.get()) {
Map.Entry<BufferKey, ContextualRecord> next = null;
@@ -298,8 +311,13 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
next = null;
minTimestamp = Long.MAX_VALUE;
}
+
+ evictions++;
}
}
+ if (evictions > 0) {
+ updateBufferMetrics();
+ }
}
@Override
@@ -308,6 +326,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
final ContextualRecord value) {
cleanPut(time, key, value);
dirtyKeys.add(key);
+ updateBufferMetrics();
}
private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
@@ -355,4 +374,9 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
}
return size;
}
+
+ private void updateBufferMetrics() {
+ bufferSizeSensor.record(memBufferSize);
+ bufferCountSensor.record(index.size());
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
index fdbc7c8..13a39c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
@@ -16,7 +16,13 @@
*/
package org.apache.kafka.streams.state.internals.metrics;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
@@ -25,7 +31,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
public final class Sensors {
-
private Sensors() {}
public static Sensor createTaskAndStoreLatencyAndThroughputSensors(final Sensor.RecordingLevel level,
@@ -44,5 +49,67 @@ public final class Sensors {
addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation);
return sensor;
}
+
+ public static Sensor createBufferSizeSensor(final StateStore store,
+ final InternalProcessorContext context) {
+ return getBufferSizeOrCountSensor(store, context, "size");
+ }
+
+ public static Sensor createBufferCountSensor(final StateStore store,
+ final InternalProcessorContext context) {
+ return getBufferSizeOrCountSensor(store, context, "count");
+ }
+
+ private static Sensor getBufferSizeOrCountSensor(final StateStore store,
+ final InternalProcessorContext context,
+ final String property) {
+ final StreamsMetricsImpl metrics = context.metrics();
+
+ final String sensorName = "suppression-buffer-" + property;
+
+ final Sensor sensor = metrics.storeLevelSensor(
+ context.taskId().toString(),
+ store.name(),
+ sensorName,
+ Sensor.RecordingLevel.DEBUG
+ );
+
+ final String metricsGroup = "stream-buffer-metrics";
+
+ final Map<String, String> tags = metrics.tagMap(
+ "task-id", context.taskId().toString(),
+ "buffer-id", store.name()
+ );
+
+ sensor.add(
+ new MetricName(
+ sensorName + "-current",
+ metricsGroup,
+ "The current " + property + " of buffered records.",
+ tags),
+ new Value()
+ );
+
+
+ sensor.add(
+ new MetricName(
+ sensorName + "-avg",
+ metricsGroup,
+ "The average " + property + " of buffered records.",
+ tags),
+ new Avg()
+ );
+
+ sensor.add(
+ new MetricName(
+ sensorName + "-max",
+ metricsGroup,
+ "The max " + property + " of buffered records.",
+ tags),
+ new Max()
+ );
+
+ return sensor;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
new file mode 100644
index 0000000..986dc6f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.Long;
+import static org.apache.kafka.common.serialization.Serdes.String;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings("PointlessArithmeticExpression")
+public class KTableSuppressProcessorMetricsTest {
+ private static final long ARBITRARY_LONG = 5L;
+
+ private static final MetricName EVICTION_TOTAL_METRIC = new MetricName(
+ "suppression-emit-total",
+ "stream-processor-node-metrics",
+ "The total number of occurrence of suppression-emit operations.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "testNode")
+ )
+ );
+
+ private static final MetricName EVICTION_RATE_METRIC = new MetricName(
+ "suppression-emit-rate",
+ "stream-processor-node-metrics",
+ "The average number of occurrence of suppression-emit operation per second.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "testNode")
+ )
+ );
+
+ private static final MetricName BUFFER_SIZE_AVG_METRIC = new MetricName(
+ "suppression-buffer-size-avg",
+ "stream-buffer-metrics",
+ "The average size of buffered records.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("buffer-id", "test-store")
+ )
+ );
+
+ private static final MetricName BUFFER_SIZE_CURRENT_METRIC = new MetricName(
+ "suppression-buffer-size-current",
+ "stream-buffer-metrics",
+ "The current size of buffered records.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("buffer-id", "test-store")
+ )
+ );
+
+ private static final MetricName BUFFER_SIZE_MAX_METRIC = new MetricName(
+ "suppression-buffer-size-max",
+ "stream-buffer-metrics",
+ "The max size of buffered records.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("buffer-id", "test-store")
+ )
+ );
+
+ private static final MetricName BUFFER_COUNT_AVG_METRIC = new MetricName(
+ "suppression-buffer-count-avg",
+ "stream-buffer-metrics",
+ "The average count of buffered records.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("buffer-id", "test-store")
+ )
+ );
+
+ private static final MetricName BUFFER_COUNT_CURRENT_METRIC = new MetricName(
+ "suppression-buffer-count-current",
+ "stream-buffer-metrics",
+ "The current count of buffered records.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("buffer-id", "test-store")
+ )
+ );
+
+ private static final MetricName BUFFER_COUNT_MAX_METRIC = new MetricName(
+ "suppression-buffer-count-max",
+ "stream-buffer-metrics",
+ "The max count of buffered records.",
+ mkMap(
+ mkEntry("client-id", "mock-processor-context-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("buffer-id", "test-store")
+ )
+ );
+
+ @Test
+ public void shouldRecordMetrics() {
+ final String storeName = "test-store";
+
+ final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+ .withLoggingDisabled()
+ .build();
+
+ final KTableSuppressProcessor<String, Long> processor =
+ new KTableSuppressProcessor<>(
+ (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
+ storeName,
+ String(),
+ new FullChangeSerde<>(Long())
+ );
+
+ final MockInternalProcessorContext context = new MockInternalProcessorContext();
+ context.setCurrentNode(new ProcessorNode("testNode"));
+
+ buffer.init(context, buffer);
+ processor.init(context);
+
+ final long timestamp = 100L;
+ context.setStreamTime(timestamp);
+ context.setRecordMetadata("", 0, 0L, null, timestamp);
+ final String key = "longKey";
+ final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+ processor.process(key, value);
+
+ {
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+ verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
+ verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
+ verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(25.5));
+ verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(51.0));
+ verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(51.0));
+ verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
+ verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
+ verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
+ }
+
+ context.setStreamTime(timestamp + 1);
+ context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+ processor.process("key", value);
+
+ {
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+ verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
+ verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
+ verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(49.0));
+ verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(47.0));
+ verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(98.0));
+ verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
+ verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
+ verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> void verifyMetric(final Map<MetricName, ? extends Metric> metrics,
+ final MetricName metricName,
+ final Matcher<T> matcher) {
+ assertThat(metrics.get(metricName).metricName().description(), is(metricName.description()));
+ assertThat((T) metrics.get(metricName).metricValue(), matcher);
+
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 002ace2..335fae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -77,9 +77,16 @@ public class KTableSuppressProcessorTest {
.withLoggingDisabled()
.build();
final KTableSuppressProcessor<K, V> processor =
- new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde));
+ new KTableSuppressProcessor<>(
+ (SuppressedInternal<K>) suppressed,
+ storeName,
+ keySerde,
+ new FullChangeSerde<>(valueSerde)
+ );
final MockInternalProcessorContext context = new MockInternalProcessorContext();
+ context.setCurrentNode(new ProcessorNode("testNode"));
+
buffer.init(context, buffer);
processor.init(context);
@@ -461,10 +468,6 @@ public class KTableSuppressProcessorTest {
};
}
- private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) {
- return (SuppressedInternal<K>) suppressed;
- }
-
private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
final Serde<K> kSerde = Serdes.serdeFrom(rawType);
return new Serdes.WrapperSerde<>(
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 88a7fe7..4c3a6b2 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor;
import java.time.Duration;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.KeyValue;
@@ -208,7 +210,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
- this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context-virtual-thread");
+ final MetricConfig metricConfig = new MetricConfig();
+ metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+ this.metrics = new StreamsMetricsImpl(
+ new Metrics(metricConfig),
+ "mock-processor-context-virtual-thread"
+ );
}
@Override