You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/01 17:06:18 UTC
[kafka] 01/02: KAFKA-10535: Split ProcessorContext into
Processor/StateStore/RecordContext
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch kip-478-part-4-record-processor
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 22b9c3ab92dcaac8afae628547500e043e44de87
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Aug 25 16:51:15 2020 -0500
KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext
---
checkstyle/suppressions.xml | 2 +-
.../examples/docs/DeveloperGuideTesting.java | 20 +-
.../kafka/streams/processor/ProcessorContext.java | 5 +-
.../apache/kafka/streams/processor/StateStore.java | 31 ++-
.../kafka/streams/processor/StateStoreContext.java | 112 ++++++++++
.../kafka/streams/processor/api/Processor.java | 14 +-
.../streams/processor/api/ProcessorContext.java | 87 +-------
.../apache/kafka/streams/processor/api/Record.java | 160 +++++++++++++
.../ToInternal.java => api/RecordMetadata.java} | 37 ++-
.../internals/AbstractProcessorContext.java | 65 +++---
.../ForwardingDisabledProcessorContext.java | 2 +-
.../internals/GlobalProcessorContextImpl.java | 30 ++-
.../internals/GlobalStateManagerImpl.java | 3 +-
.../processor/internals/GlobalStateUpdateTask.java | 13 +-
.../internals/InternalApiProcessorContext.java | 119 ----------
.../internals/InternalProcessorContext.java | 5 +-
.../processor/internals/ProcessorAdapter.java | 24 +-
.../internals/ProcessorContextAdapter.java | 235 -------------------
.../processor/internals/ProcessorContextImpl.java | 138 +++++++-----
.../internals/ProcessorContextReverseAdapter.java | 248 ---------------------
.../streams/processor/internals/ProcessorNode.java | 15 +-
.../internals/ProcessorRecordContext.java | 3 +-
.../processor/internals/ProcessorStateManager.java | 3 +-
.../streams/processor/internals/SinkNode.java | 40 +++-
.../streams/processor/internals/SourceNode.java | 8 +-
...xt.java => StoreToProcessorContextAdapter.java} | 70 +++---
.../streams/processor/internals/StreamTask.java | 40 +++-
.../streams/processor/internals/ToInternal.java | 4 +
.../org/apache/kafka/streams/KafkaStreamsTest.java | 15 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 7 +-
.../org/apache/kafka/streams/TopologyTest.java | 10 +-
...KStreamSessionWindowAggregateProcessorTest.java | 5 +-
.../kstream/internals/KStreamTransformTest.java | 5 +-
.../KTableSuppressProcessorMetricsTest.java | 3 +-
.../suppress/KTableSuppressProcessorTest.java | 3 +-
.../internals/AbstractProcessorContextTest.java | 46 ++--
.../internals/GlobalProcessorContextImplTest.java | 19 +-
.../processor/internals/GlobalStateTaskTest.java | 9 +-
.../internals/GlobalStreamThreadTest.java | 4 +-
.../internals/ProcessorContextImplTest.java | 2 +-
.../processor/internals/ProcessorNodeTest.java | 9 +-
.../internals/ProcessorStateManagerTest.java | 5 +-
.../processor/internals/ProcessorTopologyTest.java | 35 +--
.../streams/processor/internals/SinkNodeTest.java | 4 +-
.../processor/internals/StreamTaskTest.java | 16 +-
.../processor/internals/StreamThreadTest.java | 6 +-
.../internals/testutil/ConsumerRecordUtil.java | 46 ++++
.../AbstractRocksDBSegmentedBytesStoreTest.java | 13 +-
.../internals/AbstractSessionBytesStoreTest.java | 9 +-
.../internals/AbstractWindowBytesStoreTest.java | 13 +-
.../state/internals/CachingKeyValueStoreTest.java | 5 +-
.../state/internals/CachingSessionStoreTest.java | 5 +-
.../state/internals/CachingWindowStoreTest.java | 5 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 3 +-
.../ChangeLoggingSessionBytesStoreTest.java | 6 +-
...geLoggingTimestampedKeyValueBytesStoreTest.java | 3 +-
...angeLoggingTimestampedWindowBytesStoreTest.java | 6 +-
.../ChangeLoggingWindowBytesStoreTest.java | 6 +-
.../CompositeReadOnlyKeyValueStoreTest.java | 3 +-
.../internals/GlobalStateStoreProviderTest.java | 3 +-
.../state/internals/MeteredKeyValueStoreTest.java | 5 +-
.../state/internals/MeteredSessionStoreTest.java | 5 +-
.../MeteredTimestampedKeyValueStoreTest.java | 9 +-
.../MeteredTimestampedWindowStoreTest.java | 11 +-
.../state/internals/MeteredWindowStoreTest.java | 25 ++-
.../streams/state/internals/RocksDBStoreTest.java | 39 ++--
.../internals/RocksDBTimestampedStoreTest.java | 15 +-
.../state/internals/RocksDBWindowStoreTest.java | 7 +-
.../state/internals/SegmentIteratorTest.java | 5 +-
.../internals/TimeOrderedKeyValueBufferTest.java | 37 +--
.../kafka/test/InternalMockProcessorContext.java | 32 ++-
.../org/apache/kafka/test/MockApiProcessor.java | 23 +-
.../kafka/test/MockInternalProcessorContext.java | 12 +
.../java/org/apache/kafka/test/MockProcessor.java | 11 +-
.../org/apache/kafka/test/MockProcessorNode.java | 7 +-
.../java/org/apache/kafka/test/MockSourceNode.java | 13 +-
.../apache/kafka/test/NoOpProcessorContext.java | 15 +-
.../apache/kafka/streams/TopologyTestDriver.java | 18 +-
.../streams/processor/MockProcessorContext.java | 5 +-
.../kafka/streams/TopologyTestDriverTest.java | 127 ++++++-----
80 files changed, 1107 insertions(+), 1176 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dead182..dd074a4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -185,7 +185,7 @@
<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
- files="StreamThreadTest.java"/>
+ files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
<suppress checks="MethodLength"
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 90bed05..1ce3445 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.docs;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
@@ -26,6 +27,8 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -37,6 +40,7 @@ import org.junit.Test;
import java.time.Duration;
import java.time.Instant;
+import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -160,24 +164,24 @@ public class DeveloperGuideTesting {
@Override
public void init(final ProcessorContext<String, Long> context) {
this.context = context;
- context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
- context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
+ context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+ context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
store = context.getStateStore("aggStore");
}
@Override
- public void process(final String key, final Long value) {
- final Long oldValue = store.get(key);
- if (oldValue == null || value > oldValue) {
- store.put(key, value);
+ public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+ final Long oldValue = store.get(record.key());
+ if (oldValue == null || record.value() > oldValue) {
+ store.put(record.key(), record.value());
}
}
- private void flushStore() {
+ private void flushStore(final long timestamp) {
final KeyValueIterator<String, Long> it = store.all();
while (it.hasNext()) {
final KeyValue<String, Long> next = it.next();
- context.forward(next.key, next.value);
+ context.forward(new Record<>(next.key, next.value, timestamp, new RecordHeaders()));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f036869..aafe64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -88,9 +88,12 @@ public interface ProcessorContext {
* Get the state store given the store name.
*
* @param name The store name
+ * @param <S> The type or interface of the store to return
* @return The state store instance
+ *
+ * @throws ClassCastException if the return type isn't a type or interface of the actual returned store.
*/
- StateStore getStateStore(final String name);
+ <S extends StateStore> S getStateStore(final String name);
/**
* Schedules a periodic operation for processors. A processor may call this method during
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index df53ee2..4f47b12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
/**
* A storage engine for managing state maintained by a stream processor.
@@ -25,7 +27,7 @@ import org.apache.kafka.streams.errors.StreamsException;
* all data into this store directory.
* The store directory must be created with the state directory.
* The state directory can be obtained via {@link ProcessorContext#stateDir() #stateDir()} using the
- * {@link ProcessorContext} provided via {@link #init(ProcessorContext, StateStore) init(...)}.
+ * {@link StateStoreContext} provided via {@link #init(StateStoreContext, StateStore) init(...)}.
* <p>
* Using nested store directories within the state directory isolates different state stores.
* If a state store would write into the state directory directly, it might conflict with others state stores and thus,
@@ -49,7 +51,28 @@ public interface StateStore {
* Initializes this state store.
* <p>
* The implementation of this function must register the root store in the context via the
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
+ * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+ * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+ * the second parameter should be an object of user's implementation
+ * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+ * <p>
+ * Note that if the state store engine itself supports bulk writes, users can implement another
+ * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+ * let users implement bulk-load restoration logic instead of restoring one record at a time.
+ * <p>
+ * This method is not called if {@link StateStore#init(StateStoreContext, StateStore)}
+ * is implemented.
+ *
+ * @throws IllegalStateException If store gets registered after initialization is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
+
+ /**
+ * Initializes this state store.
+ * <p>
+ * The implementation of this function must register the root store in the context via the
+ * {@link StateStoreContext#register(StateStore, StateRestoreCallback)} function, where the
* first {@link StateStore} parameter should always be the passed-in {@code root} object, and
* the second parameter should be an object of user's implementation
* of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
@@ -61,7 +84,9 @@ public interface StateStore {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(ProcessorContext context, StateStore root);
+ default void init(final StateStoreContext context, final StateStore root) {
+ init(StoreToProcessorContextAdapter.adapt(context), root);
+ }
/**
* Flush any cached data
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
new file mode 100644
index 0000000..43810a2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {
+
+ /**
+ * Returns the application id.
+ *
+ * @return the application id
+ */
+ String applicationId();
+
+ /**
+ * Returns the task id.
+ *
+ * @return the task id
+ */
+ TaskId taskId();
+
+ /**
+ * Returns the default key serde.
+ *
+ * @return the default key serde
+ */
+ Serde<?> keySerde();
+
+ /**
+ * Returns the default value serde.
+ *
+ * @return the default value serde
+ */
+ Serde<?> valueSerde();
+
+ /**
+ * Returns the state directory for the partition.
+ *
+ * @return the state directory
+ */
+ File stateDir();
+
+ /**
+ * Returns Metrics instance.
+ *
+ * @return StreamsMetrics
+ */
+ StreamsMetrics metrics();
+
+ /**
+ * Registers and possibly restores the specified storage engine.
+ *
+ * @param store the storage engine
+ * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
+ *
+ * @throws IllegalStateException If store gets registered after initialization is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ void register(final StateStore store,
+ final StateRestoreCallback stateRestoreCallback);
+
+ /**
+ * Returns all the application config properties as key/value pairs.
+ *
+ * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+ * object and associated to the StateStoreContext.
+ *
+ * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
+ * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
+ * will be of type {@link Class}, even if it was specified as a String to
+ * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
+ *
+ * @return all the key/values from the StreamsConfig properties
+ */
+ Map<String, Object> appConfigs();
+
+ /**
+ * Returns all the application config properties with the given key prefix, as key/value pairs
+ * stripping the prefix.
+ *
+ * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+ * object and associated to the StateStoreContext.
+ *
+ * @param prefix the properties prefix
+ * @return the key/values matching the given prefix from the StreamsConfig properties.
+ */
+ Map<String, Object> appConfigsWithPrefix(final String prefix);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
index d3656c7..e6feccb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import java.time.Duration;
+import java.util.Optional;
/**
* A processor of key-value pair records.
@@ -46,12 +47,17 @@ public interface Processor<KIn, VIn, KOut, VOut> {
default void init(final ProcessorContext<KOut, VOut> context) {}
/**
- * Process the record with the given key and value.
+ * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
*
- * @param key the key for the record
- * @param value the value for the record
+ * @param record the record to process
+ * @param recordMetadata the metadata of the record, if it is defined. Note that as long as the processor is
+ * receiving a record downstream of a Source (i.e., the current record is coming from an
+ * input topic), the metadata is defined. On the other hand, if a parent processor has
+ * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+ * punctuator, then there is no record from an input topic, and therefore the metadata
+ * would be undefined.
*/
- void process(KIn key, VIn value);
+ void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);
/**
* Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index 958126a..f4f0fdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -16,22 +16,19 @@
*/
package org.apache.kafka.streams.processor.api;
-import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.To;
import java.io.File;
import java.time.Duration;
import java.util.Map;
+import java.util.Optional;
/**
* Processor context interface.
@@ -84,29 +81,20 @@ public interface ProcessorContext<KForward, VForward> {
StreamsMetrics metrics();
/**
- * Registers and possibly restores the specified storage engine.
- *
- * @param store the storage engine
- * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
- *
- * @throws IllegalStateException If store gets registered after initialized is already finished
- * @throws StreamsException if the store's change log does not contain the partition
- */
- void register(final StateStore store,
- final StateRestoreCallback stateRestoreCallback);
-
- /**
* Get the state store given the store name.
*
* @param name The store name
+ * @param <S> The type or interface of the store to return
* @return The state store instance
+ *
+ * @throws ClassCastException if the return type isn't a type or interface of the actual returned store.
*/
<S extends StateStore> S getStateStore(final String name);
/**
* Schedules a periodic operation for processors. A processor may call this method during
* {@link Processor#init(ProcessorContext) initialization} or
- * {@link Processor#process(Object, Object) processing} to
+ * {@link Processor#process(Record, Optional) processing} to
* schedule a periodic callback — called a punctuation — to {@link Punctuator#punctuate(long)}.
* The type parameter controls what notion of time is used for punctuation:
* <ul>
@@ -140,23 +128,19 @@ public interface ProcessorContext<KForward, VForward> {
final Punctuator callback);
/**
- * Forwards a key/value pair to all downstream processors.
- * Used the input record's timestamp as timestamp for the output record.
+ * Forwards a record to all child processors.
*
- * @param key key
- * @param value value
+ * @param record The record to forward to all children
*/
- <K extends KForward, V extends VForward> void forward(final K key, final V value);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record);
/**
- * Forwards a key/value pair to the specified downstream processors.
- * Can be used to set the timestamp of the output record.
+ * Forwards a record to the specified child processor.
*
- * @param key key
- * @param value value
- * @param to the options to use when forwarding
+ * @param record The record to forward
+ * @param childName The name of the child processor to receive the record
*/
- <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+ <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
/**
* Requests a commit.
@@ -164,53 +148,6 @@ public interface ProcessorContext<KForward, VForward> {
void commit();
/**
- * Returns the topic name of the current input record; could be null if it is not
- * available (for example, if this method is invoked from the punctuate call).
- *
- * @return the topic name
- */
- String topic();
-
- /**
- * Returns the partition id of the current input record; could be -1 if it is not
- * available (for example, if this method is invoked from the punctuate call).
- *
- * @return the partition id
- */
- int partition();
-
- /**
- * Returns the offset of the current input record; could be -1 if it is not
- * available (for example, if this method is invoked from the punctuate call).
- *
- * @return the offset
- */
- long offset();
-
- /**
- * Returns the headers of the current input record; could be null if it is not
- * available (for example, if this method is invoked from the punctuate call).
- *
- * @return the headers
- */
- Headers headers();
-
- /**
- * Returns the current timestamp.
- *
- * <p> If it is triggered while processing a record streamed from the source processor,
- * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
- * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
- *
- * <p> If it is triggered while processing a record generated not from the source processor (for example,
- * if this method is invoked from the punctuate call), timestamp is defined as the current
- * task's stream time, which is defined as the largest timestamp of any record processed by the task.
- *
- * @return the timestamp
- */
- long timestamp();
-
- /**
* Returns all the application config properties as key/value pairs.
*
* <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
new file mode 100644
index 0000000..3be74f8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+ private final K key;
+ private final V value;
+ private final long timestamp;
+ private final Headers headers;
+
+ /**
+ * The full constructor, specifying all the attributes of the record.
+ *
+ * @param key The key of the record. May be null.
+ * @param value The value of the record. May be null.
+ * @param timestamp The timestamp of the record. May not be negative.
+ * @param headers The headers of the record. May be null, which will cause subsequent calls
+ * to {@link #headers()} to return a non-null, empty, {@link Headers} collection.
+ *
+ * @throws StreamsException wrapping an {@link IllegalArgumentException} if the timestamp is negative.
+ */
+ public Record(final K key, final V value, final long timestamp, final Headers headers) {
+ this.key = key;
+ this.value = value;
+ if (timestamp < 0) {
+ throw new StreamsException(
+ "Malformed Record",
+ new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+ );
+ }
+ this.timestamp = timestamp;
+ this.headers = new RecordHeaders(headers);
+ }
+
+ /**
+ * Convenience constructor in case you do not wish to specify any headers.
+ * Subsequent calls to {@link #headers()} will return a non-null, empty, {@link Headers} collection.
+ *
+ * @param key The key of the record. May be null.
+ * @param value The value of the record. May be null.
+ * @param timestamp The timestamp of the record. May not be negative.
+ *
+ * @throws StreamsException wrapping an {@link IllegalArgumentException} if the timestamp is negative.
+ */
+ public Record(final K key, final V value, final long timestamp) {
+ // Delegate to the full constructor so that the negative-timestamp check
+ // documented above is enforced here as well. Null headers are documented
+ // as allowed by that constructor and yield an empty Headers collection.
+ this(key, value, timestamp, null);
+ }
+
+ /**
+ * The key of the record. May be null.
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * The value of the record. May be null.
+ */
+ public V value() {
+ return value;
+ }
+
+ /**
+ * The timestamp of the record. Will never be negative.
+ */
+ public long timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * The headers of the record. Never null.
+ */
+ public Headers headers() {
+ return headers;
+ }
+
+ /**
+ * A convenient way to produce a new record if you only need to change the key.
+ *
+ * Copies the attributes of this record with the key replaced.
+ *
+ * @param key The key of the result record. May be null.
+ * @param <NewK> The type of the new record's key.
+ * @return A new Record instance with all the same attributes (except that the key is replaced).
+ */
+ public <NewK> Record<NewK, V> withKey(final NewK key) {
+ return new Record<>(key, value, timestamp, headers);
+ }
+
+ /**
+ * A convenient way to produce a new record if you only need to change the value.
+ *
+ * Copies the attributes of this record with the value replaced.
+ *
+ * @param value The value of the result record.
+ * @param <NewV> The type of the new record's value.
+ * @return A new Record instance with all the same attributes (except that the value is replaced).
+ */
+ public <NewV> Record<K, NewV> withValue(final NewV value) {
+ return new Record<>(key, value, timestamp, headers);
+ }
+
+ /**
+ * A convenient way to produce a new record if you only need to change the timestamp.
+ *
+ * Copies the attributes of this record with the timestamp replaced.
+ *
+ * @param timestamp The timestamp of the result record.
+ * @return A new Record instance with all the same attributes (except that the timestamp is replaced).
+ */
+ public Record<K, V> withTimestamp(final long timestamp) {
+ return new Record<>(key, value, timestamp, headers);
+ }
+
+ /**
+ * A convenient way to produce a new record if you only need to change the headers.
+ *
+ * Copies the attributes of this record with the headers replaced.
+ *
+ * @param headers The headers of the result record.
+ * @return A new Record instance with all the same attributes (except that the headers are replaced).
+ */
+ public Record<K, V> withHeaders(final Headers headers) {
+ return new Record<>(key, value, timestamp, headers);
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
similarity index 63%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
index 6c5798e..532104a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
@@ -14,28 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.api;
-import org.apache.kafka.streams.processor.To;
+public interface RecordMetadata {
+ /**
+ * @return The topic of the original record received from Kafka
+ */
+ String topic();
-public class ToInternal extends To {
- public ToInternal() {
- super(To.all());
- }
+ /**
+ * @return The partition of the original record received from Kafka
+ */
+ int partition();
- public void update(final To to) {
- super.update(to);
- }
-
- public boolean hasTimestamp() {
- return timestamp != -1;
- }
-
- public long timestamp() {
- return timestamp;
- }
-
- public String child() {
- return childName;
- }
-}
\ No newline at end of file
+ /**
+ * @return The offset of the original record received from Kafka
+ */
+ long offset();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 6012817..ef222e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -33,7 +34,6 @@ import java.util.Objects;
public abstract class AbstractProcessorContext implements InternalProcessorContext {
- public static final String NONEXIST_TOPIC = "__null_topic__";
private final TaskId taskId;
private final String applicationId;
private final StreamsConfig config;
@@ -112,64 +112,69 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
stateManager().registerStore(store, stateRestoreCallback);
}
- /**
- * @throws IllegalStateException if the task's record is null
- */
@Override
public String topic() {
if (recordContext == null) {
- throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
- }
-
- final String topic = recordContext.topic();
-
- if (NONEXIST_TOPIC.equals(topic)) {
+ // This is only exposed via the deprecated ProcessorContext,
+ // in which case, we're preserving the pre-existing behavior
+ // of returning dummy values when the record context is undefined.
+ // For topic, the dummy value is `null`.
return null;
+ } else {
+ return recordContext.topic();
}
-
- return topic;
}
- /**
- * @throws IllegalStateException if partition is null
- */
@Override
public int partition() {
if (recordContext == null) {
- throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
+ // This is only exposed via the deprecated ProcessorContext,
+ // in which case, we're preserving the pre-existing behavior
+ // of returning dummy values when the record context is undefined.
+ // For partition, the dummy value is `-1`.
+ return -1;
+ } else {
+ return recordContext.partition();
}
-
- return recordContext.partition();
}
- /**
- * @throws IllegalStateException if offset is null
- */
@Override
public long offset() {
if (recordContext == null) {
- throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
+ // This is only exposed via the deprecated ProcessorContext,
+ // in which case, we're preserving the pre-existing behavior
+ // of returning dummy values when the record context is undefined.
+ // For offset, the dummy value is `-1L`.
+ return -1L;
+ } else {
+ return recordContext.offset();
}
- return recordContext.offset();
}
@Override
public Headers headers() {
if (recordContext == null) {
- throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
+ // This is only exposed via the deprecated ProcessorContext,
+ // in which case, we're preserving the pre-existing behavior
+ // of returning dummy values when the record context is undefined.
+ // For headers, the dummy value is an empty headers collection.
+ return new RecordHeaders();
+ } else {
+ return recordContext.headers();
}
- return recordContext.headers();
}
- /**
- * @throws IllegalStateException if timestamp is null
- */
@Override
public long timestamp() {
if (recordContext == null) {
- throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
+ // This is only exposed via the deprecated ProcessorContext,
+ // in which case, we're preserving the pre-existing behavior
+ // of returning dummy values when the record context is undefined.
+ // For timestamp, the dummy value is `0L`.
+ return 0L;
+ } else {
+ return recordContext.timestamp();
}
- return recordContext.timestamp();
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 2b8043a..5e654c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -86,7 +86,7 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
}
@Override
- public StateStore getStateStore(final String name) {
+ public <S extends StateStore> S getStateStore(final String name) {
return delegate.getStateStore(name);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 695eb77..9f31309 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
-
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
@@ -26,11 +24,16 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.time.Duration;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+import java.util.Optional;
+
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
public class GlobalProcessorContextImpl extends AbstractProcessorContext {
@@ -49,26 +52,39 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
return stateManager;
}
+ @SuppressWarnings("unchecked")
@Override
- public StateStore getStateStore(final String name) {
+ public <S extends StateStore> S getStateStore(final String name) {
final StateStore store = stateManager.getGlobalStore(name);
- return getReadWriteStore(store);
+ return (S) getReadWriteStore(store);
}
@SuppressWarnings("unchecked")
@Override
- public <KIn, VIn> void forward(final KIn key, final VIn value) {
+ public <K, V> void forward(final Record<K, V> record) {
final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
+ final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
try {
for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
setCurrentNode(child);
- ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
+ ((ProcessorNode<K, V, ?, ?>) child).process(record, recordMetadata);
}
} finally {
setCurrentNode(previousNode);
}
}
+ /**
+ * Forwarding to a named child is not supported by the global processor context:
+ * this overload unconditionally throws.
+ *
+ * @param record the record that was asked to be forwarded (not used)
+ * @param childName the name of the intended child node (not used)
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public <K, V> void forward(final Record<K, V> record, final String childName) {
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
+ }
+
+ /**
+ * Old-API forward: wraps the key and value in a {@link Record} that carries this context's
+ * current {@code timestamp()} and {@code headers()}, then delegates to {@code forward(Record)}.
+ *
+ * No {@code @SuppressWarnings("unchecked")} is needed here: the body performs no unchecked
+ * operation, and a redundant suppression would only hide warnings introduced later.
+ *
+ * @param key the key to forward
+ * @param value the value to forward
+ */
+ @Override
+ public <KIn, VIn> void forward(final KIn key, final VIn value) {
+ forward(new Record<>(key, value, timestamp(), headers()));
+ }
+
/**
* No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2864415..bd3aa3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
@@ -147,7 +148,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
globalStoreNames.add(stateStore.name());
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
- stateStore.init(globalProcessorContext, stateStore);
+ stateStore.init((StateStoreContext) globalProcessorContext, stateStore);
}
// make sure each topic-partition from checkpointFileCache is associated with a global state store
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 3664137..360e50e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -22,11 +22,13 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
@@ -104,7 +106,16 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
deserialized.headers());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
- ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+ final Record<Object, Object> toProcess = new Record<>(
+ deserialized.key(),
+ deserialized.value(),
+ processorContext.timestamp(),
+ processorContext.headers()
+ );
+ ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(
+ toProcess,
+ Optional.of(recordContext)
+ );
}
offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
deleted file mode 100644
index 39c3084..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {
-
- @Override
- StreamsMetricsImpl metrics();
-
- /**
- * @param timeMs current wall-clock system timestamp in milliseconds
- */
- void setSystemTimeMs(long timeMs);
-
- /**
- * @return the current wall-clock system timestamp in milliseconds
- */
- long currentSystemTimeMs();
-
- /**
- * Returns the current {@link RecordContext}
- * @return the current {@link RecordContext}
- */
- ProcessorRecordContext recordContext();
-
- /**
- * @param recordContext the {@link ProcessorRecordContext} for the record about to be processes
- */
- void setRecordContext(ProcessorRecordContext recordContext);
-
- /**
- * @param currentNode the current {@link ProcessorNode}
- */
- void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
-
- /**
- * Get the current {@link ProcessorNode}
- */
- ProcessorNode<?, ?, ?, ?> currentNode();
-
- /**
- * Get the thread-global cache
- */
- ThreadCache cache();
-
- /**
- * Mark this context as being initialized
- */
- void initialize();
-
- /**
- * Mark this context as being uninitialized
- */
- void uninitialize();
-
- /**
- * @return the type of task (active/standby/global) that this context corresponds to
- */
- TaskType taskType();
-
- /**
- * Transition to active task and register a new task and cache to this processor context
- */
- void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache);
-
- /**
- * Transition to standby task and register a dummy cache to this processor context
- */
- void transitionToStandby(final ThreadCache newCache);
-
- /**
- * Register a dirty entry flush listener for a particular namespace
- */
- void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener);
-
- /**
- * Get a correctly typed state store, given a handle on the original builder.
- */
- @SuppressWarnings("unchecked")
- default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
- return (T) getStateStore(builder.name());
- }
-
- void logChange(final String storeName,
- final Bytes key,
- final byte[] value,
- final long timestamp);
-
- String changelogFor(final String storeName);
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 8e4ec25..f4b922b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
* {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
* {@link ThreadCache}
*/
-public interface InternalProcessorContext extends ProcessorContext {
+public interface InternalProcessorContext
+ extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext {
+
BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index d8e4af4..291a99e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -19,9 +19,14 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+
+import java.util.Optional;
public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
+ private InternalProcessorContext context;
public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
if (delegate == null) {
@@ -47,12 +52,25 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
- delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+ this.context = (InternalProcessorContext) context;
+ delegate.init((org.apache.kafka.streams.processor.ProcessorContext) context);
}
@Override
- public void process(final KIn key, final VIn value) {
- delegate.process(key, value);
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ // Remember the record context currently installed on the processor context so that it can be
+ // restored once the old-API delegate has returned (or thrown).
+ final ProcessorRecordContext processorRecordContext = context.recordContext();
+ try {
+ // The old-API delegate only receives key and value, so the record's timestamp and headers
+ // are published through a temporary record context. Offset, partition and topic are taken
+ // from the processor context itself; the recordMetadata argument is not read here.
+ context.setRecordContext(new ProcessorRecordContext(
+ record.timestamp(),
+ context.offset(),
+ context.partition(),
+ context.topic(),
+ record.headers()
+ ));
+ delegate.process(record.key(), record.value());
+ } finally {
+ // Always put the previous record context back, even when the delegate throws.
+ context.setRecordContext(processorRecordContext);
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
deleted file mode 100644
index 85dbd52..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
- implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {
-
- private final InternalProcessorContext delegate;
-
- @SuppressWarnings("unchecked")
- public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) {
- if (delegate instanceof ProcessorContextReverseAdapter) {
- return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
- } else {
- return new ProcessorContextAdapter<>(delegate);
- }
- }
-
- private ProcessorContextAdapter(final InternalProcessorContext delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String applicationId() {
- return delegate.applicationId();
- }
-
- @Override
- public TaskId taskId() {
- return delegate.taskId();
- }
-
- @Override
- public Serde<?> keySerde() {
- return delegate.keySerde();
- }
-
- @Override
- public Serde<?> valueSerde() {
- return delegate.valueSerde();
- }
-
- @Override
- public File stateDir() {
- return delegate.stateDir();
- }
-
- @Override
- public StreamsMetricsImpl metrics() {
- return delegate.metrics();
- }
-
- @Override
- public void setSystemTimeMs(final long timeMs) {
- delegate.setSystemTimeMs(timeMs);
- }
-
- @Override
- public long currentSystemTimeMs() {
- return delegate.currentSystemTimeMs();
- }
-
- @Override
- public ProcessorRecordContext recordContext() {
- return delegate.recordContext();
- }
-
- @Override
- public void setRecordContext(final ProcessorRecordContext recordContext) {
- delegate.setRecordContext(recordContext);
- }
-
- @Override
- public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
- delegate.setCurrentNode(currentNode);
- }
-
- @Override
- public ProcessorNode<?, ?, ?, ?> currentNode() {
- return delegate.currentNode();
- }
-
- @Override
- public ThreadCache cache() {
- return delegate.cache();
- }
-
- @Override
- public void initialize() {
- delegate.initialize();
- }
-
- @Override
- public void uninitialize() {
- delegate.uninitialize();
- }
-
- @Override
- public Task.TaskType taskType() {
- return delegate.taskType();
- }
-
- @Override
- public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
- delegate.transitionToActive(streamTask, recordCollector, newCache);
- }
-
- @Override
- public void transitionToStandby(final ThreadCache newCache) {
- delegate.transitionToStandby(newCache);
- }
-
- @Override
- public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
- delegate.registerCacheFlushListener(namespace, listener);
- }
-
- @Override
- public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
- return delegate.getStateStore(builder);
- }
-
- @Override
- public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
- delegate.logChange(storeName, key, value, timestamp);
- }
-
- @Override
- public String changelogFor(final String storeName) {
- return delegate.changelogFor(storeName);
- }
-
- @Override
- public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
- delegate.register(store, stateRestoreCallback);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <S extends StateStore> S getStateStore(final String name) {
- return (S) delegate.getStateStore(name);
- }
-
- @Override
- public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
- return delegate.schedule(interval, type, callback);
- }
-
- @Override
- public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
- delegate.forward(key, value);
- }
-
- @Override
- public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
- delegate.forward(key, value, to);
- }
-
- @Override
- public void commit() {
- delegate.commit();
- }
-
- @Override
- public String topic() {
- return delegate.topic();
- }
-
- @Override
- public int partition() {
- return delegate.partition();
- }
-
- @Override
- public long offset() {
- return delegate.offset();
- }
-
- @Override
- public Headers headers() {
- return delegate.headers();
- }
-
- @Override
- public long timestamp() {
- return delegate.timestamp();
- }
-
- @Override
- public Map<String, Object> appConfigs() {
- return delegate.appConfigs();
- }
-
- @Override
- public Map<String, Object> appConfigsWithPrefix(final String prefix) {
- return delegate.appConfigsWithPrefix(prefix);
- }
-
- InternalProcessorContext delegate() {
- return delegate;
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e12dfe1..309b813 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
@@ -30,13 +27,18 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.time.Duration;
+import java.util.HashMap;
import java.util.List;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+import java.util.Map;
+import java.util.Optional;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
@@ -47,9 +49,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
private StreamTask streamTask;
private RecordCollector collector;
- private final ToInternal toInternal = new ToInternal();
- private final static To SEND_TO_ALL = To.all();
-
private final ProcessorStateManager stateManager;
final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new HashMap<>();
@@ -135,8 +134,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
* @throws StreamsException if an attempt is made to access this state store from an unknown node
* @throws UnsupportedOperationException if the current streamTask type is standby
*/
+ @SuppressWarnings("unchecked")
@Override
- public StateStore getStateStore(final String name) {
+ public <S extends StateStore> S getStateStore(final String name) {
throwUnsupportedOperationExceptionIfStandby("getStateStore");
if (currentNode() == null) {
throw new StreamsException("Accessing from an unknown node");
@@ -144,7 +144,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final StateStore globalStore = stateManager.getGlobalStore(name);
if (globalStore != null) {
- return getReadOnlyStore(globalStore);
+ return (S) getReadOnlyStore(globalStore);
}
if (!currentNode().stateStores.contains(name)) {
@@ -159,84 +159,116 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
final StateStore store = stateManager.getStore(name);
- return getReadWriteStore(store);
- }
-
- @Override
- public <K, V> void forward(final K key,
- final V value) {
- throwUnsupportedOperationExceptionIfStandby("forward");
- forward(key, value, SEND_TO_ALL);
- }
-
- @Override
- @Deprecated
- public <K, V> void forward(final K key,
- final V value,
- final int childIndex) {
- throwUnsupportedOperationExceptionIfStandby("forward");
- forward(
- key,
- value,
- To.child((currentNode().children()).get(childIndex).name()));
+ return (S) getReadWriteStore(store);
}
@Override
- @Deprecated
- public <K, V> void forward(final K key,
- final V value,
- final String childName) {
- throwUnsupportedOperationExceptionIfStandby("forward");
- forward(key, value, To.child(childName));
+ public <K, V> void forward(final Record<K, V> record) {
+ forward(record, null);
}
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final K key,
- final V value,
- final To to) {
+ public <K, V> void forward(final Record<K, V> record, final String childName) {
throwUnsupportedOperationExceptionIfStandby("forward");
+
final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
final ProcessorRecordContext previousContext = recordContext;
try {
- toInternal.update(to);
- if (toInternal.hasTimestamp()) {
+ if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
recordContext = new ProcessorRecordContext(
- toInternal.timestamp(),
+ record.timestamp(),
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
recordContext.headers());
}
- final String sendTo = toInternal.child();
- if (sendTo == null) {
+ if (childName == null) {
final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
for (final ProcessorNode<?, ?, ?, ?> child : children) {
- forward((ProcessorNode<K, V, ?, ?>) child, key, value);
+ forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
}
} else {
- final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(sendTo);
+ final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(childName);
if (child == null) {
- throw new StreamsException("Unknown downstream node: " + sendTo
- + " either does not exist or is not connected to this processor.");
+ throw new StreamsException("Unknown downstream node: " + childName
+ + " either does not exist or is not connected to this processor.");
}
- forward((ProcessorNode<K, V, ?, ?>) child, key, value);
+ forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
}
+
} finally {
recordContext = previousContext;
setCurrentNode(previousNode);
}
}
- private <K, V> void forward(final ProcessorNode<K, V, ?, ?> child,
- final K key,
- final V value) {
+ /**
+ * Old-API forward to every child of the current node: the key and value are wrapped in a
+ * {@link Record} stamped with this context's current {@code timestamp()} and {@code headers()}
+ * and handed to {@code forward(Record)}.
+ *
+ * @param key the key to forward
+ * @param value the value to forward
+ */
+ @Override
+ public <K, V> void forward(final K key,
+ final V value) {
+ forward(new Record<>(key, value, timestamp(), headers()));
+ }
+
+ /**
+ * Old-API forward to the child at position {@code childIndex} among the current node's children.
+ * The key and value are wrapped in a {@link Record} stamped with this context's current
+ * {@code timestamp()} and {@code headers()}.
+ *
+ * @param key the key to forward
+ * @param value the value to forward
+ * @param childIndex index of the target child in {@code currentNode().children()}
+ */
+ @Override
+ @Deprecated
+ public <K, V> void forward(final K key,
+ final V value,
+ final int childIndex) {
+ // Run the standby check first, as this overload did before the refactoring. Otherwise the
+ // child lookup below dereferences currentNode() before forward(Record, String) gets a chance
+ // to reject the call, and a standby task could fail with a different exception.
+ throwUnsupportedOperationExceptionIfStandby("forward");
+ final Record<K, V> toForward = new Record<>(
+ key,
+ value,
+ timestamp(),
+ headers()
+ );
+ forward(toForward, (currentNode().children()).get(childIndex).name());
+ }
+
+ /**
+ * Old-API forward to the child named {@code childName}. The key and value are wrapped in a
+ * {@link Record} stamped with this context's current {@code timestamp()} and {@code headers()}
+ * and handed to {@code forward(Record, String)}.
+ *
+ * @param key the key to forward
+ * @param value the value to forward
+ * @param childName the name of the child node to forward to
+ */
+ @Override
+ @Deprecated
+ public <K, V> void forward(final K key,
+ final V value,
+ final String childName) {
+ final Record<K, V> record = new Record<>(key, value, timestamp(), headers());
+ forward(record, childName);
+ }
+
+ /**
+ * Old-API forward driven by a {@link To}: the record is sent to the child selected by
+ * {@code to}, and carries the timestamp set on {@code to} when there is one, otherwise this
+ * context's current {@code timestamp()}. Headers are always this context's current
+ * {@code headers()}.
+ *
+ * @param key the key to forward
+ * @param value the value to forward
+ * @param to the routing (and optional timestamp) instructions
+ */
+ @Override
+ public <K, V> void forward(final K key,
+ final V value,
+ final To to) {
+ final ToInternal toInternal = new ToInternal(to);
+ final long forwardTimestamp = toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp();
+ final Record<K, V> record = new Record<>(key, value, forwardTimestamp, headers());
+ forward(record, toInternal.child());
+ }
+
+ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
+ final Record<K, V> record) {
setCurrentNode(child);
- child.process(key, value);
+
+ final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
+
+ child.process(record, recordMetadata);
+
if (child.isTerminalNode()) {
- streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());
+ streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
deleted file mode 100644
index 6e82a5e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
- private final InternalApiProcessorContext<Object, Object> delegate;
-
- static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
- if (delegate instanceof ProcessorContextAdapter) {
- return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
- } else {
- return new ProcessorContextReverseAdapter(delegate);
- }
- }
-
- private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String applicationId() {
- return delegate.applicationId();
- }
-
- @Override
- public TaskId taskId() {
- return delegate.taskId();
- }
-
- @Override
- public Serde<?> keySerde() {
- return delegate.keySerde();
- }
-
- @Override
- public Serde<?> valueSerde() {
- return delegate.valueSerde();
- }
-
- @Override
- public File stateDir() {
- return delegate.stateDir();
- }
-
- @Override
- public StreamsMetricsImpl metrics() {
- return delegate.metrics();
- }
-
- @Override
- public void setSystemTimeMs(final long timeMs) {
- delegate.setSystemTimeMs(timeMs);
- }
-
- @Override
- public long currentSystemTimeMs() {
- return delegate.currentSystemTimeMs();
- }
-
- @Override
- public ProcessorRecordContext recordContext() {
- return delegate.recordContext();
- }
-
- @Override
- public void setRecordContext(final ProcessorRecordContext recordContext) {
- delegate.setRecordContext(recordContext);
- }
-
- @Override
- public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
- delegate.setCurrentNode(currentNode);
- }
-
- @Override
- public ProcessorNode<?, ?, ?, ?> currentNode() {
- return delegate.currentNode();
- }
-
- @Override
- public ThreadCache cache() {
- return delegate.cache();
- }
-
- @Override
- public void initialize() {
- delegate.initialize();
- }
-
- @Override
- public void uninitialize() {
- delegate.uninitialize();
- }
-
- @Override
- public Task.TaskType taskType() {
- return delegate.taskType();
- }
-
- @Override
- public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
- delegate.transitionToActive(streamTask, recordCollector, newCache);
- }
-
- @Override
- public void transitionToStandby(final ThreadCache newCache) {
- delegate.transitionToStandby(newCache);
- }
-
- @Override
- public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
- delegate.registerCacheFlushListener(namespace, listener);
- }
-
- @Override
- public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
- return delegate.getStateStore(builder);
- }
-
- @Override
- public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
- delegate.logChange(storeName, key, value, timestamp);
- }
-
- @Override
- public String changelogFor(final String storeName) {
- return delegate.changelogFor(storeName);
- }
-
- @Override
- public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
- delegate.register(store, stateRestoreCallback);
- }
-
- @Override
- public StateStore getStateStore(final String name) {
- return delegate.getStateStore(name);
- }
-
- @Deprecated
- @Override
- public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
- return delegate.schedule(Duration.ofMillis(intervalMs), type, callback);
- }
-
- @Override
- public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
- return delegate.schedule(interval, type, callback);
- }
-
- @Override
- public <K, V> void forward(final K key, final V value) {
- delegate.forward(key, value);
- }
-
- @Override
- public <K, V> void forward(final K key, final V value, final To to) {
- delegate.forward(key, value, to);
- }
-
- @Deprecated
- @Override
- public <K, V> void forward(final K key, final V value, final int childIndex) {
- delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
- }
-
- @Deprecated
- @Override
- public <K, V> void forward(final K key, final V value, final String childName) {
- delegate.forward(key, value, To.child(childName));
- }
-
- @Override
- public void commit() {
- delegate.commit();
- }
-
- @Override
- public String topic() {
- return delegate.topic();
- }
-
- @Override
- public int partition() {
- return delegate.partition();
- }
-
- @Override
- public long offset() {
- return delegate.offset();
- }
-
- @Override
- public Headers headers() {
- return delegate.headers();
- }
-
- @Override
- public long timestamp() {
- return delegate.timestamp();
- }
-
- @Override
- public Map<String, Object> appConfigs() {
- return delegate.appConfigs();
- }
-
- @Override
- public Map<String, Object> appConfigsWithPrefix(final String prefix) {
- return delegate.appConfigsWithPrefix(prefix);
- }
-
- InternalApiProcessorContext<Object, Object> delegate() {
- return delegate;
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 122a3bc..2939525 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -29,6 +32,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -104,6 +108,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
childByName.put(child.name, child);
}
+ @SuppressWarnings("unchecked")
public void init(final InternalProcessorContext context) {
if (!closed)
throw new IllegalStateException("The processor is not closed");
@@ -114,7 +119,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
maybeMeasureLatency(
() -> {
if (processor != null) {
- processor.init(ProcessorContextAdapter.adapt(context));
+ processor.init((ProcessorContext<KOut, VOut>) context);
}
},
time,
@@ -171,14 +176,14 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
}
- public void process(final KIn key, final VIn value) {
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
throwIfClosed();
try {
- maybeMeasureLatency(() -> processor.process(key, value), time, processSensor);
+ maybeMeasureLatency(() -> processor.process(record, recordMetadata), time, processSensor);
} catch (final ClassCastException e) {
- final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
- final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+ final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
+ final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's "
+ "input types match the deserialized types? Check the Serde setup and change the default Serdes in "
+ "StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept "
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 5dd0062..2e979ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import java.nio.ByteBuffer;
import java.util.Objects;
@@ -29,7 +30,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
-public class ProcessorRecordContext implements RecordContext {
+public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final long timestamp;
private final long offset;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index aa69752..4948ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -197,7 +198,7 @@ public class ProcessorStateManager implements StateManager {
if (stores.containsKey(store.name())) {
maybeRegisterStoreWithChangelogReader(store.name());
} else {
- store.init(processorContext, store);
+ store.init((StateStoreContext) processorContext, store);
}
log.trace("Registered state store {}", store.name());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index a77deb8..f8840e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+
+import java.util.Optional;
public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -76,19 +80,43 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
}
}
-
@Override
- public void process(final KIn key, final VIn value) {
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- final long timestamp = context.timestamp();
+ final KIn key = record.key();
+ final VIn value = record.value();
+
+ final long timestamp = record.timestamp();
if (timestamp < 0) {
- throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
+ throw new StreamsException(
+ "Invalid (negative) timestamp of "
+ + timestamp
+ + " for output record <" + key + ":" + value + ">."
+ );
}
- final String topic = topicExtractor.extract(key, value, this.context.recordContext());
+ // Prefer the record metadata if defined,
+ // and fall back to the context (whose values are undefined dummy values,
+ // but extractors may still depend on the current behavior).
+ final Optional<ProcessorRecordContext> maybeContext =
+ recordMetadata.map(
+ m -> new ProcessorRecordContext(timestamp, m.offset(), m.partition(), m.topic(), record.headers())
+ );
+ final ProcessorRecordContext contextForExtraction =
+ maybeContext.orElseGet(
+ () -> new ProcessorRecordContext(
+ timestamp,
+ context.offset(),
+ context.partition(),
+ context.topic(),
+ record.headers()
+ )
+ );
+
+ final String topic = topicExtractor.extract(key, value, contextForExtraction);
- collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
+ collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 6dbdfee..711b4c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -21,8 +21,12 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
+import java.util.Optional;
+
public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
private InternalProcessorContext context;
@@ -92,8 +96,8 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
@Override
- public void process(final KIn key, final VIn value) {
- context.forward(key, value);
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ context.forward(record);
processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
similarity index 60%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
index 2b8043a..ae2e42f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
@@ -19,34 +19,33 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import java.io.File;
import java.time.Duration;
import java.util.Map;
-import java.util.Objects;
-/**
- * {@code ProcessorContext} implementation that will throw on any forward call.
- */
-public final class ForwardingDisabledProcessorContext implements ProcessorContext {
- private final ProcessorContext delegate;
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+ private final StateStoreContext delegate;
- private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
- + "as the framework must ensure the key is not changed (#forward allows changing the key on "
- + "messages which are sent). Try another function, which doesn't allow the key to be changed "
- + "(for example - #tranformValues).";
+ public static ProcessorContext adapt(final StateStoreContext delegate) {
+ if (delegate instanceof ProcessorContext) {
+ return (ProcessorContext) delegate;
+ } else {
+ return new StoreToProcessorContextAdapter(delegate);
+ }
+ }
- public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
- this.delegate = Objects.requireNonNull(delegate, "delegate");
+ private StoreToProcessorContextAdapter(final StateStoreContext delegate) {
+ this.delegate = delegate;
}
@Override
@@ -80,81 +79,76 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
}
@Override
- public void register(final StateStore store,
- final StateRestoreCallback stateRestoreCallback) {
+ public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
delegate.register(store, stateRestoreCallback);
}
@Override
- public StateStore getStateStore(final String name) {
- return delegate.getStateStore(name);
+ public <S extends StateStore> S getStateStore(final String name) {
+ throw new UnsupportedOperationException("StateStores can't access getStateStore.");
}
- @Override
@Deprecated
- public Cancellable schedule(final long intervalMs,
- final PunctuationType type,
- final Punctuator callback) {
- return delegate.schedule(intervalMs, type, callback);
+ @Override
+ public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+ throw new UnsupportedOperationException("StateStores can't access schedule.");
}
@Override
- public Cancellable schedule(final Duration interval,
- final PunctuationType type,
- final Punctuator callback) throws IllegalArgumentException {
- return delegate.schedule(interval, type, callback);
+ public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+ throw new UnsupportedOperationException("StateStores can't access schedule.");
}
@Override
public <K, V> void forward(final K key, final V value) {
- throw new StreamsException(EXPLANATION);
+ throw new UnsupportedOperationException("StateStores can't access forward.");
}
@Override
public <K, V> void forward(final K key, final V value, final To to) {
- throw new StreamsException(EXPLANATION);
+ throw new UnsupportedOperationException("StateStores can't access forward.");
}
- @Override
@Deprecated
+ @Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
- throw new StreamsException(EXPLANATION);
+ throw new UnsupportedOperationException("StateStores can't access forward.");
}
- @Override
@Deprecated
+ @Override
public <K, V> void forward(final K key, final V value, final String childName) {
- throw new StreamsException(EXPLANATION);
+ throw new UnsupportedOperationException("StateStores can't access forward.");
}
@Override
public void commit() {
- delegate.commit();
+ throw new UnsupportedOperationException("StateStores can't access commit.");
}
@Override
public String topic() {
- return delegate.topic();
+ throw new UnsupportedOperationException("StateStores can't access topic.");
}
@Override
public int partition() {
- return delegate.partition();
+ throw new UnsupportedOperationException("StateStores can't access partition.");
}
@Override
public long offset() {
- return delegate.offset();
+ throw new UnsupportedOperationException("StateStores can't access offset.");
}
@Override
public Headers headers() {
- return delegate.headers();
+ throw new UnsupportedOperationException("StateStores can't access headers.");
}
@Override
public long timestamp() {
- return delegate.timestamp();
+ throw new UnsupportedOperationException("StateStores can't access timestamp.");
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 271724c..c4e4ff3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -36,6 +36,8 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@@ -54,6 +56,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -670,9 +673,27 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
log.trace("Start processing one record [{}]", record);
- updateProcessorContext(record, currNode, wallClockTime);
+ updateProcessorContext(
+ currNode,
+ wallClockTime,
+ new ProcessorRecordContext(
+ record.timestamp,
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()
+ )
+ );
+
maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
- maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
+ final Record<Object, Object> toProcess = new Record<>(
+ record.key(),
+ record.value(),
+ processorContext.timestamp(),
+ processorContext.headers()
+ );
+ final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
+ maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
log.trace("Completed processing one record [{}]", record);
@@ -742,8 +763,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
}
- updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
- timestamp), node, time.milliseconds());
+ updateProcessorContext(node, time.milliseconds(), null);
if (log.isTraceEnabled()) {
log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
@@ -760,14 +780,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
- private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?, ?, ?> currNode, final long wallClockTime) {
- processorContext.setRecordContext(
- new ProcessorRecordContext(
- record.timestamp,
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()));
+ private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
+ final long wallClockTime,
+ final ProcessorRecordContext recordContext) {
+ processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(currNode);
processorContext.setSystemTimeMs(wallClockTime);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
index 6c5798e..8865846 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
@@ -23,6 +23,10 @@ public class ToInternal extends To {
super(To.all());
}
+ public ToInternal(final To to) {
+ super(to);
+ }
+
public void update(final To to) {
super.update(to);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 326d277..7ff2c6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -76,6 +78,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
@@ -866,9 +869,9 @@ public class KafkaStreamsTest {
}
@Override
- public void process(final String key, final String value) {
- if (value.length() % 2 == 0) {
- context.forward(key, key + value);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ if (record.value().length() % 2 == 0) {
+ context.forward(record.withValue(record.key() + record.value()));
}
}
}, "source")
@@ -967,11 +970,11 @@ public class KafkaStreamsTest {
}
@Override
- public void process(final String key, final String value) {
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
- kvStore.put(key, 5L);
+ kvStore.put(record.key(), 5L);
- context.forward(key, "5");
+ context.forward(record.withValue("5"));
context.commit();
}
}, "source")
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 807c50e..b308b4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -39,6 +39,8 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -58,6 +60,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import static java.util.Arrays.asList;
@@ -106,8 +109,8 @@ public class StreamsBuilderTest {
}
@Override
- public void process(final String key, final String value) {
- store.put(key, value);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ store.put(record.key(), record.value());
}
}
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 57e742b..9e9f415 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -44,6 +46,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -371,15 +374,12 @@ public class TopologyTest {
public Processor<Object, Object, Object, Object> get() {
return new Processor<Object, Object, Object, Object>() {
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
context.getStateStore(STORE_NAME);
}
@Override
- public void process(final Object key, final Object value) { }
-
- @Override
- public void close() { }
+ public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { }
};
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8a65bc3..ab2adbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.To;
@@ -131,7 +132,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
sessionStore = storeBuilder.build();
- sessionStore.init(context, sessionStore);
+ sessionStore.init((StateStoreContext) context, sessionStore);
}
@After
@@ -640,7 +641,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
Serdes.Long())
.withLoggingDisabled();
final SessionStore<String, Long> sessionStore = storeBuilder.build();
- sessionStore.init(context, sessionStore);
+ sessionStore.init((StateStoreContext) context, sessionStore);
return context;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 19eb1d2..08bd5e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -60,7 +61,7 @@ public class KStreamTransformTest {
context.schedule(
Duration.ofMillis(1),
PunctuationType.WALL_CLOCK_TIME,
- timestamp -> context.forward(-1, (int) timestamp)
+ timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp))
);
}
@@ -126,7 +127,7 @@ public class KStreamTransformTest {
context.schedule(
Duration.ofMillis(1),
PunctuationType.WALL_CLOCK_TIME,
- timestamp -> context.forward(-1, (int) timestamp));
+ timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp)));
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index dc0b69d..3b46765 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
@@ -243,7 +244,7 @@ public class KTableSuppressProcessorMetricsTest {
context.setCurrentNode(new ProcessorNode("testNode"));
context.setSystemTimeMs(time.milliseconds());
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
processor.init(context);
final long timestamp = 100L;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 1d1d6fb..778af9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.apache.kafka.test.MockInternalProcessorContext;
@@ -85,7 +86,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = new MockInternalProcessorContext();
context.setCurrentNode(new ProcessorNode("testNode"));
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
processor.init(context);
this.processor = processor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index f39d730..4cef063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
@@ -44,6 +45,8 @@ import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
public class AbstractProcessorContextTest {
@@ -81,14 +84,9 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
+ public void shouldReturnNullTopicIfNoRecordContext() {
context.setRecordContext(null);
- try {
- context.topic();
- fail("should throw illegal state exception when record context is null");
- } catch (final IllegalStateException e) {
- // pass
- }
+ assertThat(context.topic(), is(nullValue()));
}
@Test
@@ -104,19 +102,14 @@ public class AbstractProcessorContextTest {
@Test
public void shouldReturnNullIfTopicEqualsNonExistTopic() {
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
+ context.setRecordContext(null);
assertThat(context.topic(), nullValue());
}
@Test
- public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() {
+ public void shouldReturnDummyPartitionIfNoRecordContext() {
context.setRecordContext(null);
- try {
- context.partition();
- fail("should throw illegal state exception when record context is null");
- } catch (final IllegalStateException e) {
- // pass
- }
+ assertThat(context.partition(), is(-1));
}
@Test
@@ -140,14 +133,9 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() {
+ public void shouldReturnDummyTimestampIfNoRecordContext() {
context.setRecordContext(null);
- try {
- context.timestamp();
- fail("should throw illegal state exception when record context is null");
- } catch (final IllegalStateException e) {
- // pass
- }
+ assertThat(context.timestamp(), is(0L));
}
@Test
@@ -161,9 +149,9 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldReturnNullIfHeadersAreNotSet() {
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
- assertThat(context.headers(), nullValue());
+ public void shouldReturnEmptyHeadersIfHeadersAreNotSet() {
+ context.setRecordContext(null);
+ assertThat(context.headers(), is(emptyIterable()));
}
@Test
@@ -211,7 +199,7 @@ public class AbstractProcessorContextTest {
}
@Override
- public StateStore getStateStore(final String name) {
+ public <S extends StateStore> S getStateStore(final String name) {
return null;
}
@@ -231,6 +219,12 @@ public class AbstractProcessorContextTest {
}
@Override
+ public <K, V> void forward(final Record<K, V> record) {}
+
+ @Override
+ public <K, V> void forward(final Record<K, V> record, final String childName) {}
+
+ @Override
public <K, V> void forward(final K key, final V value) {}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index ad8cd0a..6322fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -30,6 +32,7 @@ import org.hamcrest.core.IsInstanceOf;
import org.junit.Before;
import org.junit.Test;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
@@ -97,11 +100,13 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldForwardToSingleChild() {
- child.process(null, null);
+ child.process(anyObject(), anyObject());
expectLastCall();
+ expect(recordContext.timestamp()).andStubReturn(0L);
+ expect(recordContext.headers()).andStubReturn(new RecordHeaders());
replay(child, recordContext);
- globalContext.forward(null, null);
+ globalContext.forward((Object /*forcing a call to the K/V forward*/) null, null);
verify(child, recordContext);
}
@@ -142,7 +147,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -151,7 +156,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -160,7 +165,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -169,7 +174,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -178,7 +183,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForSessionStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 6ce2bae..3cc06be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Set;
import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -127,7 +128,7 @@ public class GlobalStateTaskTest {
@Test
public void shouldProcessRecordsForTopic() {
globalStateTask.initialize();
- globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
+ globalStateTask.update(record(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
assertEquals(1, sourceOne.numReceived);
assertEquals(0, sourceTwo.numReceived);
}
@@ -136,7 +137,7 @@ public class GlobalStateTaskTest {
public void shouldProcessRecordsForOtherTopic() {
final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
globalStateTask.initialize();
- globalStateTask.update(new ConsumerRecord<>(topic2, 1, 1, integerBytes, integerBytes));
+ globalStateTask.update(record(topic2, 1, 1, integerBytes, integerBytes));
assertEquals(1, sourceTwo.numReceived);
assertEquals(0, sourceOne.numReceived);
}
@@ -215,7 +216,7 @@ public class GlobalStateTaskTest {
expectedOffsets.put(t1, 52L);
expectedOffsets.put(t2, 100L);
globalStateTask.initialize();
- globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
+ globalStateTask.update(record(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
globalStateTask.flushState();
assertEquals(expectedOffsets, stateMgr.changelogOffsets());
}
@@ -226,7 +227,7 @@ public class GlobalStateTaskTest {
expectedOffsets.put(t1, 102L);
expectedOffsets.put(t2, 100L);
globalStateTask.initialize();
- globalStateTask.update(new ConsumerRecord<>(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
+ globalStateTask.update(record(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
globalStateTask.flushState();
assertThat(stateMgr.changelogOffsets(), equalTo(expectedOffsets));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 7d5773b..a69bd12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -49,6 +48,7 @@ import java.util.Set;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
+import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -253,7 +253,7 @@ public class GlobalStreamThreadTest {
"Thread never started.");
mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1L));
- mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
+ mockConsumer.addRecord(record(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
TestUtils.waitForCondition(
() -> mockConsumer.position(topicPartition) == 1L,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index f4b62c3d..ab88efa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -767,7 +767,7 @@ public class ProcessorContextImplTest {
assertTrue(store.persistent());
assertTrue(store.isOpen());
- checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+ checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
checkThrowsUnsupportedOperation(store::close, "close()");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 0d79ae5..a4efcbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,7 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.Optional;
import java.util.Properties;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
@@ -28,6 +31,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
@@ -202,7 +206,10 @@ public class ProcessorNodeTest {
final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
node.init(context);
- final StreamsException se = assertThrows(StreamsException.class, () -> node.process("aKey", "aValue"));
+ final StreamsException se = assertThrows(
+ StreamsException.class,
+ () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()), Optional.ofNullable(context.recordContext()))
+ );
assertThat(se.getCause(), instanceOf(ClassCastException.class));
assertThat(se.getMessage(), containsString("default Serdes"));
assertThat(se.getMessage(), containsString("input types"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 448c2b1..439cd59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -295,7 +296,7 @@ public class ProcessorStateManagerTest {
expect(store.name()).andStubReturn(persistentStoreName);
context.uninitialize();
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
replay(storeMetadata, context, store);
stateMgr.registerStateStores(singletonList(store), context);
@@ -325,7 +326,7 @@ public class ProcessorStateManagerTest {
expect(store.name()).andStubReturn(persistentStoreName);
context.uninitialize();
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
replay(storeMetadata, context, store);
stateMgr.registerStateStores(singletonList(store), context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 77dc6af..0b7c1b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -35,10 +35,11 @@ import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -54,6 +55,7 @@ import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
@@ -775,8 +777,8 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final String key, final String value) {
- context.forward(key, value);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ context.forward(record);
}
}
@@ -792,8 +794,8 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final String key, final String value) {
- context.forward(key, value, To.all().withTimestamp(context.timestamp() + 10));
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ context.forward(record.withTimestamp(record.timestamp() + 10));
}
}
@@ -814,11 +816,11 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final String key, final String value) {
- context.forward(key, value);
- context.forward(key, value, To.child(firstChild).withTimestamp(context.timestamp() + 5));
- context.forward(key, value, To.child(secondChild));
- context.forward(key, value, To.all().withTimestamp(context.timestamp() + 2));
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ context.forward(record);
+ context.forward(record.withTimestamp(record.timestamp() + 5), firstChild);
+ context.forward(record, secondChild);
+ context.forward(record.withTimestamp(record.timestamp() + 2));
}
}
@@ -831,9 +833,8 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final String key, final String value) {
- context.headers().add(HEADER);
- context.forward(key, value);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ context.forward(record.withHeaders(record.headers().add(HEADER)));
}
}
@@ -850,8 +851,8 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final String key, final String value) {
- context.forward(key, value.split("@")[0]);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ context.forward(record.withValue(record.value().split("@")[0]));
}
}
@@ -935,8 +936,8 @@ public class ProcessorTopologyTest {
}
@Override
- public void process(final String key, final String value) {
- store.put(key, value);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ store.put(record.key(), record.value());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 6234b0f..bc6f08b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
@@ -50,7 +52,7 @@ public class SinkNodeTest {
// When/Then
context.setTime(-1); // ensures a negative timestamp is set for the record we send next
try {
- illTypedSink.process("any key".getBytes(), "any value".getBytes());
+ illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()), java.util.Optional.empty());
fail("Should have thrown StreamsException");
} catch (final StreamsException ignored) {
// expected
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d0c5804..b54aa6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -50,6 +50,8 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -78,6 +80,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -132,7 +135,7 @@ public class StreamTaskTest {
private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
@Override
- public void process(final Integer key, final Integer value) {
+ public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
throw new RuntimeException("KABOOM!");
}
@@ -466,10 +469,11 @@ public class StreamTaskTest {
this.context = context;
super.init(context);
}
+
@Override
- public void process(final Integer key, final Integer value) {
- if (key % 2 == 0) {
- context.forward(key, value);
+ public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+ if (record.key() % 2 == 0) {
+ context.forward(record);
}
}
};
@@ -1230,10 +1234,10 @@ public class StreamTaskTest {
task.completeRestoration();
task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
- task.processorContext().recordContext().headers().add("dummy", (byte[]) null);
+ task.processorContext().headers().add("dummy", (byte[]) null);
});
task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
- assertFalse(task.processorContext().recordContext().headers().iterator().hasNext());
+ assertFalse(task.processorContext().headers().iterator().hasNext());
});
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 575802e..6520778 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -63,6 +63,8 @@ import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -92,6 +94,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -1201,9 +1204,8 @@ public class StreamThreadTest {
internalTopologyBuilder.addProcessor(
"proc",
() -> new Processor<Object, Object, Object, Object>() {
-
@Override
- public void process(final Object key, final Object value) {
+ public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
if (shouldThrow.get()) {
throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
new file mode 100644
index 0000000..a702fdc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public final class ConsumerRecordUtil {
+ private ConsumerRecordUtil() {} // static helper holder; never instantiated
+
+ public static <K, V> ConsumerRecord<K, V> record(final String topic,
+ final int partition,
+ final long offset,
+ final K key,
+ final V value) {
+ // ConsumerRecord's constructor without a timestamp argument sets the
+ // timestamp to -1, which is an invalid configuration, so we call a
+ // constructor that takes the timestamp and set it to 0 explicitly.
+ return new ConsumerRecord<>(
+ topic,
+ partition,
+ offset,
+ 0L, // timestamp
+ TimestampType.CREATE_TIME,
+ 0L, // checksum, per ConsumerRecord constructor order (TODO confirm)
+ 0, // serialized key size, per constructor order (TODO confirm)
+ 0, // serialized value size, per constructor order (TODO confirm)
+ key,
+ value
+ );
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 4b3f9d5..61d27b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -130,7 +131,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
new MockRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
);
- bytesStore.init(context, bytesStore);
+ bytesStore.init((StateStoreContext) context, bytesStore);
}
@After
@@ -287,7 +288,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore = getBytesStore();
- bytesStore.init(context, bytesStore);
+ bytesStore.init((StateStoreContext) context, bytesStore);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
assertThat(
results,
@@ -317,7 +318,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore = getBytesStore();
- bytesStore.init(context, bytesStore);
+ bytesStore.init((StateStoreContext) context, bytesStore);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
assertThat(
results,
@@ -336,7 +337,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// need to create a segment so we can attempt to write to it again.
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
bytesStore.close();
- bytesStore.init(context, bytesStore);
+ bytesStore.init((StateStoreContext) context, bytesStore);
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
}
@@ -365,7 +366,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
}
private void shouldRestoreToByteStore(final TaskType taskType) {
- bytesStore.init(context, bytesStore);
+ bytesStore.init((StateStoreContext) context, bytesStore);
// 0 segments initially.
assertEquals(0, bytesStore.getSegments().size());
final String key = "a";
@@ -405,7 +406,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
);
final Time time = new SystemTime();
context.setSystemTimeMs(time.milliseconds());
- bytesStore.init(context, bytesStore);
+ bytesStore.init((StateStoreContext) context, bytesStore);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
// write a record to advance stream time, with a high enough timestamp
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index ce3aa86..bb425a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -98,7 +99,7 @@ public abstract class AbstractSessionBytesStoreTest {
new MockStreamsMetrics(new Metrics())));
context.setTime(1L);
- sessionStore.init(context, sessionStore);
+ sessionStore.init((StateStoreContext) context, sessionStore);
}
@After
@@ -263,7 +264,7 @@ public abstract class AbstractSessionBytesStoreTest {
@Test
public void shouldFetchExactKeys() {
sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long());
- sessionStore.init(context, sessionStore);
+ sessionStore.init((StateStoreContext) context, sessionStore);
sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
@@ -302,7 +303,7 @@ public abstract class AbstractSessionBytesStoreTest {
final SessionStore<Bytes, String> sessionStore =
buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String());
- sessionStore.init(context, sessionStore);
+ sessionStore.init((StateStoreContext) context, sessionStore);
final Bytes key1 = Bytes.wrap(new byte[] {0});
final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
@@ -427,7 +428,7 @@ public abstract class AbstractSessionBytesStoreTest {
final Time time = new SystemTime();
context.setTime(1L);
context.setSystemTimeMs(time.milliseconds());
- sessionStore.init(context, sessionStore);
+ sessionStore.init((StateStoreContext) context, sessionStore);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
// Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index a2c6b7a..6aeb28d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -111,7 +112,7 @@ public abstract class AbstractWindowBytesStoreTest {
new MockStreamsMetrics(new Metrics())));
context.setTime(1L);
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
}
@After
@@ -713,7 +714,7 @@ public abstract class AbstractWindowBytesStoreTest {
@SuppressWarnings("deprecation")
public void testPutSameKeyTimestamp() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
final long startTime = SEGMENT_INTERVAL - 4L;
@@ -797,7 +798,7 @@ public abstract class AbstractWindowBytesStoreTest {
Serdes.String(),
Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
windowStore.put("a", "0001", 0);
windowStore.put("aa", "0002", 0);
@@ -882,7 +883,7 @@ public abstract class AbstractWindowBytesStoreTest {
true,
Serdes.Bytes(),
Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
final Bytes key1 = Bytes.wrap(new byte[] {0});
final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
@@ -970,7 +971,7 @@ public abstract class AbstractWindowBytesStoreTest {
final Time time = new SystemTime();
context.setSystemTimeMs(time.milliseconds());
context.setTime(1L);
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
// Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
@@ -1111,7 +1112,7 @@ public abstract class AbstractWindowBytesStoreTest {
@SuppressWarnings("deprecation")
public void testFetchDuplicates() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
long currentTime = 0;
setCurrentTime(currentTime);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index d0e10d5..98f0ba6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -77,7 +78,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext(null, null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
- store.init(context, null);
+ store.init((StateStoreContext) context, null);
}
@After
@@ -178,7 +179,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
cache = EasyMock.niceMock(ThreadCache.class);
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index d8e97b8..f36629d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -90,7 +91,7 @@ public class CachingSessionStoreTest {
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
- cachingStore.init(context, cachingStore);
+ cachingStore.init((StateStoreContext) context, cachingStore);
}
@After
@@ -186,7 +187,7 @@ public class CachingSessionStoreTest {
cache = EasyMock.niceMock(ThreadCache.class);
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
- cachingStore.init(context, cachingStore);
+ cachingStore.init((StateStoreContext) context, cachingStore);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 6c4ddf6..42b750b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +105,7 @@ public class CachingWindowStoreTest {
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
- cachingStore.init(context, cachingStore);
+ cachingStore.init((StateStoreContext) context, cachingStore);
}
@After
@@ -886,7 +887,7 @@ public class CachingWindowStoreTest {
cache = EasyMock.createNiceMock(ThreadCache.class);
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
- cachingStore.init(context, cachingStore);
+ cachingStore.init((StateStoreContext) context, cachingStore);
}
private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index d2d1d73..9106580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
@@ -55,7 +56,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
collector,
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
context.setTime(0);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 5ab035c..426a334 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.state.SessionStore;
@@ -55,11 +57,11 @@ public class ChangeLoggingSessionBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init(context, store);
+ inner.init((ProcessorContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index f630abb..e05b171 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -60,7 +61,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
collector,
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
context.setTime(0);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 4a240b1..9de2207 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.state.WindowStore;
@@ -57,11 +59,11 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init(context, store);
+ inner.init((ProcessorContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 0728a1e..c877ac6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.state.WindowStore;
@@ -55,11 +57,11 @@ public class ChangeLoggingWindowBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init(context, store);
+ inner.init((ProcessorContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 27dcff4..736721a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -79,7 +80,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
Serdes.String(), Serdes.String()), new MockRecordCollector());
context.setTime(1L);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
return store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 62059f1..fd427e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -100,7 +101,7 @@ public class GlobalStateStoreProviderTest {
expect(mockContext.recordCollector()).andStubReturn(null);
replay(mockContext);
for (final StateStore store : stores.values()) {
- store.init(mockContext, null);
+ store.init((StateStoreContext) mockContext, null);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f639255..83fffef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -154,7 +155,7 @@ public class MeteredKeyValueStoreTest {
private void init() {
replay(inner, context);
- metered.init(context, metered);
+ metered.init((StateStoreContext) context, metered);
}
@Test
@@ -190,7 +191,7 @@ public class MeteredKeyValueStoreTest {
keySerde,
valueSerde
);
- metered.init(context, metered);
+ metered.init((StateStoreContext) context, metered);
metered.get(KEY);
metered.put(KEY, VALUE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 28efbf9..0ff822e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -158,7 +159,7 @@ public class MeteredSessionStoreTest {
private void init() {
replay(innerStore, context);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
}
@Test
@@ -195,7 +196,7 @@ public class MeteredSessionStoreTest {
valueSerde,
new MockTime()
);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP);
store.put(WINDOWED_KEY, VALUE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index c03ffef..3d28266 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -158,7 +159,7 @@ public class MeteredTimestampedKeyValueStoreTest {
private void init() {
replay(inner, context);
- metered.init(context, metered);
+ metered.init((StateStoreContext) context, metered);
}
@Test
@@ -194,7 +195,7 @@ public class MeteredTimestampedKeyValueStoreTest {
keySerde,
valueSerde
);
- metered.init(context, metered);
+ metered.init((StateStoreContext) context, metered);
metered.get(KEY);
metered.put(KEY, VALUE_AND_TIMESTAMP);
@@ -430,7 +431,7 @@ public class MeteredTimestampedKeyValueStoreTest {
null
);
replay(inner, context);
- store.init(context, inner);
+ store.init((StateStoreContext) context, inner);
try {
store.put("key", ValueAndTimestamp.make(42L, 60000));
@@ -455,7 +456,7 @@ public class MeteredTimestampedKeyValueStoreTest {
new ValueAndTimestampSerde<>(Serdes.Long())
);
replay(inner, context);
- store.init(context, inner);
+ store.init((StateStoreContext) context, inner);
try {
store.put("key", ValueAndTimestamp.make(42L, 60000));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 18be067..9a9d763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -129,7 +130,7 @@ public class MeteredTimestampedWindowStoreTest {
keySerde,
valueSerde
);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.fetch(KEY, TIMESTAMP);
store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP);
@@ -143,7 +144,7 @@ public class MeteredTimestampedWindowStoreTest {
EasyMock.expectLastCall();
EasyMock.replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.close();
EasyMock.verify(innerStoreMock);
}
@@ -153,7 +154,7 @@ public class MeteredTimestampedWindowStoreTest {
EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
EasyMock.replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
assertNull(store.fetch("a", 0));
}
@@ -170,7 +171,7 @@ public class MeteredTimestampedWindowStoreTest {
null,
null
);
- store.init(context, innerStoreMock);
+ store.init((StateStoreContext) context, innerStoreMock);
try {
store.put("key", ValueAndTimestamp.make(42L, 60000));
@@ -195,7 +196,7 @@ public class MeteredTimestampedWindowStoreTest {
Serdes.String(),
new ValueAndTimestampSerde<>(Serdes.Long())
);
- store.init(context, innerStoreMock);
+ store.init((StateStoreContext) context, innerStoreMock);
try {
store.put("key", ValueAndTimestamp.make(42L, 60000));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 8671521..7301694 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
@@ -184,7 +185,7 @@ public class MeteredWindowStoreTest {
keySerde,
valueSerde
);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.fetch(KEY, TIMESTAMP);
store.put(KEY, VALUE, TIMESTAMP);
@@ -195,7 +196,7 @@ public class MeteredWindowStoreTest {
@Test
public void testMetrics() {
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
final JmxReporter reporter = new JmxReporter();
final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
reporter.contextChange(metricsContext);
@@ -225,10 +226,10 @@ public class MeteredWindowStoreTest {
@Test
public void shouldRecordRestoreLatencyOnInit() {
- innerStoreMock.init(context, store);
+ innerStoreMock.init((StateStoreContext) context, store);
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
assertEquals(1.0, getMetricByNameFilterByTags(
@@ -254,7 +255,7 @@ public class MeteredWindowStoreTest {
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.put("a", "a");
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -279,7 +280,7 @@ public class MeteredWindowStoreTest {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -304,7 +305,7 @@ public class MeteredWindowStoreTest {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -330,7 +331,7 @@ public class MeteredWindowStoreTest {
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.flush();
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -355,7 +356,7 @@ public class MeteredWindowStoreTest {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
assertNull(store.fetch("a", 0));
}
@@ -393,7 +394,7 @@ public class MeteredWindowStoreTest {
innerStoreMock.close();
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
store.close();
verify(innerStoreMock);
@@ -404,7 +405,7 @@ public class MeteredWindowStoreTest {
innerStoreMock.close();
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
assertThat(storeMetrics(), not(empty()));
store.close();
@@ -417,7 +418,7 @@ public class MeteredWindowStoreTest {
innerStoreMock.close();
expectLastCall().andThrow(new RuntimeException("Oops!"));
replay(innerStoreMock);
- store.init(context, store);
+ store.init((StateStoreContext) context, store);
// There's always a "count" metric registered
assertThat(storeMetrics(), not(empty()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index ca28181..dead979 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -274,7 +275,7 @@ public class RocksDBStoreTest {
@Test
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8));
rocksDBStore.flush();
@@ -297,7 +298,7 @@ public class RocksDBStoreTest {
public void shouldCallRocksDbConfigSetter() {
MockRocksDbConfigSetter.called = false;
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertTrue(MockRocksDbConfigSetter.called);
}
@@ -325,7 +326,7 @@ public class RocksDBStoreTest {
new Bytes(stringSerializer.serialize(null, "3")),
stringSerializer.serialize(null, "c")));
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
rocksDBStore.putAll(entries);
rocksDBStore.flush();
@@ -350,7 +351,7 @@ public class RocksDBStoreTest {
public void shouldRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
assertEquals(
@@ -372,7 +373,7 @@ public class RocksDBStoreTest {
@Test
public void shouldPutOnlyIfAbsentValue() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
final byte[] valueBytes = stringSerializer.serialize(null, "A");
final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");
@@ -389,7 +390,7 @@ public class RocksDBStoreTest {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
@@ -413,7 +414,7 @@ public class RocksDBStoreTest {
// this will restore key "1" as WriteBatch applies updates in order
entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8)));
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
@@ -446,7 +447,7 @@ public class RocksDBStoreTest {
public void shouldRestoreThenDeleteOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
@@ -486,7 +487,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullPut() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThrows(
NullPointerException.class,
() -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
@@ -494,7 +495,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullPutAll() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThrows(
NullPointerException.class,
() -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
@@ -502,7 +503,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullGet() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThrows(
NullPointerException.class,
() -> rocksDBStore.get(null));
@@ -510,7 +511,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnDelete() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThrows(
NullPointerException.class,
() -> rocksDBStore.delete(null));
@@ -518,7 +519,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnRange() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThrows(
NullPointerException.class,
() -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))));
@@ -526,7 +527,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
Utils.delete(dir);
rocksDBStore.put(
new Bytes(stringSerializer.serialize(null, "anyKey")),
@@ -545,7 +546,7 @@ public class RocksDBStoreTest {
new StreamsConfig(props));
enableBloomFilters = false;
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
final List<String> expectedValues = new ArrayList<>();
expectedValues.add("a");
@@ -570,7 +571,7 @@ public class RocksDBStoreTest {
// reopen with Bloom Filters enabled
// should open fine without errors
enableBloomFilters = true;
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
@@ -596,7 +597,7 @@ public class RocksDBStoreTest {
EasyMock.expect(context.stateDir()).andStubReturn(dir);
EasyMock.replay(context);
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
rocksDBStore.put(Bytes.wrap(key), value);
@@ -628,7 +629,7 @@ public class RocksDBStoreTest {
EasyMock.expect(context.stateDir()).andStubReturn(dir);
EasyMock.replay(context);
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
rocksDBStore.put(Bytes.wrap(key), value);
@@ -659,7 +660,7 @@ public class RocksDBStoreTest {
EasyMock.expect(context.stateDir()).andStubReturn(dir);
EasyMock.replay(context);
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
final List<String> propertyNames = Arrays.asList(
"num-entries-active-mem-table",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 042039c..75b4e80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.hamcrest.core.IsNull;
@@ -49,7 +50,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
@Test
public void shouldOpenNewStoreInRegularMode() {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
}
@@ -62,13 +63,13 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
@Test
public void shouldOpenExistingStoreInRegularMode() throws Exception {
// prepare store
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
rocksDBStore.put(new Bytes("key".getBytes()), "timestamped".getBytes());
rocksDBStore.close();
// re-open store
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
} finally {
@@ -121,7 +122,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
prepareOldStore();
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
}
@@ -404,7 +405,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
// check that still in upgrade mode
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
} finally {
@@ -438,7 +439,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
// check that still in regular mode
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
}
@@ -447,7 +448,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
private void prepareOldStore() {
final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
try {
- keyValueStore.init(context, keyValueStore);
+ keyValueStore.init((StateStoreContext) context, keyValueStore);
keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c7021fd..9e9b0b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
@@ -389,7 +390,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
context.setTime(0L);
setCurrentTime(0);
@@ -479,7 +480,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
windowStore.close();
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
// put something in the store to advance its stream time and expire the old segments
windowStore.put(1, "v", 6L * SEGMENT_INTERVAL);
@@ -546,7 +547,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
false,
Serdes.Integer(),
Serdes.String());
- windowStore.init(context, windowStore);
+ windowStore.init((StateStoreContext) context, windowStore);
assertEquals(
new HashSet<>(Collections.emptyList()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 7df1404..97593e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -63,8 +64,8 @@ public class SegmentIteratorTest {
new LogContext("testCache "),
0,
new MockStreamsMetrics(new Metrics())));
- segmentOne.init(context, segmentOne);
- segmentTwo.init(context, segmentTwo);
+ segmentOne.init((StateStoreContext) context, segmentOne);
+ segmentTwo.init((StateStoreContext) context, segmentTwo);
segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index a054ac9..0037fbb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -123,7 +124,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldInit() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
cleanup(context, buffer);
}
@@ -131,7 +132,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldAcceptData() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
cleanup(context, buffer);
}
@@ -140,7 +141,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRejectNullValues() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
try {
buffer.put(0, "asdf", null, getContext(0));
fail("expected an exception");
@@ -154,7 +155,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRemoveData() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
assertThat(buffer.numRecords(), is(1));
buffer.evictWhile(() -> true, kv -> { });
@@ -166,7 +167,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRespectEvictionPredicate() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
assertThat(buffer.numRecords(), is(2));
@@ -183,7 +184,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldTrackCount() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "oin");
assertThat(buffer.numRecords(), is(1));
putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
@@ -197,7 +198,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldTrackSize() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
assertThat(buffer.bufferSize(), is(43L));
putRecord(buffer, context, 1L, 0L, "asdf", "3l");
@@ -211,7 +212,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldTrackMinTimestamp() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
assertThat(buffer.minTimestamp(), is(1L));
putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
@@ -223,7 +224,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
assertThat(buffer.numRecords(), is(1));
@@ -269,7 +270,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
assertThat(buffer.priorValueForBuffered("ASDF"), is(Maybe.undefined()));
}
@@ -278,7 +279,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldReturnPriorValueForBufferedKey() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final ProcessorRecordContext recordContext = getContext(0L);
context.setRecordContext(recordContext);
@@ -292,7 +293,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldFlush() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
@@ -363,7 +364,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRestoreOldUnversionedFormat() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -477,7 +478,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRestoreV1Format() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -598,7 +599,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRestoreV2Format() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -721,7 +722,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
// Note the data is the same as the V3 test.
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -841,7 +842,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldRestoreV3Format() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -961,7 +962,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
public void shouldNotRestoreUnrecognizedVersionRecord() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
+ buffer.init((StateStoreContext) context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 51b9231..bd82e29 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -54,6 +55,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
@@ -261,9 +263,10 @@ public class InternalMockProcessorContext
stateManager().registerStore(store, func);
}
+ @SuppressWarnings("unchecked")
@Override
- public StateStore getStateStore(final String name) {
- return storeMap.get(name);
+ public <S extends StateStore> S getStateStore(final String name) {
+ return (S) storeMap.get(name);
}
@Override
@@ -283,6 +286,28 @@ public class InternalMockProcessorContext
public void commit() {}
@Override
+ public <K, V> void forward(final Record<K, V> record) {
+ forward(record, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <K, V> void forward(final Record<K, V> record, final String childName) {
+ if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
+ setTime(record.timestamp());
+ }
+ final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
+ try {
+ for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
+ currentNode = childNode;
+ ((ProcessorNode<K, V, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+ }
+ } finally {
+ currentNode = thisNode;
+ }
+ }
+
+ @Override
public void forward(final Object key, final Object value) {
forward(key, value, To.all());
}
@@ -311,7 +336,8 @@ public class InternalMockProcessorContext
for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
currentNode = childNode;
- ((ProcessorNode<Object, Object, ?, ?>) childNode).process(key, value);
+ final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers());
+ ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
// Processors and toInternal might have been modified
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index 262aecc..8bed338 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.time.Duration;
@@ -28,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -65,25 +68,19 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
scheduleCancellable = context.schedule(
Duration.ofMillis(scheduleInterval),
punctuationType,
- timestamp -> {
- if (punctuationType == PunctuationType.STREAM_TIME) {
- assertThat(context.timestamp(), is(timestamp));
- }
- assertThat(context.partition(), is(-1));
- assertThat(context.offset(), is(-1L));
-
- (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
- .add(timestamp);
- });
+ (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)::add
+ );
}
}
@Override
- public void process(final KIn key, final VIn value) {
- final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, context.timestamp());
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ final KIn key = record.key();
+ final VIn value = record.value();
+ final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
if (value != null) {
- lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context.timestamp()));
+ lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, record.timestamp()));
} else {
lastValueAndTimestampPerKey.remove(key);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 58b90c1..82b24d1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -67,6 +69,16 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
}
@Override
+ public <K, V> void forward(final Record<K, V> record) {
+     // Only key, value and timestamp travel through the To-based overload; the record's
+     // headers are not passed along by this call.
+     final To allChildren = To.all().withTimestamp(record.timestamp());
+     forward(record.key(), record.value(), allChildren);
+ }
+
+ @Override
+ public <K, V> void forward(final Record<K, V> record, final String childName) {
+     // Translate the (record, childName) form into the legacy key/value/To form,
+     // carrying over the record timestamp; headers are not part of that call.
+     final To namedChild = To.child(childName).withTimestamp(record.timestamp());
+     forward(record.key(), record.value(), namedChild);
+ }
+
+ @Override
public ProcessorRecordContext recordContext() {
return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 17391bc..6c653c3 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -21,16 +21,18 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextAdapter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
private final MockApiProcessor<K, V, Object, Object> delegate;
+ private InternalProcessorContext internalProcessorContext;
public MockProcessor(final PunctuationType punctuationType,
final long scheduleInterval) {
@@ -41,15 +43,18 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
delegate = new MockApiProcessor<>();
}
+ @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
- delegate.init(ProcessorContextAdapter.adapt((InternalProcessorContext) context));
+ internalProcessorContext = (InternalProcessorContext) context;
+ delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context);
}
@Override
public void process(final K key, final V value) {
- delegate.process(key, value);
+ final Record<K, V> record = new Record<>(key, value, context.timestamp(), context.headers());
+ delegate.process(record, Optional.ofNullable(internalProcessorContext.recordContext()));
}
public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 9a8b407..90fd905 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,10 +17,13 @@
package org.apache.kafka.test;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -58,8 +61,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
}
@Override
- public void process(final KIn key, final VIn value) {
- processor().process(key, value);
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ processor().process(record, recordMetadata);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 5130d46..4c3fed1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -17,10 +17,13 @@
package org.apache.kafka.test;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
@@ -39,10 +42,10 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
}
@Override
- public void process(final KIn key, final VIn value) {
- this.numReceived++;
- this.keys.add(key);
- this.values.add(value);
+ public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+ numReceived++;
+ keys.add(record.key());
+ values.add(record.value());
}
@Override
@@ -54,6 +57,6 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
@Override
public void close() {
super.close();
- this.closed = true;
+ closed = true;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index b2b62ee..374f8ec 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -65,7 +66,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
}
@Override
- public StateStore getStateStore(final String name) {
+ public <S extends StateStore> S getStateStore(final String name) {
return null;
}
@@ -85,13 +86,23 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
}
@Override
+ public <K, V> void forward(final Record<K, V> record) {
+     // Unwrap the record and reuse the key/value overload, which stores the pair in forwardedValues.
+     final K key = record.key();
+     final V value = record.value();
+     forward(key, value);
+ }
+
+ @Override
+ public <K, V> void forward(final Record<K, V> record, final String childName) {
+     // childName is not consulted: the pair is recorded exactly as for an untargeted forward.
+     final K key = record.key();
+     final V value = record.value();
+     forward(key, value);
+ }
+
+ @Override
public <K, V> void forward(final K key, final V value) {
forwardedValues.put(key, value);
}
@Override
public <K, V> void forward(final K key, final V value, final To to) {
- forwardedValues.put(key, value);
+ forward(key, value);
}
@Override
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 54b09de..1b17f67 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@@ -59,7 +58,6 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -444,13 +442,7 @@ public class TopologyTestDriver implements Closeable {
new LogAndContinueExceptionHandler()
);
globalStateTask.initialize();
- globalProcessorContext.setRecordContext(new ProcessorRecordContext(
- 0L,
- -1L,
- -1,
- ProcessorContextImpl.NONEXIST_TOPIC,
- new RecordHeaders())
- );
+ globalProcessorContext.setRecordContext(null);
} else {
globalStateManager = null;
globalStateTask = null;
@@ -510,13 +502,7 @@ public class TopologyTestDriver implements Closeable {
);
task.initializeIfNeeded();
task.completeRestoration();
- task.processorContext().setRecordContext(new ProcessorRecordContext(
- 0L,
- -1L,
- -1,
- ProcessorContextImpl.NONEXIST_TOPIC,
- new RecordHeaders())
- );
+ task.processorContext().setRecordContext(null);
} else {
task = null;
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 8e546e0..88e1660 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -401,9 +401,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
stateStores.put(store.name(), store);
}
+ @SuppressWarnings("unchecked")
@Override
- public StateStore getStateStore(final String name) {
- return stateStores.get(name);
+ public <S extends StateStore> S getStateStore(final String name) {
+ return (S) stateStores.get(name);
}
@Override
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7ae43ef..48783a6 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -41,10 +41,11 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -71,6 +72,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +150,7 @@ public class TopologyTestDriverTest {
}
}
- private final static class Record {
+ private final static class TTDTestRecord {
private final Object key;
private final Object value;
private final long timestamp;
@@ -156,8 +158,8 @@ public class TopologyTestDriverTest {
private final String topic;
private final Headers headers;
- Record(final ConsumerRecord<byte[], byte[]> consumerRecord,
- final long newOffset) {
+ TTDTestRecord(final ConsumerRecord<byte[], byte[]> consumerRecord,
+ final long newOffset) {
key = consumerRecord.key();
value = consumerRecord.value();
timestamp = consumerRecord.timestamp();
@@ -166,9 +168,9 @@ public class TopologyTestDriverTest {
headers = consumerRecord.headers();
}
- Record(final String newTopic,
- final TestRecord<byte[], byte[]> consumerRecord,
- final long newOffset) {
+ TTDTestRecord(final String newTopic,
+ final TestRecord<byte[], byte[]> consumerRecord,
+ final long newOffset) {
key = consumerRecord.key();
value = consumerRecord.value();
timestamp = consumerRecord.timestamp();
@@ -177,12 +179,12 @@ public class TopologyTestDriverTest {
headers = consumerRecord.headers();
}
- Record(final Object key,
- final Object value,
- final Headers headers,
- final long timestamp,
- final long offset,
- final String topic) {
+ TTDTestRecord(final Object key,
+ final Object value,
+ final Headers headers,
+ final long timestamp,
+ final long offset,
+ final String topic) {
this.key = key;
this.value = value;
this.headers = headers;
@@ -204,7 +206,7 @@ public class TopologyTestDriverTest {
if (o == null || getClass() != o.getClass()) {
return false;
}
- final Record record = (Record) o;
+ final TTDTestRecord record = (TTDTestRecord) o;
return timestamp == record.timestamp &&
offset == record.offset &&
Objects.equals(key, record.key) &&
@@ -248,7 +250,7 @@ public class TopologyTestDriverTest {
private boolean initialized = false;
private boolean closed = false;
- private final List<Record> processedRecords = new ArrayList<>();
+ private final List<TTDTestRecord> processedRecords = new ArrayList<>();
MockProcessor(final Collection<Punctuation> punctuations) {
this.punctuations = punctuations;
@@ -264,9 +266,16 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Object key, final Object value) {
- processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
- context.forward(key, value);
+ public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+ processedRecords.add(new TTDTestRecord(
+ record.key(),
+ record.value(),
+ record.headers(),
+ record.timestamp(),
+ recordMetadata.map(RecordMetadata::offset).orElse(-1L),
+ recordMetadata.map(RecordMetadata::topic).orElse(null)
+ ));
+ context.forward(record);
}
@Override
@@ -399,8 +408,8 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final Object key, final Object value) {
- store.put(key, value);
+ public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+ store.put(record.key(), record.value());
}
}
);
@@ -578,11 +587,11 @@ public class TopologyTestDriverTest {
pipeRecord(SOURCE_TOPIC_1, testRecord1);
- final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+ final List<TTDTestRecord> processedRecords = mockProcessors.get(0).processedRecords;
assertEquals(1, processedRecords.size());
- final Record record = processedRecords.get(0);
- final Record expectedResult = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
+ final TTDTestRecord record = processedRecords.get(0);
+ final TTDTestRecord expectedResult = new TTDTestRecord(SOURCE_TOPIC_1, testRecord1, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -598,16 +607,16 @@ public class TopologyTestDriverTest {
public void shouldSendRecordViaCorrectSourceTopicDeprecated() {
testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
- final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
- final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+ final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+ final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
testDriver.pipeInput(consumerRecord1);
assertEquals(1, processedRecords1.size());
assertEquals(0, processedRecords2.size());
- Record record = processedRecords1.get(0);
- Record expectedResult = new Record(consumerRecord1, 0L);
+ TTDTestRecord record = processedRecords1.get(0);
+ TTDTestRecord expectedResult = new TTDTestRecord(consumerRecord1, 0L);
assertThat(record, equalTo(expectedResult));
testDriver.pipeInput(consumerRecord2);
@@ -616,7 +625,7 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords2.size());
record = processedRecords2.get(0);
- expectedResult = new Record(consumerRecord2, 0L);
+ expectedResult = new TTDTestRecord(consumerRecord2, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -841,8 +850,8 @@ public class TopologyTestDriverTest {
public void shouldProcessConsumerRecordList() {
testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
- final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
- final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+ final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+ final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
final List<ConsumerRecord<byte[], byte[]>> testRecords = new ArrayList<>(2);
testRecords.add(consumerRecord1);
@@ -853,12 +862,12 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords1.size());
assertEquals(1, processedRecords2.size());
- Record record = processedRecords1.get(0);
- Record expectedResult = new Record(consumerRecord1, 0L);
+ TTDTestRecord record = processedRecords1.get(0);
+ TTDTestRecord expectedResult = new TTDTestRecord(consumerRecord1, 0L);
assertThat(record, equalTo(expectedResult));
record = processedRecords2.get(0);
- expectedResult = new Record(consumerRecord2, 0L);
+ expectedResult = new TTDTestRecord(consumerRecord2, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -1446,24 +1455,24 @@ public class TopologyTestDriverTest {
@Override
public void init(final ProcessorContext<String, Long> context) {
this.context = context;
- context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
- context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> flushStore());
+ context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+ context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
store = context.getStateStore("aggStore");
}
@Override
- public void process(final String key, final Long value) {
- final Long oldValue = store.get(key);
- if (oldValue == null || value > oldValue) {
- store.put(key, value);
+ public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+ final Long oldValue = store.get(record.key());
+ if (oldValue == null || record.value() > oldValue) {
+ store.put(record.key(), record.value());
}
}
- private void flushStore() {
+ private void flushStore(final long timestamp) {
try (final KeyValueIterator<String, Long> it = store.all()) {
while (it.hasNext()) {
final KeyValue<String, Long> next = it.next();
- context.forward(next.key, next.value);
+ context.forward(new Record<>(next.key, next.value, timestamp, new RecordHeaders()));
}
}
}
@@ -1505,8 +1514,8 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final String key, final Long value) {
- store.put(key, value);
+ public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+ store.put(record.key(), record.value());
}
};
}
@@ -1589,16 +1598,16 @@ public class TopologyTestDriverTest {
testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config);
- final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
- final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+ final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+ final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
pipeRecord(SOURCE_TOPIC_1, testRecord1);
assertEquals(1, processedRecords1.size());
assertEquals(0, processedRecords2.size());
- final Record record1 = processedRecords1.get(0);
- final Record expectedResult1 = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
+ final TTDTestRecord record1 = processedRecords1.get(0);
+ final TTDTestRecord expectedResult1 = new TTDTestRecord(SOURCE_TOPIC_1, testRecord1, 0L);
assertThat(record1, equalTo(expectedResult1));
pipeRecord(consumerTopic2, consumerRecord2);
@@ -1606,8 +1615,8 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords1.size());
assertEquals(1, processedRecords2.size());
- final Record record2 = processedRecords2.get(0);
- final Record expectedResult2 = new Record(consumerTopic2, consumerRecord2, 0L);
+ final TTDTestRecord record2 = processedRecords2.get(0);
+ final TTDTestRecord expectedResult2 = new TTDTestRecord(consumerTopic2, consumerRecord2, 0L);
assertThat(record2, equalTo(expectedResult2));
}
@@ -1694,11 +1703,12 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final String key, final String value) {
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ final String value = record.value();
if (!value.startsWith("recurse-")) {
- context.forward(key, "recurse-" + value, To.child("recursiveSink"));
+ context.forward(record.withValue("recurse-" + value), "recursiveSink");
}
- context.forward(key, value, To.child("sink"));
+ context.forward(record, "sink");
}
},
"source"
@@ -1751,8 +1761,8 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final String key, final String value) {
- stateStore.put(key, value);
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ stateStore.put(record.key(), record.value());
}
}
);
@@ -1767,12 +1777,13 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(final String key, final String value) {
+ public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+ final String value = record.value();
if (!value.startsWith("recurse-")) {
- context.forward(key, "recurse-" + value, To.child("recursiveSink"));
+ context.forward(record.withValue("recurse-" + value), "recursiveSink");
}
- context.forward(key, value, To.child("sink"));
- context.forward(key, value, To.child("globalSink"));
+ context.forward(record, "sink");
+ context.forward(record, "globalSink");
}
},
"source"