You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/01 17:06:19 UTC
[kafka] 02/02: move metadata to context
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch kip-478-part-4-record-processor
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d3321aa0d199c80b04ab9e1121b4501b333db7a5
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 1 12:06:18 2020 -0500
move metadata to context
---
.../examples/docs/DeveloperGuideTesting.java | 10 ++++-----
.../kafka/streams/processor/api/Processor.java | 9 +-------
.../streams/processor/api/ProcessorContext.java | 10 +++++++++
.../internals/AbstractProcessorContext.java | 7 ++++++
.../internals/GlobalProcessorContextImpl.java | 5 +----
.../processor/internals/GlobalStateUpdateTask.java | 6 +----
.../processor/internals/ProcessorAdapter.java | 5 +----
.../processor/internals/ProcessorContextImpl.java | 6 +----
.../streams/processor/internals/ProcessorNode.java | 8 +++----
.../streams/processor/internals/SinkNode.java | 26 ++++++----------------
.../streams/processor/internals/SourceNode.java | 5 +----
.../streams/processor/internals/StreamTask.java | 5 +----
.../org/apache/kafka/streams/KafkaStreamsTest.java | 6 ++---
.../apache/kafka/streams/StreamsBuilderTest.java | 4 +---
.../org/apache/kafka/streams/TopologyTest.java | 4 +---
.../internals/GlobalProcessorContextImplTest.java | 2 +-
.../processor/internals/ProcessorNodeTest.java | 6 ++---
.../processor/internals/ProcessorTopologyTest.java | 14 +++++-------
.../streams/processor/internals/SinkNodeTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 6 ++---
.../processor/internals/StreamThreadTest.java | 4 +---
.../kafka/test/InternalMockProcessorContext.java | 5 ++---
.../org/apache/kafka/test/MockApiProcessor.java | 4 +---
.../kafka/test/MockInternalProcessorContext.java | 7 ++++++
.../java/org/apache/kafka/test/MockProcessor.java | 7 +-----
.../org/apache/kafka/test/MockProcessorNode.java | 6 ++---
.../java/org/apache/kafka/test/MockSourceNode.java | 4 +---
.../kafka/streams/TopologyTestDriverTest.java | 19 ++++++++--------
28 files changed, 78 insertions(+), 124 deletions(-)
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 1ce3445..72e704e 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -21,26 +21,24 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
import java.time.Instant;
-import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -170,7 +168,7 @@ public class DeveloperGuideTesting {
}
@Override
- public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, Long> record) {
final Long oldValue = store.get(record.key());
if (oldValue == null || record.value() > oldValue) {
store.put(record.key(), record.value());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
index e6feccb..167976b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import java.time.Duration;
-import java.util.Optional;
/**
* A processor of key-value pair records.
@@ -50,14 +49,8 @@ public interface Processor<KIn, VIn, KOut, VOut> {
* Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
*
* @param record the record to process
- * @param recordMetadata the metadata of the record, if it is defined. Note that as long as the processor is
- * receiving a record downstream of a Source (i.e., the current record is coming from an
- * input topic), the metadata is defined. On the other hand, if a parent processor has
- * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
- * punctuator, then there is no record from an input topic, and therefore the metadata
- * would be undefined.
*/
- void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);
+ void process(Record<KIn, VIn> record);
/**
* Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index f4f0fdb..a431993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -53,6 +53,16 @@ public interface ProcessorContext<KForward, VForward> {
TaskId taskId();
/**
+ * The metadata of the record, if it is defined. Note that as long as the processor is
+ * receiving a record downstream of a Source (i.e., the current record is coming from an
+ * input topic), the metadata is defined. On the other hand, if a parent processor has
+ * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+ * punctuator, then there is no record from an input topic, and therefore the metadata
+ * would be undefined.
+ */
+ Optional<RecordMetadata> recordMetadata();
+
+ /**
* Returns the default key serde.
*
* @return the key serializer
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index ef222e0..c29614a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -31,6 +32,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
public abstract class AbstractProcessorContext implements InternalProcessorContext {
@@ -201,6 +203,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
+ public Optional<RecordMetadata> recordMetadata() {
+ return Optional.ofNullable(recordContext);
+ }
+
+ @Override
public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
this.currentNode = currentNode;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 9f31309..3468833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,13 +25,11 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.time.Duration;
-import java.util.Optional;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
@@ -63,11 +61,10 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
@Override
public <K, V> void forward(final Record<K, V> record) {
final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
- final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
try {
for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
setCurrentNode(child);
- ((ProcessorNode<K, V, ?, ?>) child).process(record, recordMetadata);
+ ((ProcessorNode<K, V, ?, ?>) child).process(record);
}
} finally {
setCurrentNode(previousNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 360e50e..6b1378b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
@@ -112,10 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
processorContext.timestamp(),
processorContext.headers()
);
- ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(
- toProcess,
- Optional.of(recordContext)
- );
+ ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
}
offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index 291a99e..84c8602 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -20,9 +20,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-
-import java.util.Optional;
public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
@@ -57,7 +54,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
}
@Override
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<KIn, VIn> record) {
final ProcessorRecordContext processorRecordContext = context.recordContext();
try {
context.setRecordContext(new ProcessorRecordContext(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 309b813..d164428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -38,7 +37,6 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
@@ -263,9 +261,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final Record<K, V> record) {
setCurrentNode(child);
- final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
-
- child.process(record, recordMetadata);
+ child.process(record);
if (child.isTerminalNode()) {
streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2939525..38daa52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -20,11 +20,10 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -32,7 +31,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -176,11 +174,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
}
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<KIn, VIn> record) {
throwIfClosed();
try {
- maybeMeasureLatency(() -> processor.process(record, recordMetadata), time, processSensor);
+ maybeMeasureLatency(() -> processor.process(record), time, processSensor);
} catch (final ClassCastException e) {
final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index f8840e4..813bcb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -22,9 +22,6 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-
-import java.util.Optional;
public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -81,7 +78,7 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
}
@Override
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<KIn, VIn> record) {
final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
final KIn key = record.key();
@@ -96,22 +93,13 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
);
}
- // Prefer the record metadata if defined,
- // and fall back to the context (which is undefined and dummy values,
- // but extractors may still depend on the current behavior.
- final Optional<ProcessorRecordContext> maybeContext =
- recordMetadata.map(
- m -> new ProcessorRecordContext(timestamp, m.offset(), m.partition(), m.topic(), record.headers())
- );
final ProcessorRecordContext contextForExtraction =
- maybeContext.orElseGet(
- () -> new ProcessorRecordContext(
- timestamp,
- context.offset(),
- context.partition(),
- context.topic(),
- record.headers()
- )
+ new ProcessorRecordContext(
+ timestamp,
+ context.offset(),
+ context.partition(),
+ context.topic(),
+ record.headers()
);
final String topic = topicExtractor.extract(key, value, contextForExtraction);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 711b4c3..7fa8c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -22,11 +22,8 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
-import java.util.Optional;
-
public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
private InternalProcessorContext context;
@@ -96,7 +93,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
@Override
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<KIn, VIn> record) {
context.forward(record);
processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c4e4ff3..464ac9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@@ -56,7 +55,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -692,8 +690,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
processorContext.timestamp(),
processorContext.headers()
);
- final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
- maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
+ maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
log.trace("Completed processing one record [{}]", record);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 7ff2c6c..38baeb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -78,7 +77,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
@@ -869,7 +867,7 @@ public class KafkaStreamsTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
if (record.value().length() % 2 == 0) {
context.forward(record.withValue(record.key() + record.value()));
}
@@ -970,7 +968,7 @@ public class KafkaStreamsTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
kvStore.put(record.key(), 5L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b308b4f..415aaea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -60,7 +59,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import static java.util.Arrays.asList;
@@ -109,7 +107,7 @@ public class StreamsBuilderTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
store.put(record.key(), record.value());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 9e9f415..ef9becf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -46,7 +45,6 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -379,7 +377,7 @@ public class TopologyTest {
}
@Override
- public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { }
+ public void process(final Record<Object, Object> record) { }
};
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 6322fd2..a83c92b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -100,7 +100,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldForwardToSingleChild() {
- child.process(anyObject(), anyObject());
+ child.process(anyObject());
expectLastCall();
expect(recordContext.timestamp()).andStubReturn(0L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a4efcbc..8ff9451 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.Optional;
-import java.util.Properties;
-
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -40,6 +37,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Properties;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.hamcrest.CoreMatchers.containsString;
@@ -208,7 +206,7 @@ public class ProcessorNodeTest {
node.init(context);
final StreamsException se = assertThrows(
StreamsException.class,
- () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()), Optional.ofNullable(context.recordContext()))
+ () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()))
);
assertThat(se.getCause(), instanceOf(ClassCastException.class));
assertThat(se.getMessage(), containsString("default Serdes"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 0b7c1b3..07ad044 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -55,7 +54,6 @@ import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
@@ -777,7 +775,7 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
context.forward(record);
}
}
@@ -794,7 +792,7 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
context.forward(record.withTimestamp(record.timestamp() + 10));
}
}
@@ -816,7 +814,7 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
context.forward(record);
context.forward(record.withTimestamp(record.timestamp() + 5), firstChild);
context.forward(record, secondChild);
@@ -833,7 +831,7 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
context.forward(record.withHeaders(record.headers().add(HEADER)));
}
}
@@ -851,7 +849,7 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
context.forward(record.withValue(record.value().split("@")[0]));
}
}
@@ -936,7 +934,7 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
store.put(record.key(), record.value());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index bc6f08b..c877ab5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -52,7 +52,7 @@ public class SinkNodeTest {
// When/Then
context.setTime(-1); // ensures a negative timestamp is set for the record we send next
try {
- illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()), java.util.Optional.empty());
+ illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()));
fail("Should have thrown StreamsException");
} catch (final StreamsException ignored) {
// expected
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b54aa6c..e620be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -80,7 +79,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -135,7 +133,7 @@ public class StreamTaskTest {
private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
@Override
- public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<Integer, Integer> record) {
throw new RuntimeException("KABOOM!");
}
@@ -471,7 +469,7 @@ public class StreamTaskTest {
}
@Override
- public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<Integer, Integer> record) {
if (record.key() % 2 == 0) {
context.forward(record);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6520778..4ee44ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -64,7 +64,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -94,7 +93,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -1205,7 +1203,7 @@ public class StreamThreadTest {
"proc",
() -> new Processor<Object, Object, Object, Object>() {
@Override
- public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<Object, Object> record) {
if (shouldThrow.get()) {
throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
} else {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index bd82e29..6086f97 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -55,7 +55,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
@@ -300,7 +299,7 @@ public class InternalMockProcessorContext
try {
for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
currentNode = childNode;
- ((ProcessorNode<K, V, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+ ((ProcessorNode<K, V, ?, ?>) childNode).process(record);
}
} finally {
currentNode = thisNode;
@@ -337,7 +336,7 @@ public class InternalMockProcessorContext
if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
currentNode = childNode;
final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers());
- ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+ ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record);
toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
// Processors and toInternal might have been modified
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index 8bed338..dd56bad 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.time.Duration;
@@ -30,7 +29,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -74,7 +72,7 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
}
@Override
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<KIn, VIn> record) {
final KIn key = record.key();
final VIn value = record.value();
final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 82b24d1..370dca7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
@@ -84,6 +86,11 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
}
@Override
+ public Optional<RecordMetadata> recordMetadata() {
+ return Optional.of(recordContext());
+ }
+
+ @Override
public void setRecordContext(final ProcessorRecordContext recordContext) {
setRecordMetadata(
recordContext.topic(),
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 6c653c3..f18b763 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -22,17 +22,14 @@ import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
private final MockApiProcessor<K, V, Object, Object> delegate;
- private InternalProcessorContext internalProcessorContext;
public MockProcessor(final PunctuationType punctuationType,
final long scheduleInterval) {
@@ -47,14 +44,12 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
@Override
public void init(final ProcessorContext context) {
super.init(context);
- internalProcessorContext = (InternalProcessorContext) context;
delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context);
}
@Override
public void process(final K key, final V value) {
- final Record<K, V> record = new Record<>(key, value, context.timestamp(), context.headers());
- delegate.process(record, Optional.ofNullable(internalProcessorContext.recordContext()));
+ delegate.process(new Record<>(key, value, context.timestamp(), context.headers()));
}
public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 90fd905..a75c250 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -18,12 +18,10 @@ package org.apache.kafka.test;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import java.util.Collections;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -61,8 +59,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
}
@Override
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
- processor().process(record, recordMetadata);
+ public void process(final Record<KIn, VIn> record) {
+ processor().process(record);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 4c3fed1..9d22e3b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -18,12 +18,10 @@ package org.apache.kafka.test;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
@@ -42,7 +40,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
}
@Override
- public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<KIn, VIn> record) {
numReceived++;
keys.add(record.key());
values.add(record.value());
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 48783a6..fd9fb76 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -72,7 +72,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -266,14 +265,14 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<Object, Object> record) {
processedRecords.add(new TTDTestRecord(
record.key(),
record.value(),
record.headers(),
record.timestamp(),
- recordMetadata.map(RecordMetadata::offset).orElse(-1L),
- recordMetadata.map(RecordMetadata::topic).orElse(null)
+ context.recordMetadata().map(RecordMetadata::offset).orElse(-1L),
+ context.recordMetadata().map(RecordMetadata::topic).orElse(null)
));
context.forward(record);
}
@@ -408,7 +407,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<Object, Object> record) {
store.put(record.key(), record.value());
}
}
@@ -1461,7 +1460,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, Long> record) {
final Long oldValue = store.get(record.key());
if (oldValue == null || record.value() > oldValue) {
store.put(record.key(), record.value());
@@ -1514,7 +1513,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, Long> record) {
store.put(record.key(), record.value());
}
};
@@ -1703,7 +1702,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
final String value = record.value();
if (!value.startsWith("recurse-")) {
context.forward(record.withValue("recurse-" + value), "recursiveSink");
@@ -1761,7 +1760,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
stateStore.put(record.key(), record.value());
}
}
@@ -1777,7 +1776,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ public void process(final Record<String, String> record) {
final String value = record.value();
if (!value.startsWith("recurse-")) {
context.forward(record.withValue("recurse-" + value), "recursiveSink");