You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/23 18:41:15 UTC
[kafka] branch trunk updated: KAFKA-6376; refactor skip metrics in Kafka Streams
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed51b2c KAFKA-6376; refactor skip metrics in Kafka Streams
ed51b2c is described below
commit ed51b2cdf5bdac210a6904bead1a2ca6e8411406
Author: John Roesler <jo...@confluent.io>
AuthorDate: Mon Apr 23 11:41:03 2018 -0700
KAFKA-6376; refactor skip metrics in Kafka Streams
* unify skipped records metering
* log warnings when things get skipped
* tighten up metrics usage a bit
### Testing strategy:
Unit testing of the metrics and the logs should be sufficient.
Author: John Roesler <jo...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4812 from vvcephei/kip-274-streams-skip-metrics
---
build.gradle | 2 +
checkstyle/import-control.xml | 4 +
.../kstream/internals/KStreamAggregate.java | 12 +-
.../kstream/internals/KStreamKStreamJoin.java | 11 ++
.../internals/KStreamKTableJoinProcessor.java | 14 +-
.../streams/kstream/internals/KStreamReduce.java | 11 ++
.../internals/KStreamSessionWindowAggregate.java | 11 ++
.../kstream/internals/KStreamWindowAggregate.java | 11 ++
.../kstream/internals/KStreamWindowReduce.java | 11 ++
.../kstream/internals/KTableKTableInnerJoin.java | 11 ++
.../kstream/internals/KTableKTableLeftJoin.java | 11 ++
.../kstream/internals/KTableKTableOuterJoin.java | 11 ++
.../kstream/internals/KTableKTableRightJoin.java | 12 +-
.../streams/kstream/internals/KTableSource.java | 11 ++
.../internals/AbstractProcessorContext.java | 8 +-
.../internals/GlobalProcessorContextImpl.java | 4 +-
.../processor/internals/GlobalStateUpdateTask.java | 10 +-
.../processor/internals/GlobalStreamThread.java | 10 +-
.../internals/InternalProcessorContext.java | 4 +
.../processor/internals/ProcessorContextImpl.java | 4 +-
.../streams/processor/internals/ProcessorNode.java | 69 +++++---
.../processor/internals/RecordCollectorImpl.java | 15 +-
.../processor/internals/RecordDeserializer.java | 17 +-
.../streams/processor/internals/RecordQueue.java | 16 +-
.../streams/processor/internals/SinkNode.java | 2 +-
.../streams/processor/internals/SourceNode.java | 2 +-
.../processor/internals/StandbyContextImpl.java | 23 +--
.../streams/processor/internals/StandbyTask.java | 3 +-
.../streams/processor/internals/StreamTask.java | 97 ++++++------
.../streams/processor/internals/StreamThread.java | 175 +++++++++------------
.../metrics/StreamsMetricsConventions.java} | 24 ++-
.../{ => metrics}/StreamsMetricsImpl.java | 152 +++++++++---------
.../kafka/streams/state/internals/NamedCache.java | 26 +--
.../kstream/internals/KGroupedStreamImplTest.java | 39 +++++
.../kstream/internals/KStreamKStreamJoinTest.java | 34 ++++
.../kstream/internals/KStreamKTableJoinTest.java | 23 +++
...KStreamSessionWindowAggregateProcessorTest.java | 17 ++
.../internals/KStreamWindowAggregateTest.java | 33 ++++
.../kstream/internals/KStreamWindowReduceTest.java | 62 ++++++++
.../internals/KTableKTableInnerJoinTest.java | 27 ++++
.../internals/KTableKTableLeftJoinTest.java | 27 ++++
.../internals/KTableKTableOuterJoinTest.java | 27 ++++
.../internals/KTableKTableRightJoinTest.java | 53 +++++++
.../kstream/internals/KTableSourceTest.java | 20 +++
.../processor/internals/GlobalStateTaskTest.java | 6 +-
.../processor/internals/MockStreamsMetrics.java | 6 +-
.../processor/internals/PartitionGroupTest.java | 11 +-
.../processor/internals/ProcessorNodeTest.java | 33 ++--
.../processor/internals/RecordCollectorTest.java | 73 +++++++--
.../internals/RecordDeserializerTest.java | 11 +-
.../processor/internals/RecordQueueTest.java | 21 ++-
.../streams/processor/internals/SinkNodeTest.java | 4 +-
.../processor/internals/StreamTaskTest.java | 42 ++---
.../processor/internals/StreamThreadTest.java | 121 +++++++++++---
.../internals/StreamsMetricsImplTest.java | 42 ++---
.../internals/testutil/LogCaptureAppender.java | 66 ++++++++
.../streams/state/KeyValueStoreTestDriver.java | 9 +-
.../internals/MeteredKeyValueBytesStoreTest.java | 23 +--
.../state/internals/MeteredSessionStoreTest.java | 13 +-
.../state/internals/MeteredWindowStoreTest.java | 114 ++++++--------
.../streams/state/internals/NamedCacheTest.java | 35 ++---
.../state/internals/RocksDBWindowStoreTest.java | 8 +-
.../state/internals/StoreChangeLoggerTest.java | 3 +-
.../StreamThreadStateStoreProviderTest.java | 3 +-
.../kafka/test/InternalMockProcessorContext.java | 80 ++++++----
.../org/apache/kafka/test/KStreamTestDriver.java | 10 +-
.../org/apache/kafka/test/MockProcessorNode.java | 4 +-
.../java/org/apache/kafka/test/MockSourceNode.java | 4 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 4 +-
.../org/apache/kafka/test/StreamsTestUtils.java | 65 +++++++-
.../apache/kafka/streams/TopologyTestDriver.java | 30 +++-
.../streams/processor/MockProcessorContext.java | 5 +-
72 files changed, 1440 insertions(+), 572 deletions(-)
diff --git a/build.gradle b/build.gradle
index 8561eb4..51d9345 100644
--- a/build.gradle
+++ b/build.gradle
@@ -932,10 +932,12 @@ project(':streams') {
testCompile project(':clients').sourceSets.test.output
testCompile project(':core')
testCompile project(':core').sourceSets.test.output
+ testCompile libs.log4j
testCompile libs.junit
testCompile libs.easymock
testCompile libs.bcpkix
+ testRuntimeOnly project(':streams:test-utils')
testRuntime libs.slf4jlog4j
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 192d735..65f294f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -249,6 +249,10 @@
<allow pkg="org.I0Itec.zkclient" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.zookeeper" />
+ <allow pkg="org.apache.zookeeper" />
+ <subpackage name="testutil">
+ <allow pkg="org.apache.log4j" />
+ </subpackage>
</subpackage>
</subpackage>
</subpackage>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 1d2d173..1170ff6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -21,10 +21,13 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
-
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
private final String storeName;
private final Initializer<T> initializer;
private final Aggregator<? super K, ? super V, T> aggregator;
@@ -52,11 +55,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
private KeyValueStore<K, T> store;
private TupleForwarder<K, T> tupleForwarder;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
}
@@ -66,6 +71,11 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
if (key == null || value == null) {
+ LOG.warn(
+ "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ key, value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 6af4972..4c6998a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -21,11 +21,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
private final String otherWindowName;
private final long joinBeforeMs;
@@ -50,11 +54,13 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
private WindowStore<K, V2> otherWindow;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
}
@@ -69,6 +75,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
if (key == null || value == null) {
+ LOG.warn(
+ "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ key, value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 8e8b86a..157c91b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -20,13 +20,18 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
private final KTableValueGetter<K2, V2> valueGetter;
private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
private final boolean leftJoin;
+ private StreamsMetricsImpl metrics;
KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
@@ -41,6 +46,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
valueGetter.init(context);
}
@@ -54,7 +60,13 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
- if (key != null && value != null) {
+ if (key == null || value == null) {
+ LOG.warn(
+ "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ key, value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
+ } else {
final K2 mappedKey = keyMapper.apply(key, value);
final V2 value2 = mappedKey == null ? null : valueGetter.get(mappedKey);
if (leftJoin || value2 != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index bd0598a..fff9348 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -20,9 +20,13 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
private final String storeName;
private final Reducer<V> reducer;
@@ -48,11 +52,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
private KeyValueStore<K, V> store;
private TupleForwarder<K, V> tupleForwarder;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
@@ -63,6 +69,11 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
if (key == null || value == null) {
+ LOG.warn(
+ "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ key, value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index d96468a..114c685 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -26,13 +26,17 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
private final String storeName;
private final SessionWindows windows;
@@ -68,11 +72,13 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
private SessionStore<K, T> store;
private TupleForwarder<Windowed<K>, T> tupleForwarder;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
store = (SessionStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
}
@@ -82,6 +88,11 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
// if the key is null, we do not need proceed aggregating
// the record with the table
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 6a94543..f4bd099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -24,11 +24,15 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowAggregate.class);
private final String storeName;
private final Windows<W> windows;
@@ -61,11 +65,13 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
private WindowStore<K, T> windowStore;
private TupleForwarder<Windowed<K>, T> tupleForwarder;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
@@ -76,6 +82,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
// if the key is null, we do not need proceed aggregating the record
// the record with the table
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 10c3d8a..32dfba3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -23,11 +23,15 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowReduce.class);
private final String storeName;
private final Windows<W> windows;
@@ -57,11 +61,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
private WindowStore<K, V> windowStore;
private TupleForwarder<Windowed<K>, V> tupleForwarder;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
}
@@ -71,6 +77,11 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
// if the key is null, we do not need proceed aggregating
// the record with the table
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 78f3517..5a159ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -21,8 +21,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
+ private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
@Override
@@ -62,6 +66,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
+ private StreamsMetricsImpl metrics;
KTableKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
@@ -70,6 +75,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
valueGetter.init(context);
}
@@ -77,6 +83,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void process(final K key, final Change<V1> change) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ change, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 6b170da..a40ac5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -20,8 +20,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
+ private static final Logger LOG = LoggerFactory.getLogger(KTableKTableLeftJoin.class);
KTableKTableLeftJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
@@ -55,6 +59,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
+ private StreamsMetricsImpl metrics;
KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
@@ -63,6 +68,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
valueGetter.init(context);
}
@@ -70,6 +76,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void process(final K key, final Change<V1> change) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ change, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 97f00e3..377e2d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -20,8 +20,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
+ private static final Logger LOG = LoggerFactory.getLogger(KTableKTableOuterJoin.class);
KTableKTableOuterJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
@@ -54,6 +58,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
+ private StreamsMetricsImpl metrics;
KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
@@ -62,6 +67,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
valueGetter.init(context);
}
@@ -69,6 +75,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void process(final K key, final Change<V1> change) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ change, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 61798cd..d3916dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -20,9 +20,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
-
+ private static final Logger LOG = LoggerFactory.getLogger(KTableKTableRightJoin.class);
KTableKTableRightJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
@@ -55,6 +58,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
+ private StreamsMetricsImpl metrics;
KTableKTableRightJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
@@ -63,6 +67,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
valueGetter.init(context);
}
@@ -70,6 +75,11 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void process(final K key, final Change<V1> change) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ change, context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 273e255..85ceaae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -20,9 +20,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
public final String storeName;
@@ -45,11 +49,13 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private KeyValueStore<K, V> store;
private TupleForwarder<K, V> tupleForwarder;
+ private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ metrics = (StreamsMetricsImpl) context.metrics();
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
}
@@ -58,6 +64,11 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
public void process(final K key, final V value) {
// if the key is null, then ignore the record
if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
+ context().topic(), context().partition(), context().offset()
+ );
+ metrics.skippedRecordsSensor().record();
return;
}
final V oldValue = store.get(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 87408c6..fc3067e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -18,10 +18,10 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
@@ -36,7 +36,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
private final TaskId taskId;
private final String applicationId;
private final StreamsConfig config;
- private final StreamsMetrics metrics;
+ private final StreamsMetricsImpl metrics;
private final Serde keySerde;
private final ThreadCache cache;
private final Serde valueSerde;
@@ -47,7 +47,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
public AbstractProcessorContext(final TaskId taskId,
final StreamsConfig config,
- final StreamsMetrics metrics,
+ final StreamsMetricsImpl metrics,
final StateManager stateManager,
final ThreadCache cache) {
this.taskId = taskId;
@@ -86,7 +86,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
- public StreamsMetrics metrics() {
+ public StreamsMetricsImpl metrics() {
return metrics;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 88d9f56..6bc4121 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -17,13 +17,13 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.List;
@@ -33,7 +33,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
public GlobalProcessorContextImpl(final StreamsConfig config,
final StateManager stateMgr,
- final StreamsMetrics metrics,
+ final StreamsMetricsImpl metrics,
final ThreadCache cache) {
super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index c455275..26bf493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -63,7 +63,15 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
final SourceNode source = topology.source(sourceTopic);
- deserializers.put(sourceTopic, new RecordDeserializer(source, deserializationExceptionHandler, logContext));
+ deserializers.put(
+ sourceTopic,
+ new RecordDeserializer(
+ source,
+ deserializationExceptionHandler,
+ logContext,
+ processorContext.metrics().skippedRecordsSensor()
+ )
+ );
}
initTopology();
processorContext.initialized();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index bb8bc1d..af3c7db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -26,16 +26,15 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -56,7 +55,7 @@ public class GlobalStreamThread extends Thread {
private final StateDirectory stateDirectory;
private final Time time;
private final ThreadCache cache;
- private final StreamsMetrics streamsMetrics;
+ private final StreamsMetricsImpl streamsMetrics;
private final ProcessorTopology topology;
private volatile StreamsException startupException;
@@ -186,7 +185,7 @@ public class GlobalStreamThread extends Thread {
this.topology = topology;
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
- this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
+ this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId);
this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
this.logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
@@ -298,6 +297,9 @@ public class GlobalStreamThread extends Thread {
} catch (final IOException e) {
log.error("Failed to close state maintainer due to the following error:", e);
}
+
+ streamsMetrics.removeOwnedSensors();
+
setState(DEAD);
log.info("Shutdown complete");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 25df826..0ebaf60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
/**
@@ -26,6 +27,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
*/
public interface InternalProcessorContext extends ProcessorContext {
+ @Override
+ StreamsMetricsImpl metrics();
+
/**
* Returns the current {@link RecordContext}
* @return the current {@link RecordContext}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 44a25c1..178937f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -25,6 +24,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.List;
@@ -41,7 +41,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final StreamsConfig config,
final RecordCollector collector,
final ProcessorStateManager stateMgr,
- final StreamsMetrics metrics,
+ final StreamsMetricsImpl metrics,
final ThreadCache cache) {
super(id, config, metrics, stateMgr, cache);
this.task = task;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 0411a37..0854b67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -19,11 +19,11 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.ArrayList;
import java.util.HashMap;
@@ -106,11 +106,11 @@ public class ProcessorNode<K, V> {
childByName.put(child.name, child);
}
- public void init(final ProcessorContext context) {
+ public void init(final InternalProcessorContext context) {
this.context = context;
try {
nodeMetrics = new NodeMetrics(context.metrics(), name, context);
- nodeMetrics.metrics.measureLatencyNs(time, initDelegate, nodeMetrics.nodeCreationSensor);
+ runAndMeasureLatency(time, initDelegate, nodeMetrics.nodeCreationSensor);
} catch (final Exception e) {
throw new StreamsException(String.format("failed to initialize processor %s", name), e);
}
@@ -118,7 +118,7 @@ public class ProcessorNode<K, V> {
public void close() {
try {
- nodeMetrics.metrics.measureLatencyNs(time, closeDelegate, nodeMetrics.nodeDestructionSensor);
+ runAndMeasureLatency(time, closeDelegate, nodeMetrics.nodeDestructionSensor);
nodeMetrics.removeAllSensors();
} catch (final Exception e) {
throw new StreamsException(String.format("failed to close processor %s", name), e);
@@ -130,7 +130,7 @@ public class ProcessorNode<K, V> {
this.key = key;
this.value = value;
- this.nodeMetrics.metrics.measureLatencyNs(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
+ runAndMeasureLatency(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
}
public void punctuate(final long timestamp, final Punctuator punctuator) {
@@ -140,7 +140,7 @@ public class ProcessorNode<K, V> {
punctuator.punctuate(timestamp);
}
};
- this.nodeMetrics.metrics.measureLatencyNs(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
+ runAndMeasureLatency(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
}
/**
@@ -172,44 +172,53 @@ public class ProcessorNode<K, V> {
return nodeMetrics.sourceNodeForwardSensor;
}
- Sensor sourceNodeSkippedDueToDeserializationErrorSensor() {
- return nodeMetrics.sourceNodeSkippedDueToDeserializationError;
- }
-
private static final class NodeMetrics {
private final StreamsMetricsImpl metrics;
private final Sensor nodeProcessTimeSensor;
private final Sensor nodePunctuateTimeSensor;
private final Sensor sourceNodeForwardSensor;
- private final Sensor sourceNodeSkippedDueToDeserializationError;
private final Sensor nodeCreationSensor;
private final Sensor nodeDestructionSensor;
- private NodeMetrics(final StreamsMetrics metrics, final String name, final ProcessorContext context) {
- final String scope = "processor-node";
- final String tagKey = "task-id";
- final String tagValue = context.taskId().toString();
- this.metrics = (StreamsMetricsImpl) metrics;
+ private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName, final ProcessorContext context) {
+ this.metrics = metrics;
// these are all latency metrics
this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(
- scope, name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+ "processor-node",
+ processorNodeName,
+ "process",
+ Sensor.RecordingLevel.DEBUG,
+ "task-id", context.taskId().toString()
);
this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(
- scope, name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+ "processor-node",
+ processorNodeName,
+ "punctuate",
+ Sensor.RecordingLevel.DEBUG,
+ "task-id", context.taskId().toString()
);
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(
- scope, name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+ "processor-node",
+ processorNodeName,
+ "create",
+ Sensor.RecordingLevel.DEBUG,
+ "task-id", context.taskId().toString()
);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(
- scope, name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+ "processor-node",
+ processorNodeName,
+ "destroy",
+ Sensor.RecordingLevel.DEBUG,
+ "task-id", context.taskId().toString()
);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(
- scope, name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
- );
- this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(
- scope, name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+ "processor-node",
+ processorNodeName,
+ "forward",
+ Sensor.RecordingLevel.DEBUG,
+ "task-id", context.taskId().toString()
);
}
@@ -219,7 +228,17 @@ public class ProcessorNode<K, V> {
metrics.removeSensor(sourceNodeForwardSensor);
metrics.removeSensor(nodeCreationSensor);
metrics.removeSensor(nodeDestructionSensor);
- metrics.removeSensor(sourceNodeSkippedDueToDeserializationError);
+ }
+ }
+
+ private static void runAndMeasureLatency(final Time time, final Runnable action, final Sensor sensor) {
+ long startNs = -1;
+ if (sensor.shouldRecord()) {
+ startNs = time.nanoseconds();
+ }
+ action.run();
+ if (startNs != -1) {
+ sensor.record(time.nanoseconds() - startNs);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0d13758..8167539 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -51,24 +52,25 @@ public class RecordCollectorImpl implements RecordCollector {
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
private final ProductionExceptionHandler productionExceptionHandler;
+ private final Sensor skippedRecordsSensor;
private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.";
private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
- private final static String HANDLER_CONTINUED_MESSAGE = "Error sending records (key {} value {} timestamp {}) to topic {} due to {}; " +
- "The exception handler chose to CONTINUE processing in spite of this error.";
private volatile KafkaException sendException;
public RecordCollectorImpl(final Producer<byte[], byte[]> producer,
final String streamTaskId,
final LogContext logContext,
- final ProductionExceptionHandler productionExceptionHandler) {
+ final ProductionExceptionHandler productionExceptionHandler,
+ final Sensor skippedRecordsSensor) {
this.producer = producer;
this.offsets = new HashMap<>();
this.logPrefix = String.format("task [%s] ", streamTaskId);
this.log = logContext.logger(getClass());
this.productionExceptionHandler = productionExceptionHandler;
+ this.skippedRecordsSensor = skippedRecordsSensor;
}
@Override
@@ -183,7 +185,12 @@ public class RecordCollectorImpl implements RecordCollector {
} else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
recordSendError(key, value, timestamp, topic, exception);
} else {
- log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception);
+ log.warn(
+ "Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; " +
+ "The exception handler chose to CONTINUE processing in spite of this error.",
+ key, value, timestamp, topic, partition, exception
+ );
+ skippedRecordsSensor.record();
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 47591d1..36e2c9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -30,13 +31,16 @@ class RecordDeserializer {
private final SourceNode sourceNode;
private final DeserializationExceptionHandler deserializationExceptionHandler;
private final Logger log;
+ private final Sensor skippedRecordSensor;
RecordDeserializer(final SourceNode sourceNode,
final DeserializationExceptionHandler deserializationExceptionHandler,
- final LogContext logContext) {
+ final LogContext logContext,
+ final Sensor skippedRecordsSensor) {
this.sourceNode = sourceNode;
this.deserializationExceptionHandler = deserializationExceptionHandler;
this.log = logContext.logger(RecordDeserializer.class);
+ this.skippedRecordSensor = skippedRecordsSensor;
}
/**
@@ -79,10 +83,17 @@ class RecordDeserializer {
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
deserializationException);
} else {
- sourceNode.sourceNodeSkippedDueToDeserializationErrorSensor().record();
+ log.warn(
+ "Skipping record due to deserialization error. topic=[{}] partition=[{}] offset=[{}]",
+ rawRecord.topic(),
+ rawRecord.partition(),
+ rawRecord.offset(),
+ deserializationException
+ );
+ skippedRecordSensor.record();
+ return null;
}
}
- return null;
}
SourceNode sourceNode() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 85f3b72..22ef4d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import java.util.ArrayDeque;
@@ -49,14 +50,19 @@ public class RecordQueue {
final SourceNode source,
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
- final ProcessorContext processorContext,
+ final InternalProcessorContext processorContext,
final LogContext logContext) {
this.partition = partition;
this.source = source;
this.timestampExtractor = timestampExtractor;
this.fifoQueue = new ArrayDeque<>();
this.timeTracker = new MinTimestampTracker<>();
- this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext);
+ this.recordDeserializer = new RecordDeserializer(
+ source,
+ deserializationExceptionHandler,
+ logContext,
+ processorContext.metrics().skippedRecordsSensor()
+ );
this.processorContext = processorContext;
this.log = logContext.logger(RecordQueue.class);
}
@@ -90,6 +96,7 @@ public class RecordQueue {
final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
+ // this only happens if the deserializer decides to skip. It has already logged the reason.
continue;
}
@@ -107,6 +114,11 @@ public class RecordQueue {
// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
+ log.warn(
+ "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
+ record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
+ );
+ ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
continue;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 00e5dca..0fbd6dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -54,7 +54,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
+ public void init(final InternalProcessorContext context) {
super.init(context);
this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 8df3998..4aaba9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -65,7 +65,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
+ public void init(final InternalProcessorContext context) {
super.init(context);
this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 360c4ab..ef4585a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collections;
@@ -70,15 +70,18 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
StandbyContextImpl(final TaskId id,
final StreamsConfig config,
final ProcessorStateManager stateMgr,
- final StreamsMetrics metrics) {
- super(id,
- config,
- metrics,
- stateMgr,
- new ThreadCache(
- new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
- 0,
- metrics));
+ final StreamsMetricsImpl metrics) {
+ super(
+ id,
+ config,
+ metrics,
+ stateMgr,
+ new ThreadCache(
+ new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
+ 0,
+ metrics
+ )
+ );
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index a048407..ddf84fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Collection;
import java.util.Collections;
@@ -53,7 +54,7 @@ public class StandbyTask extends AbstractTask {
final Consumer<byte[], byte[]> consumer,
final ChangelogReader changelogReader,
final StreamsConfig config,
- final StreamsMetrics metrics,
+ final StreamsMetricsImpl metrics,
final StateDirectory stateDirectory) {
super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f7816d2..d9515b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,10 +25,8 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
@@ -38,6 +36,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collection;
@@ -68,16 +67,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
private boolean commitOffsetNeeded = false;
private boolean transactionInFlight = false;
private final Time time;
- private final TaskMetrics metrics;
+ private final TaskMetrics taskMetrics;
- protected static class TaskMetrics {
+ protected static final class TaskMetrics {
final StreamsMetricsImpl metrics;
final Sensor taskCommitTimeSensor;
- TaskMetrics(final TaskId id, final StreamsMetrics metrics) {
+ TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) {
final String name = id.toString();
- this.metrics = (StreamsMetricsImpl) metrics;
+ this.metrics = metrics;
taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG);
}
@@ -86,41 +85,51 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
- /**
- * Create {@link StreamTask} with its assigned partitions
- *
- * @param id the ID of this task
- * @param partitions the collection of assigned {@link TopicPartition}
- * @param topology the instance of {@link ProcessorTopology}
- * @param consumer the instance of {@link Consumer}
- * @param changelogReader the instance of {@link ChangelogReader} used for restoring state
- * @param config the {@link StreamsConfig} specified by the user
- * @param metrics the {@link StreamsMetrics} created by the thread
- * @param stateDirectory the {@link StateDirectory} created by the thread
- * @param cache the {@link ThreadCache} created by the thread
- * @param time the system {@link Time} of the thread
- * @param producer the instance of {@link Producer} used to produce records
- */
public StreamTask(final TaskId id,
final Collection<TopicPartition> partitions,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final ChangelogReader changelogReader,
final StreamsConfig config,
- final StreamsMetrics metrics,
+ final StreamsMetricsImpl metrics,
final StateDirectory stateDirectory,
final ThreadCache cache,
final Time time,
final Producer<byte[], byte[]> producer) {
+ this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producer, null);
+ }
+
+ public StreamTask(final TaskId id,
+ final Collection<TopicPartition> partitions,
+ final ProcessorTopology topology,
+ final Consumer<byte[], byte[]> consumer,
+ final ChangelogReader changelogReader,
+ final StreamsConfig config,
+ final StreamsMetricsImpl metrics,
+ final StateDirectory stateDirectory,
+ final ThreadCache cache,
+ final Time time,
+ final Producer<byte[], byte[]> producer,
+ final RecordCollector recordCollector) {
super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
this.time = time;
this.producer = producer;
- this.metrics = new TaskMetrics(id, metrics);
+ this.taskMetrics = new TaskMetrics(id, metrics);
final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
- recordCollector = createRecordCollector(logContext, productionExceptionHandler);
+ if (recordCollector == null) {
+ this.recordCollector = new RecordCollectorImpl(
+ producer,
+ id.toString(),
+ logContext,
+ productionExceptionHandler,
+ metrics.skippedRecordsSensor()
+ );
+ } else {
+ this.recordCollector = recordCollector;
+ }
streamTimePunctuationQueue = new PunctuationQueue();
systemTimePunctuationQueue = new PunctuationQueue();
maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
@@ -133,7 +142,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
// initialize the topology with its own context
- processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
+ processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
@@ -312,22 +321,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
*/
// visible for testing
void commit(final boolean startNewTransaction) {
+ final long startNs = time.nanoseconds();
log.debug("Committing");
- metrics.metrics.measureLatencyNs(
- time,
- new Runnable() {
- @Override
- public void run() {
- flushState();
- if (!eosEnabled) {
- stateMgr.checkpoint(recordCollectorOffsets());
- }
- commitOffsets(startNewTransaction);
- }
- },
- metrics.taskCommitTimeSensor);
+
+ flushState();
+
+ if (!eosEnabled) {
+ stateMgr.checkpoint(recordCollectorOffsets());
+ }
+
+ commitOffsets(startNewTransaction);
commitRequested = false;
+
+ taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs);
}
@Override
@@ -489,7 +496,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
try {
partitionGroup.close();
- metrics.removeAllSensors();
+ taskMetrics.removeAllSensors();
} finally {
if (eosEnabled) {
if (!clean) {
@@ -567,11 +574,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
*
* @param partition the partition
* @param records the records
- * @return the number of added records
*/
@SuppressWarnings("unchecked")
- public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
- final int oldQueueSize = partitionGroup.numBuffered(partition);
+ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
final int newQueueSize = partitionGroup.addRawRecords(partition, records);
if (log.isTraceEnabled()) {
@@ -583,8 +588,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
if (newQueueSize > maxBufferedSize) {
consumer.pause(singleton(partition));
}
-
- return newQueueSize - oldQueueSize;
}
/**
@@ -692,10 +695,4 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
RecordCollector recordCollector() {
return recordCollector;
}
-
- // visible for testing only
- RecordCollector createRecordCollector(final LogContext logContext,
- final ProductionExceptionHandler productionExceptionHandler) {
- return new RecordCollectorImpl(producer, id.toString(), logContext, productionExceptionHandler);
- }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 32cafb4..39727be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,27 +26,25 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.SampledStat;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
@@ -54,15 +52,19 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
public class StreamThread extends Thread {
@@ -322,9 +324,8 @@ public class StreamThread extends Thread {
final String applicationId;
final InternalTopologyBuilder builder;
final StreamsConfig config;
- final StreamsMetrics streamsMetrics;
+ final StreamsMetricsThreadImpl streamsMetrics;
final StateDirectory stateDirectory;
- final Sensor taskCreatedSensor;
final ChangelogReader storeChangelogReader;
final Time time;
final Logger log;
@@ -332,9 +333,8 @@ public class StreamThread extends Thread {
AbstractTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
- final StreamsMetrics streamsMetrics,
+ final StreamsMetricsThreadImpl streamsMetrics,
final StateDirectory stateDirectory,
- final Sensor taskCreatedSensor,
final ChangelogReader storeChangelogReader,
final Time time,
final Logger log) {
@@ -343,7 +343,6 @@ public class StreamThread extends Thread {
this.config = config;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
- this.taskCreatedSensor = taskCreatedSensor;
this.storeChangelogReader = storeChangelogReader;
this.time = time;
this.log = log;
@@ -386,9 +385,8 @@ public class StreamThread extends Thread {
TaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
- final StreamsMetrics streamsMetrics,
+ final StreamsMetricsThreadImpl streamsMetrics,
final StateDirectory stateDirectory,
- final Sensor taskCreatedSensor,
final ChangelogReader storeChangelogReader,
final ThreadCache cache,
final Time time,
@@ -401,7 +399,6 @@ public class StreamThread extends Thread {
config,
streamsMetrics,
stateDirectory,
- taskCreatedSensor,
storeChangelogReader,
time,
log);
@@ -415,7 +412,7 @@ public class StreamThread extends Thread {
StreamTask createTask(final Consumer<byte[], byte[]> consumer,
final TaskId taskId,
final Set<TopicPartition> partitions) {
- taskCreatedSensor.record();
+ streamsMetrics.taskCreatedSensor.record();
return new StreamTask(
taskId,
@@ -428,8 +425,8 @@ public class StreamThread extends Thread {
stateDirectory,
cache,
time,
- createProducer(taskId));
-
+ createProducer(taskId)
+ );
}
private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -459,9 +456,8 @@ public class StreamThread extends Thread {
static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
StandbyTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
- final StreamsMetrics streamsMetrics,
+ final StreamsMetricsThreadImpl streamsMetrics,
final StateDirectory stateDirectory,
- final Sensor taskCreatedSensor,
final ChangelogReader storeChangelogReader,
final Time time,
final Logger log) {
@@ -470,7 +466,6 @@ public class StreamThread extends Thread {
config,
streamsMetrics,
stateDirectory,
- taskCreatedSensor,
storeChangelogReader,
time,
log);
@@ -480,7 +475,7 @@ public class StreamThread extends Thread {
StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
final TaskId taskId,
final Set<TopicPartition> partitions) {
- taskCreatedSensor.record();
+ streamsMetrics.taskCreatedSensor.record();
final ProcessorTopology topology = builder.build(taskId.topicGroupId);
@@ -505,80 +500,67 @@ public class StreamThread extends Thread {
}
}
- /**
- * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and
- * overrides one of its functions for efficiency
- */
static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
- final Sensor commitTimeSensor;
- final Sensor pollTimeSensor;
- final Sensor processTimeSensor;
- final Sensor punctuateTimeSensor;
- final Sensor taskCreatedSensor;
- final Sensor tasksClosedSensor;
- final Sensor skippedRecordsSensor;
-
- StreamsMetricsThreadImpl(final Metrics metrics, final String groupName, final String prefix, final Map<String, String> tags) {
- super(metrics, groupName, tags);
- commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
- commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
- commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
- commitTimeSensor.add(createMeter(metrics, new Count(), "commit", "commit calls"));
-
- pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
- pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
- pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
- pollTimeSensor.add(createMeter(metrics, new Count(), "poll", "record-poll calls"));
-
- processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
- processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
- processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
- processTimeSensor.add(createMeter(metrics, new Count(), "process", "process calls"));
-
- punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
- punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
- punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
- punctuateTimeSensor.add(createMeter(metrics, new Count(), "punctuate", "punctuate calls"));
-
- taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
- taskCreatedSensor.add(createMeter(metrics, new Count(), "task-created", "newly created tasks"));
-
- tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
- tasksClosedSensor.add(createMeter(metrics, new Count(), "task-closed", "closed tasks"));
-
- skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
- skippedRecordsSensor.add(createMeter(metrics, new Sum(), "skipped-records", "skipped records"));
+ private final Sensor commitTimeSensor;
+ private final Sensor pollTimeSensor;
+ private final Sensor processTimeSensor;
+ private final Sensor punctuateTimeSensor;
+ private final Sensor taskCreatedSensor;
+ private final Sensor tasksClosedSensor;
+
+ private final Deque<String> ownedSensors = new LinkedList<>();
+
+ StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
+ super(metrics, threadName);
+ final String groupName = "stream-metrics";
+
+ commitTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "commit-latency"), Sensor.RecordingLevel.INFO);
+ commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
+ commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
+ commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+ commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count());
+ ownedSensors.push(commitTimeSensor.name());
+
+ pollTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "poll-latency"), Sensor.RecordingLevel.INFO);
+ pollTimeSensor.add(metrics.metricName("poll-latency-avg", groupName, "The average poll time in ms", tags()), new Avg());
+ pollTimeSensor.add(metrics.metricName("poll-latency-max", groupName, "The maximum poll time in ms", tags()), new Max());
+ pollTimeSensor.add(metrics.metricName("poll-rate", groupName, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+ pollTimeSensor.add(metrics.metricName("poll-total", groupName, "The total number of record-poll calls", tags()), new Count());
+ ownedSensors.push(pollTimeSensor.name());
+
+ processTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "process-latency"), Sensor.RecordingLevel.INFO);
+ processTimeSensor.add(metrics.metricName("process-latency-avg", groupName, "The average process time in ms", tags()), new Avg());
+ processTimeSensor.add(metrics.metricName("process-latency-max", groupName, "The maximum process time in ms", tags()), new Max());
+ processTimeSensor.add(metrics.metricName("process-rate", groupName, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+ processTimeSensor.add(metrics.metricName("process-total", groupName, "The total number of process calls", tags()), new Count());
+ ownedSensors.push(processTimeSensor.name());
+
+ punctuateTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "punctuate-latency"), Sensor.RecordingLevel.INFO);
+ punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", groupName, "The average punctuate time in ms", tags()), new Avg());
+ punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", groupName, "The maximum punctuate time in ms", tags()), new Max());
+ punctuateTimeSensor.add(metrics.metricName("punctuate-rate", groupName, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+ punctuateTimeSensor.add(metrics.metricName("punctuate-total", groupName, "The total number of punctuate calls", tags()), new Count());
+ ownedSensors.push(punctuateTimeSensor.name());
+
+ taskCreatedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-created"), Sensor.RecordingLevel.INFO);
+ taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+ taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tags()), new Total());
+ ownedSensors.push(taskCreatedSensor.name());
+
+ tasksClosedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-closed"), Sensor.RecordingLevel.INFO);
+ tasksClosedSensor.add(metrics.metricName("task-closed-rate", groupName, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+ tasksClosedSensor.add(metrics.metricName("task-closed-total", groupName, "The total number of closed tasks", tags()), new Total());
+ ownedSensors.push(tasksClosedSensor.name());
}
- private Meter createMeter(final Metrics metrics,
- final SampledStat stat,
- final String baseName,
- final String descriptiveName) {
- final MetricName rateMetricName = metrics.metricName(
- baseName + "-rate",
- groupName,
- String.format("The average per-second number of %s", descriptiveName),
- tags
- );
- final MetricName totalMetricName = metrics.metricName(
- baseName + "-total",
- groupName,
- String.format("The total number of %s", descriptiveName),
- tags
- );
- return new Meter(stat, rateMetricName, totalMetricName);
- }
-
- void removeAllSensors() {
- removeSensor(commitTimeSensor);
- removeSensor(pollTimeSensor);
- removeSensor(processTimeSensor);
- removeSensor(punctuateTimeSensor);
- removeSensor(taskCreatedSensor);
- removeSensor(tasksClosedSensor);
- removeSensor(skippedRecordsSensor);
-
+ public void removeOwnedSensors() {
+ synchronized (ownedSensors) {
+ super.removeOwnedSensors();
+ while (!ownedSensors.isEmpty()) {
+ registry().removeSensor(ownedSensors.pop());
+ }
+ }
}
}
@@ -640,9 +622,8 @@ public class StreamThread extends Thread {
final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(
metrics,
- "stream-metrics",
- "thread." + threadClientId,
- Collections.singletonMap("client-id", threadClientId));
+ threadClientId
+ );
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
@@ -651,7 +632,6 @@ public class StreamThread extends Thread {
config,
streamsMetrics,
stateDirectory,
- streamsMetrics.taskCreatedSensor,
changelogReader,
cache,
time,
@@ -664,7 +644,6 @@ public class StreamThread extends Thread {
config,
streamsMetrics,
stateDirectory,
- streamsMetrics.taskCreatedSensor,
changelogReader,
time,
log);
@@ -930,7 +909,6 @@ public class StreamThread extends Thread {
* @param records Records, can be null
*/
private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
- int numAddedRecords = 0;
for (final TopicPartition partition : records.partitions()) {
final StreamTask task = taskManager.activeTask(partition);
@@ -941,9 +919,8 @@ public class StreamThread extends Thread {
throw new TaskMigratedException(task);
}
- numAddedRecords += task.addRecords(partition, records.records(partition));
+ task.addRecords(partition, records.records(partition));
}
- streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
}
/**
@@ -1188,7 +1165,7 @@ public class StreamThread extends Thread {
} catch (final Throwable e) {
log.error("Failed to close restore consumer due to the following error:", e);
}
- streamsMetrics.removeAllSensors();
+ streamsMetrics.removeOwnedSensors();
setState(State.DEAD);
log.info("Shutdown complete");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
similarity index 52%
copy from streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
index d374136..cfe206c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
@@ -14,16 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
-import org.apache.kafka.common.metrics.Metrics;
+import java.util.LinkedHashMap;
+import java.util.Map;
-import java.util.Collections;
+public final class StreamsMetricsConventions {
+ private StreamsMetricsConventions() {
+ }
-public class MockStreamsMetrics extends StreamsMetricsImpl {
+ public static String threadLevelSensorName(final String threadName, final String sensorName) {
+ return "thread." + threadName + "." + sensorName;
+ }
- public MockStreamsMetrics(final Metrics metrics) {
- super(metrics, "mock-stream-metrics",
- Collections.<String, String>emptyMap());
+ static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
+ if (tags.containsKey("client-id")) {
+ return tags;
+ } else {
+ final LinkedHashMap<String, String> newTags = new LinkedHashMap<>(tags);
+ newTags.put("client-id", threadName);
+ return newTags;
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
similarity index 64%
rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 83b4f12..76a1d2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -14,49 +14,62 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.streams.StreamsMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
-public class StreamsMetricsImpl implements StreamsMetrics {
- private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
- final Metrics metrics;
- final String groupName;
- final Map<String, String> tags;
+public class StreamsMetricsImpl implements StreamsMetrics {
+ private final Metrics metrics;
+ private final Map<String, String> tags;
private final Map<Sensor, Sensor> parentSensors;
+ private final Deque<String> ownedSensors = new LinkedList<>();
+ private final Sensor skippedRecordsSensor;
- public StreamsMetricsImpl(final Metrics metrics, final String groupName, final Map<String, String> tags) {
+ public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
- this.groupName = groupName;
- this.tags = tags;
+ this.tags = StreamsMetricsConventions.threadLevelTags(threadName, Collections.<String, String>emptyMap());
this.parentSensors = new HashMap<>();
+
+ skippedRecordsSensor = metrics.sensor(threadLevelSensorName(threadName, "skipped-records"), Sensor.RecordingLevel.INFO);
+ skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
+ skippedRecordsSensor.add(metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tags), new Total());
+ ownedSensors.push(skippedRecordsSensor.name());
}
- public Metrics registry() {
+ public final Metrics registry() {
return metrics;
}
+ protected final Map<String, String> tags() {
+ return tags;
+ }
+
+ public final Sensor skippedRecordsSensor() {
+ return skippedRecordsSensor;
+ }
+
@Override
public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) {
return metrics.sensor(name, recordingLevel);
@@ -131,11 +144,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
// first add the global operation metrics if not yet, with the global tags only
final Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
- addLatencyAndThroughputMetrics(scopeName, parent, operationName, allTagMap);
+ addLatencyMetrics(scopeName, parent, operationName, allTagMap);
+ addThroughputMetrics(scopeName, parent, operationName, allTagMap);
// add the operation metrics with additional tags
final Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
- addLatencyAndThroughputMetrics(scopeName, sensor, operationName, tagMap);
+ addLatencyMetrics(scopeName, sensor, operationName, tagMap);
+ addThroughputMetrics(scopeName, sensor, operationName, tagMap);
parentSensors.put(sensor, parent);
@@ -167,64 +182,44 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return sensor;
}
- private void addLatencyAndThroughputMetrics(final String scopeName,
- final Sensor sensor,
- final String opName,
- final Map<String, String> tags) {
- maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
- "The average latency of " + opName + " operation.", tags), new Avg());
- maybeAddMetric(sensor, metrics.metricName(opName + "-latency-max", groupNameFromScope(scopeName),
- "The max latency of " + opName + " operation.", tags), new Max());
- addThroughputMetrics(scopeName, sensor, opName, tags);
- }
-
- private void addThroughputMetrics(final String scopeName,
- final Sensor sensor,
- final String opName,
- final Map<String, String> tags) {
- final MetricName rateMetricName = metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
- "The average number of occurrence of " + opName + " operation per second.", tags);
- final MetricName totalMetricName = metrics.metricName(opName + "-total", groupNameFromScope(scopeName),
- "The total number of occurrence of " + opName + " operations.", tags);
- if (!metrics.metrics().containsKey(rateMetricName) && !metrics.metrics().containsKey(totalMetricName)) {
- sensor.add(new Meter(new Count(), rateMetricName, totalMetricName));
- } else {
- log.trace("Trying to add metric twice: {} {}", rateMetricName, totalMetricName);
- }
- }
-
- /**
- * Register a metric on the sensor if it isn't already there.
- *
- * @param sensor The sensor on which to register the metric
- * @param name The name of the metric
- * @param stat The metric to track
- * @throws IllegalArgumentException if the same metric name is already in use elsewhere in the metrics
- */
- public void maybeAddMetric(final Sensor sensor, final MetricName name, final MeasurableStat stat) {
- sensor.add(name, stat);
- }
-
- /**
- * Helper function. Measure the latency of an action. This is equivalent to
- * startTs = time.nanoseconds()
- * action.run()
- * endTs = time.nanoseconds()
- * sensor.record(endTs - startTs)
- *
- * @param time Time object.
- * @param action Action to run.
- * @param sensor Sensor to record value.
- */
- void measureLatencyNs(final Time time, final Runnable action, final Sensor sensor) {
- long startNs = -1;
- if (sensor.shouldRecord()) {
- startNs = time.nanoseconds();
- }
- action.run();
- if (startNs != -1) {
- recordLatency(sensor, startNs, time.nanoseconds());
- }
+ private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
+ sensor.add(
+ metrics.metricName(
+ opName + "-latency-avg",
+ groupNameFromScope(scopeName),
+ "The average latency of " + opName + " operation.", tags),
+ new Avg()
+ );
+ sensor.add(
+ metrics.metricName(
+ opName + "-latency-max",
+ groupNameFromScope(scopeName),
+ "The max latency of " + opName + " operation.",
+ tags
+ ),
+ new Max()
+ );
+ }
+
+ private void addThroughputMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
+ sensor.add(
+ metrics.metricName(
+ opName + "-rate",
+ groupNameFromScope(scopeName),
+ "The average number of occurrence of " + opName + " operation per second.",
+ tags
+ ),
+ new Rate(TimeUnit.SECONDS, new Count())
+ );
+ sensor.add(
+ metrics.metricName(
+ opName + "-total",
+ groupNameFromScope(scopeName),
+ "The total number of occurrence of " + opName + " operations.",
+ tags
+ ),
+ new Count()
+ );
}
/**
@@ -239,6 +234,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
if (parent != null) {
metrics.removeSensor(parent.name());
}
+ }
+ public void removeOwnedSensors() {
+ synchronized (ownedSensors) {
+ while (!ownedSensors.isEmpty()) {
+ metrics.removeSensor(ownedSensors.pop());
+ }
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 01ee6b1..de62a2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -374,21 +374,21 @@ class NamedCache {
// add parent
final Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
- ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
- "The average cache hit ratio.", allMetricTags), new Avg());
- ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName,
- "The minimum cache hit ratio.", allMetricTags), new Min());
- ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName,
- "The maximum cache hit ratio.", allMetricTags), new Max());
+ parent.add(this.metrics.registry().metricName(opName + "-avg", groupName,
+ "The average cache hit ratio.", allMetricTags), new Avg());
+ parent.add(this.metrics.registry().metricName(opName + "-min", groupName,
+ "The minimum cache hit ratio.", allMetricTags), new Min());
+ parent.add(this.metrics.registry().metricName(opName + "-max", groupName,
+ "The maximum cache hit ratio.", allMetricTags), new Max());
// add child
hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
- ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName,
- "The average cache hit ratio.", metricTags), new Avg());
- ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-min", groupName,
- "The minimum cache hit ratio.", metricTags), new Min());
- ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-max", groupName,
- "The maximum cache hit ratio.", metricTags), new Max());
+ hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName,
+ "The average cache hit ratio.", metricTags), new Avg());
+ hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
+ "The minimum cache hit ratio.", metricTags), new Min());
+ hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
+ "The maximum cache hit ratio.", metricTags), new Max());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 2b87b30..c74d0dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -37,6 +39,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
@@ -55,9 +58,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
public class KGroupedStreamImplTest {
@@ -558,6 +564,19 @@ public class KGroupedStreamImplTest {
assertThat(count.get("3"), equalTo(2L));
}
+ @Test
+ public void shouldLogAndMeasureSkipsInAggregate() {
+ groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ processData();
+ LogCaptureAppender.unregister(appender);
+
+ final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+ assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+ assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+ }
+
@SuppressWarnings("unchecked")
@Test
@@ -577,6 +596,26 @@ public class KGroupedStreamImplTest {
assertThat(reduced.get("3"), equalTo("E+F"));
}
+ @Test
+ public void shouldLogAndMeasureSkipsInReduce() {
+ groupedStream.reduce(
+ MockReducer.STRING_ADDER,
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
+ );
+
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ processData();
+ LogCaptureAppender.unregister(appender);
+
+ final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+ assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+ assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+ }
+
+
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 83fee9b..4d2bce5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -24,7 +24,9 @@ import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -40,7 +42,10 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
public class KStreamKStreamJoinTest {
@@ -61,6 +66,35 @@ public class KStreamKStreamJoinTest {
}
@Test
+ public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+
+ left.join(
+ right,
+ new ValueJoiner<Integer, Integer, Integer>() {
+ @Override
+ public Integer apply(final Integer value1, final Integer value2) {
+ return value1 + value2;
+ }
+ },
+ JoinWindows.of(100),
+ Joined.with(stringSerde, intSerde, intSerde)
+ );
+
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.setUp(builder, stateDir);
+ driver.process("left", "A", null);
+ LogCaptureAppender.unregister(appender);
+
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[-1] offset=[-1]"));
+
+ assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ }
+
+ @Test
public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 851808c..1800385 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -37,6 +38,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKTableJoinTest {
@@ -181,4 +185,23 @@ public class KStreamKTableJoinTest {
processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
}
+ @Test
+ public void shouldLogAndMeterWhenSkippingNullLeftKey() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.process(streamTopic, null, "A");
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[-1] offset=[-1]"));
+ }
+
+ @Test
+ public void shouldLogAndMeterWhenSkippingNullLeftValue() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.process(streamTopic, 1, null);
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[-1] offset=[-1]"));
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 2acf859..301d448 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
@@ -44,6 +46,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -300,4 +305,16 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
+ @Test
+ public void shouldLogAndMeterWhenSkippingNullKey() {
+ initStore(false);
+ processor.init(context);
+ context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic"));
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ processor.process(null, "1");
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 3d107c2..7082251 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -18,15 +18,20 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
@@ -39,6 +44,9 @@ import org.junit.Test;
import java.io.File;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamWindowAggregateTest {
@@ -292,4 +300,29 @@ public class KStreamWindowAggregateTest {
);
}
+ @Test
+ public void shouldLogAndMeterWhenSkippingNullKey() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(strSerde, strSerde));
+ stream1.groupByKey(Serialized.with(strSerde, strSerde))
+ .windowedBy(TimeWindows.of(10).advanceBy(5))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.<String, String>toStringInstance("+"),
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
+ );
+
+ driver.setUp(builder, stateDir);
+
+ setRecordContext(0, topic);
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.process(topic, null, "1");
+ driver.flushState();
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-1] offset=[-1]"));
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
new file mode 100644
index 0000000..ee8ea14
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamWindowReduceTest {
+ @Test
+ public void shouldLogAndMeterOnNullKey() {
+ final KStreamTestDriver driver = new KStreamTestDriver();
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .windowedBy(TimeWindows.of(500L))
+ .reduce(new Reducer<String>() {
+ @Override
+ public String apply(final String value1, final String value2) {
+ return value1 + "+" + value2;
+ }
+ });
+
+ driver.setUp(builder, TestUtils.tempDirectory(), 0);
+
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.process("TOPIC", null, "asdf");
+ driver.flushState();
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[-1] offset=[-1]"));
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 98d3e52..9f5603b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -37,6 +40,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -337,6 +343,27 @@ public class KTableKTableInnerJoinTest {
}
+ @Test
+ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
+ (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+ (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+ null
+ ).get();
+
+ final MockProcessorContext context = new MockProcessorContext();
+ context.setRecordMetadata("left", -1, -2, -3);
+ join.init(context);
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ join.process(null, new Change<>("new", "old"));
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+ }
+
private KeyValue<Integer, String> kv(final Integer key, final String value) {
return new KeyValue<>(key, value);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 7261ae0..6331b57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -26,6 +26,9 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
@@ -43,6 +46,9 @@ import java.util.Locale;
import java.util.Random;
import java.util.Set;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -398,6 +404,27 @@ public class KTableKTableLeftJoinTest {
}
}
+ @Test
+ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Processor<String, Change<String>> join = new KTableKTableLeftJoin<>(
+ (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+ (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+ null
+ ).get();
+
+ final MockProcessorContext context = new MockProcessorContext();
+ context.setRecordMetadata("left", -1, -2, -3);
+ join.init(context);
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ join.process(null, new Change<>("new", "old"));
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+ }
+
private KeyValue<Integer, String> kv(final Integer key, final String value) {
return new KeyValue<>(key, value);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index e094591..16694d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -37,6 +40,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -342,6 +348,27 @@ public class KTableKTableOuterJoinTest {
proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
}
+ @Test
+ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
+ (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+ (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+ null
+ ).get();
+
+ final MockProcessorContext context = new MockProcessorContext();
+ context.setRecordMetadata("left", -1, -2, -3);
+ join.init(context);
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ join.process(null, new Change<>("new", "old"));
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+ }
+
private KeyValue<Integer, String> kv(final Integer key, final String value) {
return new KeyValue<>(key, value);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
new file mode 100644
index 0000000..67adf2b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.junit.Test;
+
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+public class KTableKTableRightJoinTest {
+ @Test
+ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Processor<String, Change<String>> join = new KTableKTableRightJoin<>(
+ (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
+ (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
+ null
+ ).get();
+
+ final MockProcessorContext context = new MockProcessorContext();
+ context.setRecordMetadata("left", -1, -2, -3);
+ join.init(context);
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ join.process(null, new Change<>("new", "old"));
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 6ee1b1b..97c9c7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -31,8 +32,11 @@ import org.junit.Test;
import java.io.File;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class KTableSourceTest {
@@ -74,6 +78,22 @@ public class KTableSourceTest {
}
@Test
+ public void kTableShouldLogAndMeterOnSkippedRecords() {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ final String topic = "topic";
+ streamsBuilder.table(topic, Consumed.with(stringSerde, intSerde));
+
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.setUp(streamsBuilder, stateDir);
+ driver.process(topic, null, "value");
+ driver.flushState();
+ LogCaptureAppender.unregister(appender);
+
+ assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[-1] offset=[-1]"));
+ }
+
+ @Test
public void testValueGetter() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 924bc52..5637dab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -178,7 +178,8 @@ public class GlobalStateTaskTest {
context,
stateMgr,
new LogAndContinueExceptionHandler(),
- logContext);
+ logContext
+ );
final byte[] key = new LongSerializer().serialize(topic2, 1L);
final byte[] recordValue = new IntegerSerializer().serialize(topic2, 10);
@@ -192,7 +193,8 @@ public class GlobalStateTaskTest {
context,
stateMgr,
new LogAndContinueExceptionHandler(),
- logContext);
+ logContext
+ );
final byte[] key = new IntegerSerializer().serialize(topic2, 1);
final byte[] recordValue = new LongSerializer().serialize(topic2, 10L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index d374136..bd35530 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -17,13 +17,11 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.metrics.Metrics;
-
-import java.util.Collections;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
public class MockStreamsMetrics extends StreamsMetricsImpl {
public MockStreamsMetrics(final Metrics metrics) {
- super(metrics, "mock-stream-metrics",
- Collections.<String, String>emptyMap());
+ super(metrics, "test");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 5a182de..1d2f613 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Test;
@@ -49,15 +50,17 @@ public class PartitionGroupTest {
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
timestampExtractor,
new LogAndContinueExceptionHandler(),
- null,
- logContext);
+ new InternalMockProcessorContext(),
+ logContext
+ );
private final RecordQueue queue2 = new RecordQueue(
partition2,
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
timestampExtractor,
new LogAndContinueExceptionHandler(),
- null,
- logContext);
+ new InternalMockProcessorContext(),
+ logContext
+ );
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 146f3ff..1409d68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Collections;
@@ -93,19 +94,6 @@ public class ProcessorNodeTest {
}
}
- private void testSpecificMetrics(final Metrics metrics,
- @SuppressWarnings("SameParameterValue") final String groupName,
- final String opName,
- final Map<String, String> metricTags) {
- assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-avg", groupName,
- "The average latency of " + opName + " operation.", metricTags)));
- assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-max", groupName,
- "The max latency of " + opName + " operation.", metricTags)));
- assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-rate", groupName,
- "The average number of occurrence of " + opName + " operation per second.", metricTags)));
-
- }
-
@Test
public void testMetrics() {
final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
@@ -113,7 +101,13 @@ public class ProcessorNodeTest {
final Metrics metrics = new Metrics();
final InternalMockProcessorContext context = new InternalMockProcessorContext(
anyStateSerde,
- new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()),
+ new RecordCollectorImpl(
+ null,
+ null,
+ new LogContext("processnode-test "),
+ new DefaultProductionExceptionHandler(),
+ metrics.sensor("skipped-records")
+ ),
metrics
);
final ProcessorNode<Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
@@ -125,6 +119,7 @@ public class ProcessorNodeTest {
final Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("processor-node-id", node.name());
metricTags.put("task-id", context.taskId().toString());
+ metricTags.put("client-id", "mock");
for (final String operation : latencyOperations) {
@@ -133,7 +128,10 @@ public class ProcessorNodeTest {
assertNotNull(metrics.getSensor(throughputOperation));
for (final String opName : latencyOperations) {
- testSpecificMetrics(metrics, groupName, opName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
}
assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
"The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
@@ -141,7 +139,10 @@ public class ProcessorNodeTest {
// test "all"
metricTags.put("processor-node-id", "all");
for (final String opName : latencyOperations) {
- testSpecificMetrics(metrics, groupName, opName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
}
assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
"The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index d877450..8a2f171 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -23,9 +23,13 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
@@ -33,6 +37,7 @@ import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.Test;
import java.util.Arrays;
@@ -42,6 +47,7 @@ import java.util.Map;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RecordCollectorTest {
@@ -75,7 +81,9 @@ public class RecordCollectorTest {
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestSpecificPartition",
new LogContext("RecordCollectorTest-TestSpecificPartition "),
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records")
+ );
collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
@@ -109,7 +117,9 @@ public class RecordCollectorTest {
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestStreamPartitioner",
new LogContext("RecordCollectorTest-TestStreamPartitioner "),
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records")
+ );
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
@@ -142,7 +152,8 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
@@ -160,7 +171,9 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
try {
@@ -182,7 +195,9 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new AlwaysContinueProductionExceptionHandler());
+ new AlwaysContinueProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
@@ -190,6 +205,32 @@ public class RecordCollectorTest {
@SuppressWarnings("unchecked")
@Test
+ public void shouldRecordSkippedMetricAndLogWarningIfSendFailsWithContinueExceptionHandler() {
+ final Metrics metrics = new Metrics();
+ final Sensor sensor = metrics.sensor("skipped-records");
+ final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
+ final MetricName metricName = new MetricName("name", "group", "description", Collections.EMPTY_MAP);
+ sensor.add(metricName, new Sum());
+ final RecordCollector collector = new RecordCollectorImpl(
+ new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ callback.onCompletion(null, new Exception());
+ return null;
+ }
+ },
+ "test",
+ logContext,
+ new AlwaysContinueProductionExceptionHandler(),
+ sensor);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ assertEquals(1.0, metrics.metrics().get(metricName).metricValue());
+ assertTrue(logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
+ LogCaptureAppender.unregister(logCaptureAppender);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -201,7 +242,9 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
try {
@@ -223,7 +266,9 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new AlwaysContinueProductionExceptionHandler());
+ new AlwaysContinueProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.flush();
@@ -242,7 +287,9 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
try {
@@ -264,7 +311,9 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new AlwaysContinueProductionExceptionHandler());
+ new AlwaysContinueProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.close();
@@ -283,7 +332,8 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new DefaultProductionExceptionHandler());
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
@@ -300,7 +350,8 @@ public class RecordCollectorTest {
},
"test",
logContext,
- new AlwaysContinueProductionExceptionHandler());
+ new AlwaysContinueProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 7e90466..de8e17b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.junit.Test;
@@ -44,7 +45,15 @@ public class RecordDeserializerTest {
@Test
public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
final RecordDeserializer recordDeserializer = new RecordDeserializer(
- new TheSourceNode(false, false, "key", "value"), null, new LogContext());
+ new TheSourceNode(
+ false,
+ false,
+ "key", "value"
+ ),
+ null,
+ new LogContext(),
+ new Metrics().sensor("skipped-records")
+ );
final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(null, rawRecord);
assertEquals(rawRecord.topic(), record.topic());
assertEquals(rawRecord.partition(), record.partition());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 48be292..3ed9e3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -54,8 +56,18 @@ public class RecordQueueTest {
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
private final String[] topics = {"topic"};
- final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
- new RecordCollectorImpl(null, null, new LogContext("record-queue-test "), new DefaultProductionExceptionHandler()));
+ private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");
+
+ final InternalMockProcessorContext context = new InternalMockProcessorContext(
+ StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
+ new RecordCollectorImpl(
+ null,
+ null,
+ new LogContext("record-queue-test "),
+ new DefaultProductionExceptionHandler(),
+ skippedRecordsSensor
+ )
+ );
private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
private final RecordQueue queue = new RecordQueue(
new TopicPartition(topics[0], 1),
@@ -230,8 +242,9 @@ public class RecordQueueTest {
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new FailOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
- null,
+ new InternalMockProcessorContext(),
new LogContext());
+
queue.addRawRecords(records);
}
@@ -245,7 +258,7 @@ public class RecordQueueTest {
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new LogAndSkipOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
- null,
+ new InternalMockProcessorContext(),
new LogContext());
queue.addRawRecords(records);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index d005bbd..0013167 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -43,7 +44,8 @@ public class SinkNodeTest {
new MockProducer<>(true, anySerializer, anySerializer),
null,
new LogContext("sinknode-test "),
- new DefaultProductionExceptionHandler()
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records")
)
);
private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8d81b1f..28e0b46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -34,14 +35,13 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
@@ -125,7 +125,8 @@ public class StreamTaskTest {
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
private final Metrics metrics = new Metrics();
- private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
+ private final Sensor skippedRecordsSensor = metrics.sensor("skipped-records");
+ private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
private final TaskId taskId00 = new TaskId(0, 0);
private final MockTime time = new MockTime();
private final File baseDir = TestUtils.tempDirectory();
@@ -246,7 +247,7 @@ public class StreamTaskTest {
String.format(nameFormat, "commit"),
"stream-task-metrics",
String.format(descriptionFormat, "commit"),
- mkMap(mkEntry("task-id", taskId))
+ mkMap(mkEntry("task-id", taskId), mkEntry("client-id", "test"))
));
}
@@ -647,21 +648,26 @@ public class StreamTaskTest {
@Test
public void shouldFlushRecordCollectorOnFlushState() {
final AtomicBoolean flushed = new AtomicBoolean(false);
- final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
- final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
- changelogReader, createConfig(false), streamsMetrics, stateDirectory, null, time, producer) {
-
- @Override
- RecordCollector createRecordCollector(final LogContext logContext,
- final ProductionExceptionHandler exHandler) {
- return new NoOpRecordCollector() {
- @Override
- public void flush() {
- flushed.set(true);
- }
- };
+ final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
+ final StreamTask streamTask = new StreamTask(
+ taskId00,
+ partitions,
+ topology,
+ consumer,
+ changelogReader,
+ createConfig(false),
+ streamsMetrics,
+ stateDirectory,
+ null,
+ time,
+ producer,
+ new NoOpRecordCollector() {
+ @Override
+ public void flush() {
+ flushed.set(true);
+ }
}
- };
+ );
streamTask.flushState();
assertTrue(flushed.get());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b95507d..36a1bce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -31,11 +31,13 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -51,6 +53,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
@@ -79,6 +82,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
@@ -287,7 +291,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -319,7 +323,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -351,7 +355,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -497,12 +501,7 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
- metrics,
- "",
- "",
- Collections.<String, String>emptyMap()
- );
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -537,12 +536,7 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
- metrics,
- "",
- "",
- Collections.<String, String>emptyMap()
- );
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -568,11 +562,7 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
- metrics,
- "",
- "",
- Collections.<String, String>emptyMap());
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -783,7 +773,8 @@ public class StreamThreadTest {
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
restoreConsumer.updatePartitions(
"stream-thread-test-count-one-changelog",
- singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+ singletonList(
+ new PartitionInfo("stream-thread-test-count-one-changelog",
0,
null,
new Node[0],
@@ -1127,7 +1118,54 @@ public class StreamThreadTest {
}
@Test
+ public void shouldRecordSkippedMetricForDeserializationException() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+
+ final Properties config = configProps(false);
+ config.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
+ config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);
+
+ thread.setState(StreamThread.State.RUNNING);
+ thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+
+ final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
+ thread.taskManager().setAssignmentMetadata(
+ Collections.singletonMap(
+ new TaskId(0, t1p1.partition()),
+ assignedPartitions),
+ Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(Collections.singleton(t1p1));
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+ thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+ thread.runOnce(-1);
+
+ final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+ final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+ assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+ assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
+
+ long offset = -1;
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
+ thread.runOnce(-1);
+ assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+ assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
+
+ LogCaptureAppender.unregister(appender);
+ final List<String> strings = appender.getMessages();
+ assertTrue(strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]"));
+ assertTrue(strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]"));
+ }
+
+ @Test
public void shouldReportSkippedRecordsForInvalidTimestamps() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final Properties config = configProps(false);
@@ -1151,13 +1189,16 @@ public class StreamThreadTest {
thread.runOnce(-1);
final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+ final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+ assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
long offset = -1;
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
thread.runOnce(-1);
assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+ assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
@@ -1165,11 +1206,46 @@ public class StreamThreadTest {
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
thread.runOnce(-1);
assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+ assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
thread.runOnce(-1);
assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+ assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
+
+ LogCaptureAppender.unregister(appender);
+ final List<String> strings = appender.getMessages();
+ assertTrue(strings.contains(
+ "task [0_1] Skipping record due to negative extracted timestamp. " +
+ "topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] " +
+ "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+ ));
+ assertTrue(strings.contains(
+ "task [0_1] Skipping record due to negative extracted timestamp. " +
+ "topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] " +
+ "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+ ));
+ assertTrue(strings.contains(
+ "task [0_1] Skipping record due to negative extracted timestamp. " +
+ "topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] " +
+ "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+ ));
+ assertTrue(strings.contains(
+ "task [0_1] Skipping record due to negative extracted timestamp. " +
+ "topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] " +
+ "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+ ));
+ assertTrue(strings.contains(
+ "task [0_1] Skipping record due to negative extracted timestamp. " +
+ "topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] " +
+ "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+ ));
+ assertTrue(strings.contains(
+ "task [0_1] Skipping record due to negative extracted timestamp. " +
+ "topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] " +
+ "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+ ));
}
private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata,
@@ -1178,4 +1254,7 @@ public class StreamThreadTest {
assertTrue(metadata.activeTasks().isEmpty());
assertTrue(metadata.standbyTasks().isEmpty());
}
+
+
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index ee8abd0..b065e2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -17,43 +17,33 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
public class StreamsMetricsImplTest {
@Test(expected = NullPointerException.class)
public void testNullMetrics() {
- final String groupName = "doesNotMatter";
- final Map<String, String> tags = new HashMap<>();
- new StreamsMetricsImpl(null, groupName, tags);
+ new StreamsMetricsImpl(null, "");
}
@Test(expected = NullPointerException.class)
public void testRemoveNullSensor() {
- final String groupName = "doesNotMatter";
- final Map<String, String> tags = new HashMap<>();
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
streamsMetrics.removeSensor(null);
}
@Test
public void testRemoveSensor() {
- final String groupName = "doesNotMatter";
final String sensorName = "sensor1";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Map<String, String> tags = new HashMap<>();
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor1);
@@ -70,44 +60,40 @@ public class StreamsMetricsImplTest {
@Test
public void testLatencyMetrics() {
- final String groupName = "doesNotMatter";
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
+ final int defaultMetrics = streamsMetrics.metrics().size();
+
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Map<String, String> tags = new HashMap<>();
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
- Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
// 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
final int otherMetricsCount = 4;
- assertEquals(meterMetricsCount * 2 + otherMetricsCount + 1, metrics.size());
+ assertEquals(defaultMetrics + meterMetricsCount * 2 + otherMetricsCount, streamsMetrics.metrics().size());
streamsMetrics.removeSensor(sensor1);
- metrics = streamsMetrics.metrics();
- assertEquals(metrics.size(), 1);
+ assertEquals(defaultMetrics, streamsMetrics.metrics().size());
}
@Test
public void testThroughputMetrics() {
- final String groupName = "doesNotMatter";
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
+ final int defaultMetrics = streamsMetrics.metrics().size();
+
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Map<String, String> tags = new HashMap<>();
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
- Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
// 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
- assertEquals(meterMetricsCount * 2 + 1, metrics.size());
+ assertEquals(defaultMetrics + meterMetricsCount * 2, streamsMetrics.metrics().size());
streamsMetrics.removeSensor(sensor1);
- metrics = streamsMetrics.metrics();
- assertEquals(metrics.size(), 1);
+ assertEquals(defaultMetrics, streamsMetrics.metrics().size());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
new file mode 100644
index 0000000..b6f5769
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.LinkedList;
+import java.util.List;
+ /** Test-only log4j appender that records every event it receives so tests can assert on the logged messages. */
+ public class LogCaptureAppender extends AppenderSkeleton {
+ private final LinkedList<LoggingEvent> events = new LinkedList<>(); // all reads and writes synchronize on this list
+ /** Creates a new appender and attaches it to the log4j root logger. */
+ public static LogCaptureAppender createAndRegister() {
+ final LogCaptureAppender logCaptureAppender = new LogCaptureAppender();
+ Logger.getRootLogger().addAppender(logCaptureAppender);
+ return logCaptureAppender;
+ }
+ /** Detaches the given appender from the root logger; events it already captured are left in place. */
+ public static void unregister(final LogCaptureAppender logCaptureAppender) {
+ Logger.getRootLogger().removeAppender(logCaptureAppender);
+ }
+ /** Stores the incoming event while holding the lock on the event list. */
+ @Override
+ protected void append(final LoggingEvent event) {
+ synchronized (events) {
+ events.add(event);
+ }
+ }
+ /** Returns a new list holding the rendered message of every captured event, in capture order. */
+ public List<String> getMessages() {
+ final LinkedList<String> result = new LinkedList<>();
+ synchronized (events) {
+ for (final LoggingEvent event : events) {
+ result.add(event.getRenderedMessage());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void close() {
+ // intentionally empty: the only state is the in-memory event list, which is not cleared here
+ }
+ /** Returns false: events are stored as-is, so no Layout is used to format them. */
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+ }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index ff1efc9..33591c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -186,7 +187,13 @@ public class KeyValueStoreTestDriver<K, V> {
final ByteArraySerializer rawSerializer = new ByteArraySerializer();
final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
- final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver "), new DefaultProductionExceptionHandler()) {
+ final RecordCollector recordCollector = new RecordCollectorImpl(
+ producer,
+ "KeyValueStoreTestDriver",
+ new LogContext("KeyValueStoreTestDriver "),
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records")
+ ) {
@Override
public <K1, V1> void send(final String topic,
final K1 key,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index 8880007..a5e0d79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -40,10 +40,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -53,7 +54,11 @@ import static org.junit.Assert.assertTrue;
public class MeteredKeyValueBytesStoreTest {
private final TaskId taskId = new TaskId(0, 0);
- private final Map<String, String> tags = new HashMap<>();
+ private final Map<String, String> tags = mkMap(
+ mkEntry("client-id", "test"),
+ mkEntry("task-id", taskId.toString()),
+ mkEntry("scope-id", "metered")
+ );
@Mock(type = MockType.NICE)
private KeyValueStore<Bytes, byte[]> inner;
@Mock(type = MockType.NICE)
@@ -68,13 +73,13 @@ public class MeteredKeyValueBytesStoreTest {
@Before
public void before() {
- metered = new MeteredKeyValueBytesStore<>(inner,
- "scope",
- new MockTime(),
- Serdes.String(),
- Serdes.String());
- tags.put("task-id", taskId.toString());
- tags.put("scope-id", "metered");
+ metered = new MeteredKeyValueBytesStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ Serdes.String(),
+ Serdes.String()
+ );
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
EasyMock.expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
EasyMock.expect(context.taskId()).andReturn(taskId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 256e5ff..3bd190a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -42,9 +42,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -54,7 +55,11 @@ import static org.junit.Assert.assertTrue;
public class MeteredSessionStoreTest {
private final TaskId taskId = new TaskId(0, 0);
- private final Map<String, String> tags = new HashMap<>();
+ private final Map<String, String> tags = mkMap(
+ mkEntry("client-id", "test"),
+ mkEntry("task-id", taskId.toString()),
+ mkEntry("scope-id", "metered")
+ );
private final Metrics metrics = new Metrics();
private MeteredSessionStore<String, String> metered;
@Mock(type = MockType.NICE)
@@ -74,8 +79,6 @@ public class MeteredSessionStoreTest {
Serdes.String(),
Serdes.String(),
new MockTime());
- tags.put("task-id", taskId.toString());
- tags.put("scope-id", "metered");
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
EasyMock.expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
EasyMock.expect(context.taskId()).andReturn(taskId);
@@ -96,7 +99,7 @@ public class MeteredSessionStoreTest {
metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
final KafkaMetric metric = metric("put-rate");
- assertTrue(metric.value() > 0);
+ assertTrue(((Double) metric.metricValue()) > 0);
EasyMock.verify(inner);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 4fd7f30..eab523e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -18,99 +18,69 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
public class MeteredWindowStoreTest {
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
- private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
- private final Set<String> latencyRecorded = new HashSet<>();
- private final Set<String> throughputRecorded = new HashSet<>();
+ private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
+ innerStoreMock,
+ "scope",
+ new MockTime(),
+ Serdes.String(),
+ new SerdeThatDoesntHandleNull()
+ );
+
+ {
+ EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes();
+ }
@Before
- public void setUp() throws Exception {
- final Metrics metrics = new Metrics();
- final StreamsMetrics streamsMetrics = new StreamsMetrics() {
-
- @Override
- public Map<MetricName, ? extends Metric> metrics() {
- return Collections.unmodifiableMap(metrics.metrics());
- }
-
- @Override
- public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) {
- return metrics.sensor(operationName);
- }
-
- @Override
- public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
- latencyRecorded.add(sensor.name());
- }
-
- @Override
- public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) {
- return metrics.sensor(operationName);
- }
-
- @Override
- public void recordThroughput(Sensor sensor, long value) {
- throughputRecorded.add(sensor.name());
- }
-
- @Override
- public void removeSensor(Sensor sensor) {
- metrics.removeSensor(sensor.name());
- }
-
- @Override
- public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel) {
- return metrics.sensor(name);
- }
-
- @Override
- public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor... parents) {
- return metrics.sensor(name);
- }
-
- };
+ public void setUp() {
+ final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)) {
-
- @Override
- public StreamsMetrics metrics() {
- return streamsMetrics;
- }
- };
- EasyMock.expect(innerStoreMock.name()).andReturn("store").anyTimes();
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+ new RecordCollector.Supplier() {
+ @Override
+ public RecordCollector recordCollector() {
+ return new NoOpRecordCollector();
+ }
+ },
+ new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+ );
}
@After
@@ -124,7 +94,9 @@ public class MeteredWindowStoreTest {
EasyMock.expectLastCall();
EasyMock.replay(innerStoreMock);
store.init(context, store);
- assertTrue(latencyRecorded.contains("restore"));
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
}
@Test
@@ -136,7 +108,9 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.put("a", "a");
- assertTrue(latencyRecorded.contains("put"));
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -147,7 +121,9 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.fetch("a", 1, 1).close(); // recorded on close;
- assertTrue(latencyRecorded.contains("fetch"));
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -158,7 +134,9 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.fetch("a", "b", 1, 1).close(); // recorded on close;
- assertTrue(latencyRecorded.contains("fetch"));
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
@@ -171,7 +149,9 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.flush();
- assertTrue(latencyRecorded.contains("flush"));
+ final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+ assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
EasyMock.verify(innerStoreMock);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 1bc3423..6b410dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -31,6 +33,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -73,33 +76,25 @@ public class NamedCacheTest {
}
}
- private void testSpecificMetrics(final String groupName, final String entityName, final String opName,
- final Map<String, String> metricTags) {
- assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-avg",
- groupName, "The average cache hit ratio of " + entityName, metricTags)));
- assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-min",
- groupName, "The minimum cache hit ratio of " + entityName, metricTags)));
- assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-max",
- groupName, "The maximum cache hit ratio of " + entityName, metricTags)));
- }
@Test
public void testMetrics() {
- final String scope = "record-cache";
- final String entityName = cache.name();
- final String opName = "hitRatio";
- final String tagKey = "record-cache-id";
- final String tagValue = underlyingStoreName;
- final String groupName = "stream-" + scope + "-metrics";
final Map<String, String> metricTags = new LinkedHashMap<>();
- metricTags.put(tagKey, tagValue);
+ metricTags.put("record-cache-id", underlyingStoreName);
metricTags.put("task-id", taskIDString);
+ metricTags.put("client-id", "test");
- assertNotNull(streamMetrics.registry().getSensor(opName));
- testSpecificMetrics(groupName, entityName, opName, metricTags);
+ assertNotNull(streamMetrics.registry().getSensor("hitRatio"));
+ final Map<MetricName, KafkaMetric> metrics1 = streamMetrics.registry().metrics();
+ getMetricByNameFilterByTags(metrics1, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+ getMetricByNameFilterByTags(metrics1, "hitRatio-min", "stream-record-cache-metrics", metricTags);
+ getMetricByNameFilterByTags(metrics1, "hitRatio-max", "stream-record-cache-metrics", metricTags);
// test "all"
- metricTags.put(tagKey, "all");
- testSpecificMetrics(groupName, entityName, opName, metricTags);
+ metricTags.put("record-cache-id", "all");
+ final Map<MetricName, KafkaMetric> metrics = streamMetrics.registry().metrics();
+ getMetricByNameFilterByTags(metrics, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+ getMetricByNameFilterByTags(metrics, "hitRatio-min", "stream-record-cache-metrics", metricTags);
+ getMetricByNameFilterByTags(metrics, "hitRatio-max", "stream-record-cache-metrics", metricTags);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 554169e..bf556ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -79,7 +79,13 @@ public class RocksDBWindowStoreTest {
private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
private final Producer<byte[], byte[]> producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
- private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask "), new DefaultProductionExceptionHandler()) {
+ private final RecordCollector recordCollector = new RecordCollectorImpl(
+ producer,
+ "RocksDBWindowStoreTestTask",
+ new LogContext("RocksDBWindowStoreTestTask "),
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records")
+ ) {
@Override
public <K1, V1> void send(final String topic,
final K1 key,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index dcc2305..21b5c5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -40,7 +41,7 @@ public class StoreChangeLoggerTest {
private final Map<Integer, String> logged = new HashMap<>();
private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
- new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) {
+ new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) {
@Override
public <K1, V1> void send(final String topic,
final K1 key,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a9c47b5..dc04536 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -178,6 +178,7 @@ public class StreamThreadStateStoreProviderTest {
final MockClientSupplier clientSupplier,
final ProcessorTopology topology,
final TaskId taskId) {
+ final Metrics metrics = new Metrics();
return new StreamTask(
taskId,
Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
@@ -185,7 +186,7 @@ public class StreamThreadStateStoreProviderTest {
clientSupplier.consumer,
new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
streamsConfig,
- new MockStreamsMetrics(new Metrics()),
+ new MockStreamsMetrics(metrics),
stateDirectory,
null,
new MockTime(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 74bb5d1..57e3efb 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -31,12 +31,12 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -61,9 +61,20 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
private Serde<?> valSerde;
private long timestamp = -1L;
+ public InternalMockProcessorContext() {
+ this(null,
+ null,
+ null,
+ new StreamsMetricsImpl(new Metrics(), "mock"),
+ new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+ null,
+ null
+ );
+ }
+
public InternalMockProcessorContext(final File stateDir,
final StreamsConfig config) {
- this(stateDir, null, null, new Metrics(), config, null, null);
+ this(stateDir, null, null, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
}
public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
@@ -74,12 +85,19 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
final RecordCollector collector,
final Metrics metrics) {
- this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
- @Override
- public RecordCollector recordCollector() {
- return collector;
- }
- }, null);
+ this(
+ null,
+ serdes.keySerde(),
+ serdes.valueSerde(),
+ new StreamsMetricsImpl(metrics, "mock"),
+ new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
+ @Override
+ public RecordCollector recordCollector() {
+ return collector;
+ }
+ },
+ null
+ );
}
public InternalMockProcessorContext(final File stateDir,
@@ -87,30 +105,37 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
final Serde<?> valSerde,
final RecordCollector collector,
final ThreadCache cache) {
- this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
- @Override
- public RecordCollector recordCollector() {
- return collector;
- }
- }, cache);
+ this(stateDir,
+ keySerde,
+ valSerde,
+ new StreamsMetricsImpl(new Metrics(), "mock"),
+ new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+ new RecordCollector.Supplier() {
+ @Override
+ public RecordCollector recordCollector() {
+ return collector;
+ }
+ },
+ cache
+ );
}
- private InternalMockProcessorContext(final File stateDir,
- final Serde<?> keySerde,
- final Serde<?> valSerde,
- final Metrics metrics,
- final StreamsConfig config,
- final RecordCollector.Supplier collectorSupplier,
- final ThreadCache cache) {
+ public InternalMockProcessorContext(final File stateDir,
+ final Serde<?> keySerde,
+ final Serde<?> valSerde,
+ final StreamsMetricsImpl metrics,
+ final StreamsConfig config,
+ final RecordCollector.Supplier collectorSupplier,
+ final ThreadCache cache) {
super(new TaskId(0, 0),
- config,
- new MockStreamsMetrics(metrics),
- null,
- cache);
+ config,
+ metrics,
+ null,
+ cache);
this.stateDir = stateDir;
this.keySerde = keySerde;
this.valSerde = valSerde;
- this.metrics = metrics;
+ this.metrics = metrics.registry();
this.recordCollectorSupplier = collectorSupplier;
}
@@ -169,7 +194,8 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
return storeMap.get(name);
}
- @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+ @Override
+ public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
throw new UnsupportedOperationException("schedule() not supported.");
}
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index eb137db..7313414 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -173,7 +173,7 @@ public class KStreamTestDriver extends ExternalResource {
final ProcessorNode currNode = sourceNodeByTopicName(topicName);
if (currNode != null) {
- context.setRecordContext(createRecordContext(context.timestamp()));
+ context.setRecordContext(createRecordContext(topicName, context.timestamp()));
context.setCurrentNode(currNode);
try {
context.forward(key, value);
@@ -203,7 +203,7 @@ public class KStreamTestDriver extends ExternalResource {
final ProcessorNode prevNode = context.currentNode();
for (final ProcessorNode processor : topology.processors()) {
if (processor.processor() != null) {
- context.setRecordContext(createRecordContext(timestamp));
+ context.setRecordContext(createRecordContext(context.topic(), timestamp));
context.setCurrentNode(processor);
try {
processor.processor().punctuate(timestamp);
@@ -277,13 +277,13 @@ public class KStreamTestDriver extends ExternalResource {
}
}
- private ProcessorRecordContext createRecordContext(final long timestamp) {
- return new ProcessorRecordContext(timestamp, -1, -1, "topic");
+ private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) {
+ return new ProcessorRecordContext(timestamp, -1, -1, topicName);
}
private class MockRecordCollector extends RecordCollectorImpl {
MockRecordCollector() {
- super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler());
+ super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 38b0e7d..a526bfd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.test;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import java.util.Collections;
@@ -49,7 +49,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
}
@Override
- public void init(final ProcessorContext context) {
+ public void init(final InternalProcessorContext context) {
super.init(context);
initialized = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 8df4e6c..1644daa 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -18,7 +18,7 @@ package org.apache.kafka.test;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
@@ -48,7 +48,7 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
}
@Override
- public void init(final ProcessorContext context) {
+ public void init(final InternalProcessorContext context) {
super.init(context);
initialized = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fecf7e4..afd2bb2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.StateStore;
@@ -50,6 +49,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -192,7 +192,7 @@ public class ProcessorTopologyTestDriver {
consumer.assign(offsetsByTopicPartition.keySet());
final StateDirectory stateDirectory = new StateDirectory(config, Time.SYSTEM);
- final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+ final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics);
if (globalTopology != null) {
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 67304e1..a19b55c 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -17,6 +17,8 @@
package org.apache.kafka.test;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -25,13 +27,15 @@ import org.apache.kafka.streams.kstream.Windowed;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-public class StreamsTestUtils {
+public final class StreamsTestUtils {
+ private StreamsTestUtils() {}
public static Properties getStreamsConfig(final String applicationId,
final String bootstrapServers,
@@ -87,4 +91,63 @@ public class StreamsTestUtils {
assertThat(actual.key.key(), equalTo(expectedKey.key()));
assertThat(actual.value, equalTo(expectedValue.getBytes()));
}
+
+ public static Metric getMetricByName(final Map<MetricName, ? extends Metric> metrics,
+ final String name,
+ final String group) {
+ Metric metric = null;
+ for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+ if (entry.getKey().name().equals(name) && entry.getKey().group().equals(group)) {
+ if (metric == null) {
+ metric = entry.getValue();
+ } else {
+ throw new IllegalStateException(
+ "Found two metrics with name=[" + name + "]: \n" +
+ metric.metricName().toString() +
+ " AND \n" +
+ entry.getKey().toString()
+ );
+ }
+ }
+ }
+ if (metric == null) {
+ throw new IllegalStateException("Didn't find metric with name=[" + name + "]");
+ } else {
+ return metric;
+ }
+ }
+
+ public static Metric getMetricByNameFilterByTags(final Map<MetricName, ? extends Metric> metrics,
+ final String name,
+ final String group,
+ final Map<String, String> filterTags) {
+ Metric metric = null;
+ for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+ if (entry.getKey().name().equals(name) && entry.getKey().group().equals(group)) {
+ boolean filtersMatch = true;
+ for (final Map.Entry<String, String> filter : filterTags.entrySet()) {
+ if (!filter.getValue().equals(entry.getKey().tags().get(filter.getKey()))) {
+ filtersMatch = false;
+ }
+ }
+ if (filtersMatch) {
+ if (metric == null) {
+ metric = entry.getValue();
+ } else {
+ throw new IllegalStateException(
+ "Found two metrics with name=[" + name + "] and tags=[" + filterTags + "]: \n" +
+ metric.metricName().toString() +
+ " AND \n" +
+ entry.getKey().toString()
+ );
+ }
+ }
+ }
+ }
+ if (metric == null) {
+ throw new IllegalStateException("Didn't find metric with name=[" + name + "] and tags=[" + filterTags + "]");
+ } else {
+ return metric;
+ }
+ }
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c03bf1a..bde70b4 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@@ -53,7 +55,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
@@ -178,8 +180,9 @@ public class TopologyTestDriver implements Closeable {
private final GlobalStateManager globalStateManager;
private final StateDirectory stateDirectory;
+ private final Metrics metrics;
private final ProcessorTopology processorTopology;
-
+
private final MockProducer<byte[], byte[]> producer;
private final Set<String> internalTopics = new HashSet<>();
@@ -232,10 +235,11 @@ public class TopologyTestDriver implements Closeable {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
stateDirectory = new StateDirectory(streamsConfig, mockTime);
- final StreamsMetrics streamsMetrics = new StreamsMetricsImpl(
- new Metrics(),
- "topology-test-driver-stream-metrics",
- Collections.<String, String>emptyMap());
+ metrics = new Metrics();
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
+ metrics,
+ "topology-test-driver-virtual-thread"
+ );
final ThreadCache cache = new ThreadCache(
new LogContext("topology-test-driver "),
Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
@@ -290,7 +294,8 @@ public class TopologyTestDriver implements Closeable {
globalProcessorContext,
globalStateManager,
new LogAndContinueExceptionHandler(),
- new LogContext());
+ new LogContext()
+ );
globalStateTask.initialize();
} else {
globalStateManager = null;
@@ -321,6 +326,15 @@ public class TopologyTestDriver implements Closeable {
}
/**
+ * Get read-only handle on global metrics registry.
+ *
+ * @return Map of all metrics.
+ */
+ public Map<MetricName, ? extends Metric> metrics() {
+ return Collections.unmodifiableMap(metrics.metrics());
+ }
+
+ /**
* Send an input message with the given key, value, and timestamp on the specified topic to the topology and then
* commit the messages.
*
@@ -498,7 +512,7 @@ public class TopologyTestDriver implements Closeable {
final V value = valueDeserializer.deserialize(record.topic(), record.value());
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
}
-
+
/**
* Get all {@link StateStore StateStores} from the topology.
* The stores can be a "regular" or global stores.
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 88801f7..15b2da6 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -27,11 +27,10 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -194,7 +193,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
- this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context", Collections.<String, String>emptyMap());
+ this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context-virtual-thread");
}
@Override
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.