You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/09/07 22:19:24 UTC
[kafka] branch trunk updated: KAFKA-13201: Convert KTable suppress
to new PAPI (#11213)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f89ce5 KAFKA-13201: Convert KTable suppress to new PAPI (#11213)
5f89ce5 is described below
commit 5f89ce5f202bf560214e00fa1ac7add7a27248cf
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Tue Sep 7 23:17:44 2021 +0100
KAFKA-13201: Convert KTable suppress to new PAPI (#11213)
Migrate Suppress as part of the migration of KStream/KTable
operations to the new Processor API (KAFKA-8410)
Reviewers: John Roesler <vv...@apache.org>
---
.../streams/kstream/internals/KTableImpl.java | 4 +-
.../suppress/KTableSuppressProcessorSupplier.java | 35 ++--
.../InMemoryTimeOrderedKeyValueBuffer.java | 10 +-
.../state/internals/TimeOrderedKeyValueBuffer.java | 8 +-
.../KTableSuppressProcessorMetricsTest.java | 22 +--
.../suppress/KTableSuppressProcessorTest.java | 198 +++++++++++----------
.../internals/TimeOrderedKeyValueBufferTest.java | 9 +-
.../test/MockInternalNewProcessorContext.java | 198 +++++++++++++++++++++
.../streams/processor/MockProcessorContext.java | 11 +-
9 files changed, 365 insertions(+), 130 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index e681e62..5339b6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -58,6 +58,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressi
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -542,8 +543,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String storeName =
suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
+ final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
suppressedInternal,
storeName,
this
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index cdad658..ef7943b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -21,10 +21,14 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableNewProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -35,8 +39,7 @@ import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import static java.util.Objects.requireNonNull;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, V> {
+public class KTableSuppressProcessorSupplier<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
private final SuppressedInternal<K> suppress;
private final String storeName;
private final KTableImpl<K, ?, V> parentKTable;
@@ -52,7 +55,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
}
@Override
- public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
+ public Processor<K, Change<V>, K, Change<V>> get() {
return new KTableSuppressProcessor<>(suppress, storeName);
}
@@ -109,7 +112,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
return parentKTable.enableSendingOldValues(forceMaterialization);
}
- private static final class KTableSuppressProcessor<K, V> extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
+ private static final class KTableSuppressProcessor<K, V> extends ContextualProcessor<K, Change<V>, K, Change<V>> {
private final long maxRecords;
private final long maxBytes;
private final long suppressDurationMillis;
@@ -119,7 +122,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
private final String storeName;
private TimeOrderedKeyValueBuffer<K, V> buffer;
- private InternalProcessorContext internalProcessorContext;
+ private InternalProcessorContext<K, Change<V>> internalProcessorContext;
private Sensor suppressionEmitSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
@@ -135,9 +138,9 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
}
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final ProcessorContext<K, Change<V>> context) {
super.init(context);
- internalProcessorContext = (InternalProcessorContext) context;
+ internalProcessorContext = (InternalProcessorContext<K, Change<V>>) context;
suppressionEmitSensor = ProcessorNodeMetrics.suppressionEmitSensor(
Thread.currentThread().getName(),
context.taskId().toString(),
@@ -150,16 +153,16 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
}
@Override
- public void process(final K key, final Change<V> value) {
- observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
- buffer(key, value);
+ public void process(final Record<K, Change<V>> record) {
+ observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+ buffer(record);
enforceConstraints();
}
- private void buffer(final K key, final Change<V> value) {
- final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+ private void buffer(final Record<K, Change<V>> record) {
+ final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, record.key());
- buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
+ buffer.put(bufferTime, record, internalProcessorContext.recordContext());
}
private void enforceConstraints() {
@@ -198,7 +201,9 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
internalProcessorContext.setRecordContext(toEmit.recordContext());
try {
- internalProcessorContext.forward(toEmit.key(), toEmit.value());
+ internalProcessorContext.forward(toEmit.record()
+ .withTimestamp(toEmit.recordContext().timestamp())
+ .withHeaders(toEmit.recordContext().headers()));
suppressionEmitSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
} finally {
internalProcessorContext.setRecordContext(prevRecordContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 5a653d6..ba8a745 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -467,14 +468,13 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
@Override
public void put(final long time,
- final K key,
- final Change<V> value,
+ final Record<K, Change<V>> record,
final ProcessorRecordContext recordContext) {
- requireNonNull(value, "value cannot be null");
+ requireNonNull(record.value(), "value cannot be null");
requireNonNull(recordContext, "recordContext cannot be null");
- final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
- final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, value);
+ final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, record.key()));
+ final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, record.value());
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 2466639..e2096ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -47,6 +48,10 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
return value;
}
+ public Record<K, Change<V>> record() {
+ return new Record<>(key, value, recordContext.timestamp());
+ }
+
public ProcessorRecordContext recordContext() {
return recordContext;
}
@@ -70,6 +75,7 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
public int hashCode() {
return Objects.hash(key, value, recordContext);
}
+
}
void setSerdesIfNull(final SerdeGetter getter);
@@ -78,7 +84,7 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key);
- void put(long time, K key, Change<V> value, ProcessorRecordContext recordContext);
+ void put(long time, Record<K, Change<V>> record, ProcessorRecordContext recordContext);
int numRecords();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 4db1342..009a70d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -29,9 +28,11 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
@@ -49,7 +50,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableSuppressProcessorMetricsTest {
private static final long ARBITRARY_LONG = 5L;
private static final TaskId TASK_ID = new TaskId(0, 0);
@@ -134,7 +134,7 @@ public class KTableSuppressProcessorMetricsTest {
.build();
final KTableImpl<String, ?, Long> mock = EasyMock.mock(KTableImpl.class);
- final org.apache.kafka.streams.processor.Processor<String, Change<Long>> processor =
+ final Processor<String, Change<Long>, String, Change<Long>> processor =
new KTableSuppressProcessorSupplier<>(
(SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
storeName,
@@ -142,8 +142,8 @@ public class KTableSuppressProcessorMetricsTest {
).get();
streamsConfig.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
- final MockInternalProcessorContext context =
- new MockInternalProcessorContext(streamsConfig, TASK_ID, TestUtils.tempDirectory());
+ final MockInternalNewProcessorContext<String, Change<Long>> context =
+ new MockInternalNewProcessorContext<>(streamsConfig, TASK_ID, TestUtils.tempDirectory());
final Time time = new SystemTime();
context.setCurrentNode(new ProcessorNode("testNode"));
context.setSystemTimeMs(time.milliseconds());
@@ -152,10 +152,11 @@ public class KTableSuppressProcessorMetricsTest {
processor.init(context);
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final String key = "longKey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- processor.process(key, value);
+ processor.process(new Record<>(key, value, timestamp));
final MetricName evictionRateMetric = evictionRateMetricLatest;
final MetricName evictionTotalMetric = evictionTotalMetricLatest;
@@ -175,8 +176,9 @@ public class KTableSuppressProcessorMetricsTest {
verifyMetric(metrics, bufferCountMaxMetric, is(1.0));
}
- context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
- processor.process("key", value);
+ context.setRecordMetadata("", 0, 1L);
+ context.setTimestamp(timestamp + 1);
+ processor.process(new Record<>("key", value, timestamp + 1));
{
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 23b91c5..1505d0d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals.suppress;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
@@ -29,18 +29,21 @@ import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.easymock.EasyMock;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.Test;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
@@ -59,15 +62,14 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableSuppressProcessorTest {
private static final long ARBITRARY_LONG = 5L;
private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
private static class Harness<K, V> {
- private final org.apache.kafka.streams.processor.Processor<K, Change<V>> processor;
- private final MockInternalProcessorContext context;
+ private final Processor<K, Change<V>, K, Change<V>> processor;
+ private final MockInternalNewProcessorContext<K, Change<V>> context;
Harness(final Suppressed<K> suppressed,
@@ -81,10 +83,10 @@ public class KTableSuppressProcessorTest {
.build();
final KTableImpl<K, ?, V> parent = EasyMock.mock(KTableImpl.class);
- final org.apache.kafka.streams.processor.Processor<K, Change<V>> processor =
+ final Processor<K, Change<V>, K, Change<V>> processor =
new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get();
- final MockInternalProcessorContext context = new MockInternalProcessorContext();
+ final MockInternalNewProcessorContext<K, Change<V>> context = new MockInternalNewProcessorContext<>();
context.setCurrentNode(new ProcessorNode("testNode"));
buffer.init((StateStoreContext) context, buffer);
@@ -99,73 +101,75 @@ public class KTableSuppressProcessorTest {
public void zeroTimeLimitShouldImmediatelyEmit() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = ARBITRARY_LONG;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final String key = "hey";
final Change<Long> value = ARBITRARY_CHANGE;
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void windowedZeroTimeLimitShouldImmediatelyEmit() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(untilTimeLimit(ZERO, unbounded()), timeWindowedSerdeFrom(String.class, 100L), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long timestamp = ARBITRARY_LONG;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
final Change<Long> value = ARBITRARY_CHANGE;
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void intermediateSuppressionShouldBufferAndEmitLater() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = 0L;
- context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("topic", 0, 0);
+ context.setTimestamp(timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, 1L);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(0));
- context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), 1L);
- harness.processor.process("tick", new Change<>(null, null));
+ context.setRecordMetadata("topic", 0, 1);
+ context.setTimestamp(1L);
+ harness.processor.process(new Record<>("tick", new Change<>(null, null), 1L));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(1L)), timeWindowedSerdeFrom(String.class, 1L), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long windowStart = 99L;
final long recordTime = 99L;
final long windowEnd = 100L;
- context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), recordTime);
+ context.setRecordMetadata("topic", 0, 0);
+ context.setTimestamp(recordTime);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
final Change<Long> value = ARBITRARY_CHANGE;
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, recordTime));
assertThat(context.forwarded(), hasSize(0));
// although the stream time is now 100, we have to wait 1 ms after the window *end* before we
@@ -173,21 +177,22 @@ public class KTableSuppressProcessorTest {
final long windowStart2 = 100L;
final long recordTime2 = 100L;
final long windowEnd2 = 101L;
- context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime2);
- harness.processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
+ context.setRecordMetadata("topic", 0, 1);
+ context.setTimestamp(recordTime2);
+ harness.processor.process(new Record<>(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE, recordTime2));
assertThat(context.forwarded(), hasSize(0));
// ok, now it's time to emit "hey"
final long windowStart3 = 101L;
final long recordTime3 = 101L;
final long windowEnd3 = 102L;
- context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime3);
- harness.processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
+ context.setRecordMetadata("topic", 0, 1);
+ context.setTimestamp(recordTime3);
+ harness.processor.process(new Record<>(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE, recordTime3));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(recordTime));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, recordTime)));
}
/**
@@ -199,43 +204,44 @@ public class KTableSuppressProcessorTest {
public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
// note the record is in the past, but the window end is in the future, so we still have to buffer,
// even though the grace period is 0.
final long timestamp = 5L;
final long windowEnd = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
final Change<Long> value = ARBITRARY_CHANGE;
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(0));
- context.setRecordMetadata("", 0, 1L, new RecordHeaders(), windowEnd);
- harness.processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
+ context.setRecordMetadata("", 0, 1L);
+ context.setTimestamp(windowEnd);
+ harness.processor.process(new Record<>(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE, windowEnd));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
final Change<Long> value = ARBITRARY_CHANGE;
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
/**
@@ -246,13 +252,14 @@ public class KTableSuppressProcessorTest {
public void finalResultsShouldDropTombstonesForTimeWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(0));
}
@@ -266,13 +273,14 @@ public class KTableSuppressProcessorTest {
public void finalResultsShouldDropTombstonesForSessionWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(0));
}
@@ -285,18 +293,20 @@ public class KTableSuppressProcessorTest {
public void suppressShouldNotDropTombstonesForTimeWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ final Headers headers = new RecordHeaders().add("k", "v".getBytes(StandardCharsets.UTF_8));
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
+ context.setHeaders(headers);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp, headers)));
}
@@ -308,18 +318,18 @@ public class KTableSuppressProcessorTest {
public void suppressShouldNotDropTombstonesForSessionWindows() {
final Harness<Windowed<String>, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@@ -331,78 +341,82 @@ public class KTableSuppressProcessorTest {
public void suppressShouldNotDropTombstonesForKTable() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void suppressShouldEmitWhenOverRecordCapacity() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
- context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
- harness.processor.process("dummyKey", value);
+ context.setRecordMetadata("", 0, 1L);
+ context.setTimestamp(timestamp + 1);
+ harness.processor.process(new Record<>("dummyKey", value, timestamp + 1));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void suppressShouldEmitWhenOverByteCapacity() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
- context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
- harness.processor.process("dummyKey", value);
+ context.setRecordMetadata("", 0, 1L);
+ context.setTimestamp(timestamp + 1);
+ harness.processor.process(new Record<>("dummyKey", value, timestamp + 1));
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
- assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
- assertThat(capturedForward.timestamp(), is(timestamp));
+ assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
}
@Test
public void suppressShouldShutDownWhenOverRecordCapacity() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull()), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
context.setCurrentNode(new ProcessorNode("testNode"));
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
- context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 1L);
+ context.setTimestamp(timestamp);
try {
- harness.processor.process("dummyKey", value);
+ harness.processor.process(new Record<>("dummyKey", value, timestamp));
fail("expected an exception");
} catch (final StreamsException e) {
assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
@@ -413,18 +427,20 @@ public class KTableSuppressProcessorTest {
public void suppressShouldShutDownWhenOverByteCapacity() {
final Harness<String, Long> harness =
new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull()), String(), Long());
- final MockInternalProcessorContext context = harness.context;
+ final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 0L);
+ context.setTimestamp(timestamp);
context.setCurrentNode(new ProcessorNode("testNode"));
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
- harness.processor.process(key, value);
+ harness.processor.process(new Record<>(key, value, timestamp));
- context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
+ context.setRecordMetadata("", 0, 1L);
+ context.setTimestamp(1L);
try {
- harness.processor.process("dummyKey", value);
+ harness.processor.process(new Record<>("dummyKey", value, timestamp));
fail("expected an exception");
} catch (final StreamsException e) {
assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 26e9488..4c74df2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -144,7 +145,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final MockInternalProcessorContext context = makeContext();
buffer.init((StateStoreContext) context, buffer);
try {
- buffer.put(0, "asdf", null, getContext(0));
+ buffer.put(0, new Record<>("asdf", null, 0L), getContext(0));
fail("expected an exception");
} catch (final NullPointerException expected) {
// expected
@@ -284,8 +285,8 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final ProcessorRecordContext recordContext = getContext(0L);
context.setRecordContext(recordContext);
- buffer.put(1L, "A", new Change<>("new-value", "old-value"), recordContext);
- buffer.put(1L, "B", new Change<>("new-value", null), recordContext);
+ buffer.put(1L, new Record<>("A", new Change<>("new-value", "old-value"), 0L), recordContext);
+ buffer.put(1L, new Record<>("B", new Change<>("new-value", null), 0L), recordContext);
assertThat(buffer.priorValueForBuffered("A"), is(Maybe.defined(ValueAndTimestamp.make("old-value", -1))));
assertThat(buffer.priorValueForBuffered("B"), is(Maybe.defined(null)));
}
@@ -1012,7 +1013,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final String value) {
final ProcessorRecordContext recordContext = getContext(recordTimestamp);
context.setRecordContext(recordContext);
- buffer.put(streamTime, key, new Change<>(value, null), recordContext);
+ buffer.put(streamTime, new Record<>(key, new Change<>(value, null), 0L), recordContext);
}
private static BufferValue getBufferValue(final String value, final long timestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
new file mode 100644
index 0000000..ffb503e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+
+import java.io.File;
+import java.util.Properties;
+
+public class MockInternalNewProcessorContext<KOut, VOut> extends MockProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, VOut> {
+
+ private ProcessorNode currentNode;
+ private long currentSystemTimeMs;
+ private TaskType taskType = TaskType.ACTIVE;
+
+ private long timestamp = 0;
+ private Headers headers = new RecordHeaders();
+
+ public MockInternalNewProcessorContext() {
+ }
+
+ public MockInternalNewProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+ super(config, taskId, stateDir);
+ }
+
+ @Override
+ public void setSystemTimeMs(long timeMs) {
+ currentSystemTimeMs = timeMs;
+ }
+
+ @Override
+ public long currentSystemTimeMs() {
+ return currentSystemTimeMs;
+ }
+
+ @Override
+ public long currentStreamTimeMs() {
+ return 0;
+ }
+
+ @Override
+ public StreamsMetricsImpl metrics() {
+ return (StreamsMetricsImpl) super.metrics();
+ }
+
+ @Override
+ public ProcessorRecordContext recordContext() {
+ return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
+ }
+
+ @Override
+ public void setRecordContext(final ProcessorRecordContext recordContext) {
+ setRecordMetadata(
+ recordContext.topic(),
+ recordContext.partition(),
+ recordContext.offset()
+ );
+ this.headers = recordContext.headers();
+ this.timestamp = recordContext.timestamp();
+ }
+
+ public void setTimestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public void setHeaders(final Headers headers) {
+ this.headers = headers;
+ }
+
+ @Override
+ public void setCurrentNode(final ProcessorNode currentNode) {
+ this.currentNode = currentNode;
+ }
+
+ @Override
+ public ProcessorNode currentNode() {
+ return currentNode;
+ }
+
+ @Override
+ public ThreadCache cache() {
+ return null;
+ }
+
+ @Override
+ public void initialize() {}
+
+ @Override
+ public void uninitialize() {}
+
+ @Override
+ public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+ addStateStore(store);
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value) {
+ throw new UnsupportedOperationException("Migrate to new implementation");
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value, To to) {
+ throw new UnsupportedOperationException("Migrate to new implementation");
+ }
+
+ @Override
+ public String topic() {
+ if (recordMetadata().isPresent()) return recordMetadata().get().topic();
+ else return null;
+ }
+
+ @Override
+ public int partition() {
+ if (recordMetadata().isPresent()) return recordMetadata().get().partition();
+ else return 0;
+ }
+
+ @Override
+ public long offset() {
+ if (recordMetadata().isPresent()) return recordMetadata().get().offset();
+ else return 0;
+ }
+
+ @Override
+ public Headers headers() {
+ return headers;
+ }
+
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public TaskType taskType() {
+ return taskType;
+ }
+
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ }
+
+ @Override
+ public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+ }
+
+ @Override
+ public void transitionToStandby(final ThreadCache newCache) {
+ }
+
+ @Override
+ public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
+ }
+
+ @Override
+ public <T extends StateStore> T getStateStore(StoreBuilder<T> builder) {
+ return getStateStore(builder.name());
+ }
+
+ @Override
+ public String changelogFor(final String storeName) {
+ return "mock-changelog";
+ }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 0c81e3e..061c1fb 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -125,9 +125,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
public static class CapturedForward {
private final String childName;
private final long timestamp;
+ private final Headers headers;
private final KeyValue keyValue;
- private CapturedForward(final To to, final KeyValue keyValue) {
+ private CapturedForward(final KeyValue keyValue, final To to, final Headers headers) {
if (keyValue == null) {
throw new IllegalArgumentException();
}
@@ -135,6 +136,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.childName = to.childName;
this.timestamp = to.timestamp;
this.keyValue = keyValue;
+ this.headers = headers;
}
/**
@@ -175,6 +177,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
", keyValue=" + keyValue +
'}';
}
+
+ public Headers headers() {
+ return this.headers;
+ }
}
// constructors ================================================
@@ -498,8 +504,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
public <K, V> void forward(final K key, final V value, final To to) {
capturedForwards.add(
new CapturedForward(
+ new KeyValue<>(key, value),
to.timestamp == -1 ? to.withTimestamp(recordTimestamp == null ? -1 : recordTimestamp) : to,
- new KeyValue<>(key, value)
+ headers
)
);
}