You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/08/26 02:11:33 UTC
[kafka] 01/01: Convert test-utils and StateStore
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch kip-478-part-4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit eb55d70adadc0c692d29a4c23d039131ae2f1f80
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Aug 25 19:29:00 2020 -0500
Convert test-utils and StateStore
---
.../examples/wordcount/WordCountProcessorDemo.java | 21 +-
.../examples/wordcount/WordCountProcessorTest.java | 10 +-
.../apache/kafka/streams/processor/StateStore.java | 32 +-
...=> InternalProcessorContextReverseAdapter.java} | 6 +-
.../processor/internals/ProcessorAdapter.java | 2 +-
.../internals/ProcessorContextAdapter.java | 4 +-
.../internals/ProcessorContextReverseAdapter.java | 129 ++---
.../internals/GlobalProcessorContextImplTest.java | 11 +-
.../internals/ProcessorContextImplTest.java | 2 +-
.../processor/api/MockProcessorContext.java | 546 +++++++++++++++++++++
.../kafka/streams/MockApiProcessorContextTest.java | 405 +++++++++++++++
11 files changed, 1040 insertions(+), 128 deletions(-)
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index c3f47da..a6a9b8a 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -54,17 +54,17 @@ import java.util.concurrent.CountDownLatch;
*/
public final class WordCountProcessorDemo {
- static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+ static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
@Override
- public Processor<String, String> get() {
- return new Processor<String, String>() {
- private ProcessorContext context;
+ public Processor<String, String, String, String> get() {
+ return new Processor<String, String, String, String>() {
+ private ProcessorContext<String, String> context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<String, String> context) {
this.context = context;
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
@@ -79,7 +79,7 @@ public final class WordCountProcessorDemo {
}
}
});
- this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+ this.kvStore = context.getStateStore("Counts");
}
@Override
@@ -96,9 +96,6 @@ public final class WordCountProcessorDemo {
}
}
}
-
- @Override
- public void close() {}
};
}
}
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index bec77e6..5ddda08 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.MockProcessorContext;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue;
public class WordCountProcessorTest {
@Test
public void test() {
- final MockProcessorContext context = new MockProcessorContext();
+ final MockProcessorContext<String, String> context = new MockProcessorContext<>();
// Create, initialize, and register the state store.
final KeyValueStore<String, Integer> store =
@@ -48,7 +48,7 @@ public class WordCountProcessorTest {
context.register(store, null);
// Create and initialize the processor under test
- final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
+ final Processor<String, String, String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
processor.init(context);
// send a record to the processor
@@ -61,7 +61,7 @@ public class WordCountProcessorTest {
context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
// finally, we can verify the output.
- final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
+ final Iterator<MockProcessorContext.CapturedForward<String, String>> capturedForwards = context.forwarded().iterator();
assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index df53ee2..d143f69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
/**
* A storage engine for managing state maintained by a stream processor.
@@ -49,6 +51,27 @@ public interface StateStore {
* Initializes this state store.
* <p>
* The implementation of this function must register the root store in the context via the
+ * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+ * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+ * the second parameter should be an object of user's implementation
+ * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+ * <p>
+ * Note that if the state store engine itself supports bulk writes, users can implement another
+ * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+ * let users implement bulk-load restoration logic instead of restoring one record at a time.
+ * <p>
+ * This method is not called if {@link StateStore#init(ProcessorContext, org.apache.kafka.streams.processor.StateStore)}
+ * is implemented.
+ *
+ * @throws IllegalStateException If store gets registered after initialization is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
+
+ /**
+ * Initializes this state store.
+ * <p>
+ * The implementation of this function must register the root store in the context via the
* {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
* first {@link StateStore} parameter should always be the passed-in {@code root} object, and
* the second parameter should be an object of user's implementation
@@ -61,7 +84,14 @@ public interface StateStore {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(ProcessorContext context, StateStore root);
+ default void init(final ProcessorContext<?, ?> context, final StateStore root) {
+ final org.apache.kafka.streams.processor.ProcessorContext adapted =
+ ProcessorContextReverseAdapter.adapt(
+ context,
+ new ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder()
+ );
+ init(adapted, root);
+ }
/**
* Flush any cached data
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
similarity index 96%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
index 6e82a5e..11fc1f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
@@ -34,18 +34,18 @@ import java.io.File;
import java.time.Duration;
import java.util.Map;
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
+public final class InternalProcessorContextReverseAdapter implements InternalProcessorContext {
private final InternalApiProcessorContext<Object, Object> delegate;
static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
if (delegate instanceof ProcessorContextAdapter) {
return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
} else {
- return new ProcessorContextReverseAdapter(delegate);
+ return new InternalProcessorContextReverseAdapter(delegate);
}
}
- private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
+ private InternalProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
this.delegate = delegate;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index d8e4af4..3bbbe78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -47,7 +47,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
- delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+ delegate.init(InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
index 85dbd52..8d18ec7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
@@ -42,8 +42,8 @@ public final class ProcessorContextAdapter<KForward, VForward>
@SuppressWarnings("unchecked")
public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) {
- if (delegate instanceof ProcessorContextReverseAdapter) {
- return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
+ if (delegate instanceof InternalProcessorContextReverseAdapter) {
+ return (InternalApiProcessorContext<KForward, VForward>) ((InternalProcessorContextReverseAdapter) delegate).delegate();
} else {
return new ProcessorContextAdapter<>(delegate);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
index 6e82a5e..131b776 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
@@ -18,35 +18,52 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
import java.time.Duration;
import java.util.Map;
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
- private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+ private final ProcessorContext<Object, Object> delegate;
+ private final DeprecatedForwarder deprecatedForwarder;
- static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+ public interface DeprecatedForwarder {
+ <K, V> void forward(final K key, final V value, final int childIndex);
+ }
+
+ public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder {
+ @Override
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
+ throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate,
+ final DeprecatedForwarder deprecatedForwarder) {
if (delegate instanceof ProcessorContextAdapter) {
- return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+ return ((ProcessorContextAdapter<?, ?>) delegate).delegate();
+ } else if (delegate instanceof InternalApiProcessorContext) {
+ return InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) delegate);
} else {
- return new ProcessorContextReverseAdapter(delegate);
+ return new ProcessorContextReverseAdapter(delegate, deprecatedForwarder);
}
}
- private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
- this.delegate = delegate;
+ @SuppressWarnings("unchecked")
+ private ProcessorContextReverseAdapter(final ProcessorContext<?, ?> delegate,
+ final DeprecatedForwarder deprecatedForwarder) {
+ this.delegate = (ProcessorContext<Object, Object>) delegate;
+ this.deprecatedForwarder = deprecatedForwarder;
}
@Override
@@ -75,91 +92,11 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
}
@Override
- public StreamsMetricsImpl metrics() {
+ public StreamsMetrics metrics() {
return delegate.metrics();
}
@Override
- public void setSystemTimeMs(final long timeMs) {
- delegate.setSystemTimeMs(timeMs);
- }
-
- @Override
- public long currentSystemTimeMs() {
- return delegate.currentSystemTimeMs();
- }
-
- @Override
- public ProcessorRecordContext recordContext() {
- return delegate.recordContext();
- }
-
- @Override
- public void setRecordContext(final ProcessorRecordContext recordContext) {
- delegate.setRecordContext(recordContext);
- }
-
- @Override
- public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
- delegate.setCurrentNode(currentNode);
- }
-
- @Override
- public ProcessorNode<?, ?, ?, ?> currentNode() {
- return delegate.currentNode();
- }
-
- @Override
- public ThreadCache cache() {
- return delegate.cache();
- }
-
- @Override
- public void initialize() {
- delegate.initialize();
- }
-
- @Override
- public void uninitialize() {
- delegate.uninitialize();
- }
-
- @Override
- public Task.TaskType taskType() {
- return delegate.taskType();
- }
-
- @Override
- public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
- delegate.transitionToActive(streamTask, recordCollector, newCache);
- }
-
- @Override
- public void transitionToStandby(final ThreadCache newCache) {
- delegate.transitionToStandby(newCache);
- }
-
- @Override
- public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
- delegate.registerCacheFlushListener(namespace, listener);
- }
-
- @Override
- public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
- return delegate.getStateStore(builder);
- }
-
- @Override
- public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
- delegate.logChange(storeName, key, value, timestamp);
- }
-
- @Override
- public String changelogFor(final String storeName) {
- return delegate.changelogFor(storeName);
- }
-
- @Override
public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
delegate.register(store, stateRestoreCallback);
}
@@ -176,7 +113,7 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
}
@Override
- public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
+ public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
return delegate.schedule(interval, type, callback);
}
@@ -193,7 +130,7 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
@Deprecated
@Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
- delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
+ deprecatedForwarder.forward(key, value, childIndex);
}
@Deprecated
@@ -241,8 +178,4 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return delegate.appConfigsWithPrefix(prefix);
}
-
- InternalApiProcessorContext<Object, Object> delegate() {
- return delegate;
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index ad8cd0a..e180010 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -142,7 +143,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -151,7 +152,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -160,7 +161,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -169,7 +170,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -178,7 +179,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForSessionStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
- store.init(null, null);
+ store.init((ProcessorContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index f4b62c3d..ab88efa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -767,7 +767,7 @@ public class ProcessorContextImplTest {
assertTrue(store.persistent());
assertTrue(store.isOpen());
- checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+ checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
checkThrowsUnsupportedOperation(store::close, "close()");
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
new file mode 100644
index 0000000..acad94d
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockApiProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+ // Immutable fields ================================================
+ private final StreamsMetricsImpl metrics;
+ private final TaskId taskId;
+ private final StreamsConfig config;
+ private final File stateDir;
+
+ // settable record metadata ================================================
+ private String topic;
+ private Integer partition;
+ private Long offset;
+ private Headers headers;
+ private Long timestamp;
+
+ // mocks ================================================
+ private final Map<String, StateStore> stateStores = new HashMap<>();
+ private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+ private final List<CapturedForward<KForward, VForward>> capturedForwards = new LinkedList<>();
+ private boolean committed = false;
+
+
+ /**
+ * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+ */
+ public static final class CapturedPunctuator {
+ private final long intervalMs;
+ private final PunctuationType type;
+ private final Punctuator punctuator;
+ private boolean cancelled = false;
+
+ private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) {
+ this.intervalMs = intervalMs;
+ this.type = type;
+ this.punctuator = punctuator;
+ }
+
+ @SuppressWarnings("unused")
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ @SuppressWarnings("unused")
+ public PunctuationType getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unused")
+ public Punctuator getPunctuator() {
+ return punctuator;
+ }
+
+ @SuppressWarnings({"WeakerAccess", "unused"})
+ public void cancel() {
+ cancelled = true;
+ }
+
+ @SuppressWarnings("unused")
+ public boolean cancelled() {
+ return cancelled;
+ }
+ }
+
+ public static final class CapturedForward<KForward, VForward> {
+ private final String childName;
+ private final long timestamp;
+ private final KeyValue<KForward, VForward> keyValue;
+
+ private CapturedForward(final To to, final KeyValue<KForward, VForward> keyValue) {
+ if (keyValue == null) {
+ throw new IllegalArgumentException("keyValue can't be null");
+ }
+
+ try {
+ final Field field = To.class.getDeclaredField("childName");
+ field.setAccessible(true);
+ childName = (String) field.get(to);
+ } catch (final IllegalAccessException | NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ timestamp = getTimestamp(to);
+
+ this.keyValue = keyValue;
+ }
+
+ /**
+ * The child this data was forwarded to.
+ *
+ * @return The child name, or {@code null} if it was broadcast.
+ */
+ @SuppressWarnings("unused")
+ public String childName() {
+ return childName;
+ }
+
+ /**
+ * The timestamp attached to the forwarded record.
+ *
+ * @return A timestamp, or {@code -1} if none was forwarded.
+ */
+ @SuppressWarnings("unused")
+ public long timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * The data forwarded.
+ *
+ * @return A key/value pair. Not null.
+ */
+ @SuppressWarnings("unused")
+ public KeyValue<KForward, VForward> keyValue() {
+ return keyValue;
+ }
+
+ @Override
+ public String toString() {
+ return "CapturedForward{" +
+ "childName='" + childName + '\'' +
+ ", timestamp=" + timestamp +
+ ", keyValue=" + keyValue +
+ '}';
+ }
+ }
+
+ // constructors ================================================
+
+ /**
+ * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+ * Most unit tests using this mock won't need to know the taskId,
+ * and most unit tests should be able to get by with the
+ * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+ */
+ @SuppressWarnings("unused")
+ public MockProcessorContext() {
+ this(
+ mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+ )),
+ new TaskId(0, 0),
+ null
+ );
+ }
+
+ /**
+ * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+ * Most unit tests using this mock won't need to know the taskId,
+ * and most unit tests should be able to get by with the
+ * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+ *
+ * @param config a Properties object, used to configure the context and the processor.
+ */
+ @SuppressWarnings("unused")
+ public MockProcessorContext(final Properties config) {
+ this(config, new TaskId(0, 0), null);
+ }
+
+ /**
+ * Create a {@link MockProcessorContext} with a specified taskId and stateDir.
+ *
+ * @param config a {@link Properties} object, used to configure the context and the processor.
+ * @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+ * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+ */
+ @SuppressWarnings("unused")
+ public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+ final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+ this.taskId = taskId;
+ this.config = streamsConfig;
+ this.stateDir = stateDir;
+ final MetricConfig metricConfig = new MetricConfig();
+ metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+ final String threadId = Thread.currentThread().getName();
+ metrics = new StreamsMetricsImpl(
+ new Metrics(metricConfig),
+ threadId,
+ streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+ Time.SYSTEM
+ );
+ TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+ }
+
+ /**
+ * Returns the application id, as read from the config under {@link StreamsConfig#APPLICATION_ID_CONFIG}.
+ */
+ @Override
+ public String applicationId() {
+ final String applicationIdKey = StreamsConfig.APPLICATION_ID_CONFIG;
+ return config.getString(applicationIdKey);
+ }
+
+ /**
+ * Returns the task id this context was constructed with.
+ */
+ @Override
+ public TaskId taskId() {
+ return this.taskId;
+ }
+
+ /**
+ * Returns a fresh map containing the config's original entries overlaid with its parsed values,
+ * so that a parsed value wins whenever both maps contain the same key.
+ */
+ @Override
+ public Map<String, Object> appConfigs() {
+ // Seed with the originals, then let the parsed values overwrite any duplicate keys.
+ final Map<String, Object> merged = new HashMap<>(config.originals());
+ merged.putAll(config.values());
+ return merged;
+ }
+
+ /**
+ * Returns the result of {@code originalsWithPrefix(prefix)} on the underlying config.
+ *
+ * @param prefix the prefix to look up
+ */
+ @Override
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+ final Map<String, Object> prefixed = config.originalsWithPrefix(prefix);
+ return prefixed;
+ }
+
+ /**
+ * Returns the default key serde provided by the config.
+ */
+ @Override
+ public Serde<?> keySerde() {
+ final Serde<?> defaultKeySerde = config.defaultKeySerde();
+ return defaultKeySerde;
+ }
+
+ /**
+ * Returns the default value serde provided by the config.
+ */
+ @Override
+ public Serde<?> valueSerde() {
+ final Serde<?> defaultValueSerde = config.defaultValueSerde();
+ return defaultValueSerde;
+ }
+
+ /**
+ * Returns the state directory this context was constructed with.
+ * It is {@code null} when {@code null} was passed to the constructor.
+ */
+ @Override
+ public File stateDir() {
+ return this.stateDir;
+ }
+
+ /**
+ * Returns the metrics instance created when this context was constructed.
+ */
+ @Override
+ public StreamsMetrics metrics() {
+ return this.metrics;
+ }
+
+ // settable record metadata ================================================
+
+ /**
+ * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set them directly.
+ *
+ * @param topic A topic name
+ * @param partition A partition number
+ * @param offset A record offset
+ * @param headers Record headers
+ * @param timestamp A record timestamp
+ */
+ @SuppressWarnings("unused")
+ public void setRecordMetadata(final String topic,
+ final int partition,
+ final long offset,
+ final Headers headers,
+ final long timestamp) {
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
+ this.headers = headers;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+ *
+ * @param topic A topic name
+ */
+ @SuppressWarnings("unused")
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+ *
+ * @param partition A partition number
+ */
+ @SuppressWarnings("unused")
+ public void setPartition(final int partition) {
+ this.partition = partition;
+ }
+
+ /**
+ * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+ *
+ * @param offset A record offset
+ */
+ @SuppressWarnings("unused")
+ public void setOffset(final long offset) {
+ this.offset = offset;
+ }
+
+ /**
+ * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+ *
+ * @param headers Record headers
+ */
+ @SuppressWarnings("unused")
+ public void setHeaders(final Headers headers) {
+ this.headers = headers;
+ }
+
+ /**
+ * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+ *
+ * @param timestamp A record timestamp
+ */
+ @SuppressWarnings("unused")
+ public void setTimestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Returns the topic name most recently set on this context.
+ *
+ * @throws IllegalStateException if the topic is still {@code null} when this method is called.
+ */
+ @Override
+ public String topic() {
+ if (topic != null) {
+ return topic;
+ }
+ throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic().");
+ }
+
+ /**
+ * Returns the partition number most recently set on this context.
+ *
+ * @throws IllegalStateException if the partition is still {@code null} when this method is called.
+ */
+ @Override
+ public int partition() {
+ if (partition != null) {
+ return partition;
+ }
+ throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition().");
+ }
+
+ /**
+ * Returns the record offset most recently set on this context.
+ *
+ * @throws IllegalStateException if the offset is still {@code null} when this method is called.
+ */
+ @Override
+ public long offset() {
+ if (offset != null) {
+ return offset;
+ }
+ throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset().");
+ }
+
+ /**
+ * Returns the record headers most recently set on this context.
+ * Unlike the other record-metadata accessors, no "must be set" check is performed here;
+ * the stored reference is returned as-is.
+ */
+ @Override
+ public Headers headers() {
+ return this.headers;
+ }
+
+ /**
+ * Returns the record timestamp most recently set on this context.
+ *
+ * @throws IllegalStateException if the timestamp is still {@code null} when this method is called.
+ */
+ @Override
+ public long timestamp() {
+ if (timestamp != null) {
+ return timestamp;
+ }
+ throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
+ }
+
+ // mocks ================================================
+
+ /**
+ * Remembers the given store under its own name so that it can later be looked up by name.
+ * The restore callback is not used by this mock.
+ *
+ * @param store the state store to register
+ * @param stateRestoreCallbackIsIgnoredInMock ignored
+ */
+ @Override
+ public void register(final StateStore store,
+ final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
+ final String storeName = store.name();
+ stateStores.put(storeName, store);
+ }
+
+ /**
+ * Looks up a previously registered state store by name.
+ *
+ * @param name the name the store was registered under
+ * @return the registered store, unchecked-cast to the type the caller expects
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S extends StateStore> S getStateStore(final String name) {
+ final StateStore registered = stateStores.get(name);
+ return (S) registered;
+ }
+
+ /**
+ * Captures the requested punctuation instead of actually scheduling it.
+ *
+ * @param interval the punctuation interval; validated and converted to milliseconds
+ * @param type the punctuation type
+ * @param callback the punctuator to capture
+ * @return a handle whose {@code cancel()} cancels the captured punctuator
+ */
+ @Override
+ public Cancellable schedule(final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) {
+ final long intervalMs = ApiUtils.validateMillisecondDuration(interval, "interval");
+ final CapturedPunctuator captured = new CapturedPunctuator(intervalMs, type, callback);
+ punctuators.add(captured);
+ return () -> captured.cancel();
+ }
+
+ /**
+ * Get a snapshot of the punctuators captured so far. Later calls to {@code schedule(...)}
+ * do not change a list that was already returned.
+ *
+ * @return A new list holding the captured punctuators.
+ */
+ @SuppressWarnings("unused")
+ public List<CapturedPunctuator> scheduledPunctuators() {
+ final List<CapturedPunctuator> snapshot = new LinkedList<>(punctuators);
+ return snapshot;
+ }
+
+ /**
+ * Captures a forward addressed to all children by delegating to
+ * {@code forward(key, value, To.all())}.
+ */
+ @Override
+ public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
+ final To broadcast = To.all();
+ forward(key, value, broadcast);
+ }
+
+ /**
+ * Captures the forwarded key/value pair together with its destination.
+ * When {@code to} carries the timestamp {@code -1}, the context's current record timestamp is applied to it
+ * (or {@code -1} again if the context has no timestamp set).
+ */
+ @Override
+ public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
+ final To destination;
+ if (getTimestamp(to) == -1) {
+ // Fall back to the context's timestamp, tolerating an unset one.
+ destination = to.withTimestamp(timestamp == null ? -1 : timestamp);
+ } else {
+ destination = to;
+ }
+ capturedForwards.add(new CapturedForward<>(destination, new KeyValue<>(key, value)));
+ }
+
+ /**
+ * Get a snapshot of everything forwarded through this context so far, in the order of the
+ * {@code forward(...)} calls. Later interactions with the context do not change a list
+ * that was already returned.
+ *
+ * @return A new list of the captured forwards.
+ */
+ public List<CapturedForward<KForward, VForward>> forwarded() {
+ final List<CapturedForward<KForward, VForward>> snapshot = new LinkedList<>(capturedForwards);
+ return snapshot;
+ }
+
+ /**
+ * Get the forwards that would reach a specific child, in the order of the {@code forward(...)} calls.
+ * A capture is included when its child name equals {@code childName}, and also when its child name
+ * is {@code null} (for example, a forward made with {@code To.all()}).
+ * Later interactions with the context do not change a list that was already returned.
+ *
+ * @param childName The child name to retrieve forwards for
+ * @return A new list of the matching captured forwards.
+ */
+ @SuppressWarnings("unused")
+ public List<CapturedForward<KForward, VForward>> forwarded(final String childName) {
+ final LinkedList<CapturedForward<KForward, VForward>> matches = new LinkedList<>();
+ for (final CapturedForward<KForward, VForward> candidate : capturedForwards) {
+ final String target = candidate.childName();
+ // Skip captures addressed to some other, explicitly named child.
+ if (target != null && !target.equals(childName)) {
+ continue;
+ }
+ matches.add(candidate);
+ }
+ return matches;
+ }
+
+ /**
+ * Discard all forwards captured so far.
+ */
+ @SuppressWarnings("unused")
+ public void resetForwards() {
+ this.capturedForwards.clear();
+ }
+
+ /**
+ * Records that a commit was requested; nothing is actually committed by this mock.
+ */
+ @Override
+ public void commit() {
+ this.committed = true;
+ }
+
+ /**
+ * Reports whether {@link ProcessorContext#commit()} has been called in this context.
+ *
+ * @return {@code true} iff {@link ProcessorContext#commit()} has been called since construction or the last reset.
+ */
+ public boolean committed() {
+ return this.committed;
+ }
+
+ /**
+ * Clear the captured commit flag, regardless of its current value.
+ */
+ @SuppressWarnings("unused")
+ public void resetCommit() {
+ this.committed = false;
+ }
+
+ /**
+ * Always throws: this mock does not provide record collection.
+ *
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public RecordCollector recordCollector() {
+ // State stores with change-logging enabled assume this interface is available.
+ // An explanatory exception is preferable to a mysterious ClassCastException during unit tests.
+ final String explanation =
+ "MockProcessorContext does not provide record collection. " +
+ "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+ "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.";
+ throw new UnsupportedOperationException(explanation);
+ }
+
+ /**
+ * Reads the {@code timestamp} field declared on {@link To} via reflection.
+ *
+ * @param to the destination whose timestamp is read
+ * @return the value of the field
+ * @throws RuntimeException wrapping the reflective failure if the field is missing or inaccessible
+ */
+ private static long getTimestamp(final To to) {
+ try {
+ final Field timestampField = To.class.getDeclaredField("timestamp");
+ timestampField.setAccessible(true);
+ return (long) timestampField.get(to);
+ } catch (final IllegalAccessException | NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
new file mode 100644
index 0000000..6e9c5a6
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MockApiProcessorContextTest {
+ // Forwards made without an explicit destination are captured in call order and can be reset.
+ @Test
+ public void shouldCaptureOutputRecords() {
+ final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+ private ProcessorContext<String, Long> processorContext;
+
+ @Override
+ public void init(final ProcessorContext<String, Long> context) {
+ processorContext = context;
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ final String outputKey = key + value;
+ final long outputValue = key.length() + value;
+ processorContext.forward(outputKey, outputValue);
+ }
+ };
+
+ final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+ processor.init(context);
+
+ processor.process("foo", 5L);
+ processor.process("barbaz", 50L);
+
+ final Iterator<CapturedForward<String, Long>> captures = context.forwarded().iterator();
+ assertEquals(new KeyValue<>("foo5", 8L), captures.next().keyValue());
+ assertEquals(new KeyValue<>("barbaz50", 56L), captures.next().keyValue());
+ assertFalse(captures.hasNext());
+
+ context.resetForwards();
+
+ assertEquals(0, context.forwarded().size());
+ }
+
+ // Forwards made with an explicit To.all() destination are captured in call order and can be reset.
+ @Test
+ public void shouldCaptureOutputRecordsUsingTo() {
+ final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+ private ProcessorContext<String, Long> processorContext;
+
+ @Override
+ public void init(final ProcessorContext<String, Long> context) {
+ processorContext = context;
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ final String outputKey = key + value;
+ final long outputValue = key.length() + value;
+ processorContext.forward(outputKey, outputValue, To.all());
+ }
+ };
+
+ final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ processor.process("foo", 5L);
+ processor.process("barbaz", 50L);
+
+ final Iterator<CapturedForward<String, Long>> captures = context.forwarded().iterator();
+ assertEquals(new KeyValue<>("foo5", 8L), captures.next().keyValue());
+ assertEquals(new KeyValue<>("barbaz50", 56L), captures.next().keyValue());
+ assertFalse(captures.hasNext());
+
+ context.resetForwards();
+
+ assertEquals(0, context.forwarded().size());
+ }
+
+ // Forwards to named children are captured with their child name, and forwarded(childName)
+ // returns the broadcast forwards plus the forwards addressed to that child.
+ @Test
+ public void shouldCaptureRecordsOutputToChildByName() {
+ final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+ private int count = 0;
+ private ProcessorContext<String, Long> context;
+
+ @Override
+ public void init(final ProcessorContext<String, Long> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ if (count == 0) {
+ context.forward("start", -1L, To.all()); // broadcast
+ }
+ // Alternate between the two named children on successive records.
+ final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete");
+ context.forward(key + value, key.length() + value, toChild);
+ count++;
+ }
+ };
+
+ final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ processor.process("foo", 5L);
+ processor.process("barbaz", 50L);
+
+ {
+ // All forwards, in call order. Typed (not raw) captures keep keyValue() type-checked.
+ final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator();
+
+ final CapturedForward<String, Long> forward1 = forwarded.next();
+ assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
+ assertNull(forward1.childName());
+
+ final CapturedForward<String, Long> forward2 = forwarded.next();
+ assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
+ assertEquals("george", forward2.childName());
+
+ final CapturedForward<String, Long> forward3 = forwarded.next();
+ assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
+ assertEquals("pete", forward3.childName());
+
+ assertFalse(forwarded.hasNext());
+ }
+
+ {
+ final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("george").iterator();
+ assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+ assertFalse(forwarded.hasNext());
+ }
+
+ {
+ final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("pete").iterator();
+ assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+ assertFalse(forwarded.hasNext());
+ }
+
+ {
+ // A child that was never addressed by name still sees the broadcast forward.
+ final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("steve").iterator();
+ assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+ assertFalse(forwarded.hasNext());
+ }
+ }
+
+ // The commit flag stays false until the processor requests a commit, and can be reset afterwards.
+ @Test
+ public void shouldCaptureCommitsAndAllowReset() {
+ final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+ private int processed = 0;
+ private ProcessorContext<Void, Void> processorContext;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ processorContext = context;
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ processed++;
+ // Request a commit from the third record onwards.
+ if (processed > 2) {
+ processorContext.commit();
+ }
+ }
+ };
+
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ processor.process("foo", 5L);
+ processor.process("barbaz", 50L);
+
+ assertFalse(context.committed());
+
+ processor.process("foobar", 500L);
+
+ assertTrue(context.committed());
+
+ context.resetCommit();
+
+ assertFalse(context.committed());
+ }
+
+ // A processor can look up, by name, a store that was initialized against the mock context.
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldStoreAndReturnStateStores() {
+ final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+ private ProcessorContext<Void, Void> context;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state");
+ // Keep a running sum per key, and a grand total under the key "all".
+ stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
+ stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
+ }
+ };
+
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+ // In-memory store with change-logging disabled.
+ final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("my-state"),
+ Serdes.String(),
+ Serdes.Long()).withLoggingDisabled();
+
+ final KeyValueStore<String, Long> store = storeBuilder.build();
+
+ // The store is initialized with an adapted view of the mock context.
+ // NOTE(review): presumably init() registers the store with the context, which is what makes the
+ // getStateStore("my-state") lookup in process() succeed -- confirm.
+ // The index-based forwarder is not expected to be called, so it simply throws.
+ store.init(ProcessorContextReverseAdapter.adapt(context, new ProcessorContextReverseAdapter.DeprecatedForwarder() {
+ @Override
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
+ throw new UnsupportedOperationException();
+ }
+ }), store);
+
+ processor.init(context);
+
+ processor.process("foo", 5L);
+ processor.process("bar", 50L);
+
+ assertEquals(5L, (long) store.get("foo"));
+ assertEquals(50L, (long) store.get("bar"));
+ assertEquals(55L, (long) store.get("all"));
+ }
+
+ // Application metadata comes from the config; record metadata must be set explicitly before use
+ // and then stays in effect until it is overwritten.
+ @Test
+ public void shouldCaptureApplicationAndRecordMetadata() {
+ final Properties config = new Properties();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+
+ // This processor forwards every piece of metadata it can read, followed by the input key and value.
+ final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() {
+ private ProcessorContext<String, Object> context;
+
+ @Override
+ public void init(final ProcessorContext<String, Object> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final String key, final Object value) {
+ context.forward("appId", context.applicationId());
+ context.forward("taskId", context.taskId());
+
+ context.forward("topic", context.topic());
+ context.forward("partition", context.partition());
+ context.forward("offset", context.offset());
+ context.forward("timestamp", context.timestamp());
+
+ context.forward("key", key);
+ context.forward("value", value);
+ }
+ };
+
+ final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config);
+ processor.init(context);
+
+ try {
+ processor.process("foo", 5L);
+ fail("Should have thrown an exception.");
+ } catch (final IllegalStateException expected) {
+ // expected, since the record metadata isn't initialized
+ }
+
+ // Drop the forwards captured before the exception was thrown, then set all record metadata at once.
+ context.resetForwards();
+ context.setRecordMetadata("t1", 0, 0L, null, 0L);
+
+ {
+ processor.process("foo", 5L);
+ final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator();
+ assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue());
+ }
+
+ context.resetForwards();
+
+ // record metadata should be "sticky"
+ // (only offset and timestamp change here; topic and partition keep their previous values)
+ context.setOffset(1L);
+ context.setTimestamp(10L);
+
+ {
+ processor.process("bar", 50L);
+ final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator();
+ assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue());
+ }
+
+ context.resetForwards();
+ // record metadata should be "sticky"
+ // (only topic and partition change here; offset and timestamp keep their previous values)
+ context.setTopic("t2");
+ context.setPartition(30);
+
+ {
+ processor.process("baz", 500L);
+ final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator();
+ assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue());
+ assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue());
+ }
+ }
+
+ // A punctuator scheduled during init() is captured along with its interval and type,
+ // and can be triggered manually by the test.
+ @Test
+ public void shouldCapturePunctuator() {
+ final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ final Duration everySecond = Duration.ofSeconds(1L);
+ context.schedule(everySecond, PunctuationType.WALL_CLOCK_TIME, timestamp -> context.commit());
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ }
+ };
+
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ final MockProcessorContext.CapturedPunctuator captured = context.scheduledPunctuators().get(0);
+ assertEquals(1000L, captured.getIntervalMs());
+ assertEquals(PunctuationType.WALL_CLOCK_TIME, captured.getType());
+ assertFalse(captured.cancelled());
+
+ // Firing the captured punctuator by hand should run the scheduled callback, which commits.
+ final Punctuator scheduledCallback = captured.getPunctuator();
+ assertFalse(context.committed());
+ scheduledCallback.punctuate(1234L);
+ assertTrue(context.committed());
+ }
+
+ // Everything passed to the three-argument constructor is exposed through the context's accessors.
+ @Test
+ public void fullConstructorShouldSetAllExpectedAttributes() {
+ final Properties properties = new Properties();
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
+ final File stateDirectory = new File("");
+ final MockProcessorContext<Void, Void> context =
+ new MockProcessorContext<>(properties, new TaskId(1, 1), stateDirectory);
+
+ assertEquals("testFullConstructor", context.applicationId());
+ assertEquals(new TaskId(1, 1), context.taskId());
+ assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
+ assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
+ assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
+ assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
+ assertEquals(stateDirectory, context.stateDir());
+ }
+}