You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/13 16:16:23 UTC
[kafka] branch trunk updated: KAFKA-10437: Implement new PAPI
support for test-utils (#9396)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27b0e35 KAFKA-10437: Implement new PAPI support for test-utils (#9396)
27b0e35 is described below
commit 27b0e35e7a98f94c341a10a58e20dee177054712
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Oct 13 11:15:22 2020 -0500
KAFKA-10437: Implement new PAPI support for test-utils (#9396)
Implements KIP-478 for the test-utils module:
* adds mocks of the new ProcessorContext and StateStoreContext
* adds tests that all stores and store builders are usable with the new mock
* adds tests that the new Processor api is usable with the new mock
* updates the demonstration Processor to the new api
Reviewers: Guozhang Wang <gu...@apache.org>
---
checkstyle/suppressions.xml | 4 +-
.../apache/kafka/streams/processor/api/Record.java | 28 ++
.../state/internals/InMemorySessionStore.java | 33 +-
.../processor/api/MockProcessorContext.java | 494 +++++++++++++++++++++
.../kafka/streams/MockProcessorContextTest.java | 8 +-
.../streams/internals/KeyValueStoreFacadeTest.java | 17 +-
.../streams/internals/WindowStoreFacadeTest.java | 17 +-
.../streams/test/MockProcessorContextAPITest.java | 353 +++++++++++++++
.../test/MockProcessorContextStateStoreTest.java | 200 +++++++++
.../WindowedWordCountProcessorSupplier.java | 33 +-
.../wordcount/WindowedWordCountProcessorTest.java | 115 ++---
11 files changed, 1184 insertions(+), 118 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2b19a82..c5c7f1f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -194,13 +194,13 @@
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
<suppress checks="CyclomaticComplexity"
- files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest).java"/>
+ files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
<suppress checks="JavaNCSS"
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
<suppress checks="NPathComplexity"
- files="(EosBetaUpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest).java"/>
+ files="(EosBetaUpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
index 652a647..ab8844c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.errors.StreamsException;
+import java.util.Objects;
+
/**
* A data class representing an incoming record for processing in a {@link Processor}
* or a record to forward to downstream processors via {@link ProcessorContext}.
@@ -162,4 +164,30 @@ public class Record<K, V> {
public Record<K, V> withHeaders(final Headers headers) {
return new Record<>(key, value, timestamp, headers);
}
+
+ @Override
+ public String toString() {
+ return "Record{" +
+ "key=" + key +
+ ", value=" + value +
+ ", timestamp=" + timestamp +
+ ", headers=" + headers +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final Record<?, ?> record = (Record<?, ?>) o;
+ return timestamp == record.timestamp &&
+ Objects.equals(key, record.key) &&
+ Objects.equals(value, record.value) &&
+ Objects.equals(headers, record.headers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value, timestamp, headers);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 46c4de2..b35eaa2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -75,18 +75,25 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
- this.context = (InternalProcessorContext) context;
-
- final StreamsMetricsImpl metrics = this.context.metrics();
final String threadId = Thread.currentThread().getName();
final String taskName = context.taskId().toString();
- expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
- threadId,
- taskName,
- metricScope,
- name,
- metrics
- );
+
+ // The provided context is not required to implement InternalProcessorContext,
+ // If it doesn't, we can't record this metric.
+ if (context instanceof InternalProcessorContext) {
+ this.context = (InternalProcessorContext) context;
+ final StreamsMetricsImpl metrics = this.context.metrics();
+ expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
+ threadId,
+ taskName,
+ metricScope,
+ name,
+ metrics
+ );
+ } else {
+ this.context = null;
+ expiredRecordSensor = null;
+ }
if (root != null) {
context.register(root, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value));
@@ -102,7 +109,11 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp);
if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
- expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
+ // The provided context is not required to implement InternalProcessorContext,
+ // If it doesn't, we can't record this metric (in fact, we wouldn't have even initialized it).
+ if (expiredRecordSensor != null && context != null) {
+ expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
+ }
LOG.warn("Skipping record for expired segment.");
} else {
if (aggregate != null) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
new file mode 100644
index 0000000..e96bdcd
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+ // Immutable fields ================================================
+ private final StreamsMetricsImpl metrics;
+ private final TaskId taskId;
+ private final StreamsConfig config;
+ private final File stateDir;
+
+ // settable record metadata ================================================
+ private MockRecordMetadata recordMetadata;
+
+ // mocks ================================================
+ private final Map<String, StateStore> stateStores = new HashMap<>();
+ private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+ private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+ private boolean committed = false;
+
+ private static final class MockRecordMetadata implements RecordMetadata {
+ private final String topic;
+ private final int partition;
+ private final long offset;
+
+ private MockRecordMetadata(final String topic, final int partition, final long offset) {
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
+ }
+
+ @Override
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ public int partition() {
+ return partition;
+ }
+
+ @Override
+ public long offset() {
+ return offset;
+ }
+ }
+
+ /**
+ * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+ */
+ public static final class CapturedPunctuator {
+ private final Duration interval;
+ private final PunctuationType type;
+ private final Punctuator punctuator;
+ private boolean cancelled = false;
+
+ private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+ this.interval = interval;
+ this.type = type;
+ this.punctuator = punctuator;
+ }
+
+ public Duration getInterval() {
+ return interval;
+ }
+
+ public PunctuationType getType() {
+ return type;
+ }
+
+ public Punctuator getPunctuator() {
+ return punctuator;
+ }
+
+ public void cancel() {
+ cancelled = true;
+ }
+
+ public boolean cancelled() {
+ return cancelled;
+ }
+ }
+
+ public static final class CapturedForward<K, V> {
+
+ private final Record<K, V> record;
+ private final Optional<String> childName;
+
+ public CapturedForward(final Record<K, V> record) {
+ this(record, Optional.empty());
+ }
+
+ public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+ this.record = Objects.requireNonNull(record);
+ this.childName = Objects.requireNonNull(childName);
+ }
+
+ /**
+ * The child this data was forwarded to.
+ *
+ * @return If present, the child name the record was forwarded to.
+ * If empty, the forward was a broadcast.
+ */
+ public Optional<String> childName() {
+ return childName;
+ }
+
+ /**
+ * The record that was forwarded.
+ *
+ * @return The forwarded record. Not null.
+ */
+ public Record<K, V> record() {
+ return record;
+ }
+
+ @Override
+ public String toString() {
+ return "CapturedForward{" +
+ "record=" + record +
+ ", childName=" + childName +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+ return Objects.equals(record, that.record) &&
+ Objects.equals(childName, that.childName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(record, childName);
+ }
+ }
+
+ // constructors ================================================
+
+ /**
+ * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+ * Most unit tests using this mock won't need to know the taskId,
+ * and most unit tests should be able to get by with the
+ * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+ */
+ public MockProcessorContext() {
+ this(
+ mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+ )),
+ new TaskId(0, 0),
+ null
+ );
+ }
+
+ /**
+ * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+ * Most unit tests using this mock won't need to know the taskId,
+ * and most unit tests should be able to get by with the
+ * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+ *
+ * @param config a Properties object, used to configure the context and the processor.
+ */
+ public MockProcessorContext(final Properties config) {
+ this(config, new TaskId(0, 0), null);
+ }
+
+ /**
+ * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+ *
+ * @param config a {@link Properties} object, used to configure the context and the processor.
+ * @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+ * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+ */
+ public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+ final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+ this.taskId = taskId;
+ this.config = streamsConfig;
+ this.stateDir = stateDir;
+ final MetricConfig metricConfig = new MetricConfig();
+ metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+ final String threadId = Thread.currentThread().getName();
+ metrics = new StreamsMetricsImpl(
+ new Metrics(metricConfig),
+ threadId,
+ streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+ Time.SYSTEM
+ );
+ TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+ }
+
+ @Override
+ public String applicationId() {
+ return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+ }
+
+ @Override
+ public TaskId taskId() {
+ return taskId;
+ }
+
+ @Override
+ public Map<String, Object> appConfigs() {
+ final Map<String, Object> combined = new HashMap<>();
+ combined.putAll(config.originals());
+ combined.putAll(config.values());
+ return combined;
+ }
+
+ @Override
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+ return config.originalsWithPrefix(prefix);
+ }
+
+ @Override
+ public Serde<?> keySerde() {
+ return config.defaultKeySerde();
+ }
+
+ @Override
+ public Serde<?> valueSerde() {
+ return config.defaultValueSerde();
+ }
+
+ @Override
+ public File stateDir() {
+ return Objects.requireNonNull(
+ stateDir,
+ "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+ "You can either reconfigure your test so that it doesn't need access to the disk " +
+ "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+ "a non-null stateDir argument."
+ );
+ }
+
+ @Override
+ public StreamsMetrics metrics() {
+ return metrics;
+ }
+
+ // settable record metadata ================================================
+
+ /**
+ * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set them directly.
+ *
+ * @param topic A topic name
+ * @param partition A partition number
+ * @param offset A record offset
+ */
+ public void setRecordMetadata(final String topic,
+ final int partition,
+ final long offset) {
+ recordMetadata = new MockRecordMetadata(topic, partition, offset);
+ }
+
+ @Override
+ public Optional<RecordMetadata> recordMetadata() {
+ return Optional.ofNullable(recordMetadata);
+ }
+
+ // mocks ================================================
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S extends StateStore> S getStateStore(final String name) {
+ return (S) stateStores.get(name);
+ }
+
+ public <S extends StateStore> void addStateStore(final S stateStore) {
+ stateStores.put(stateStore.name(), stateStore);
+ }
+
+ @Override
+ public Cancellable schedule(final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) {
+ final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
+
+ punctuators.add(capturedPunctuator);
+
+ return capturedPunctuator::cancel;
+ }
+
+ /**
+ * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+ *
+ * @return A list of captured punctuators.
+ */
+ public List<CapturedPunctuator> scheduledPunctuators() {
+ return new LinkedList<>(punctuators);
+ }
+
+ @Override
+ public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
+ forward(record, null);
+ }
+
+ @Override
+ public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
+ capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
+ }
+
+ /**
+ * Get all the forwarded data this context has observed. The returned list will not be
+ * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+ * {@code forward(...)}.
+ *
+ * @return A list of records that were previously passed to the context.
+ */
+ public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
+ return new LinkedList<>(capturedForwards);
+ }
+
+ /**
+ * Get all the forwarded data this context has observed for a specific child by name.
+ * The returned list will not be affected by subsequent interactions with the context.
+ * The data in the list is in the same order as the calls to {@code forward(...)}.
+ *
+ * @param childName The child name to retrieve forwards for
+ * @return A list of records that were previously passed to the context.
+ */
+ public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
+ final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
+ for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
+ if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
+ result.add(capture);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Clear the captured forwarded data.
+ */
+ public void resetForwards() {
+ capturedForwards.clear();
+ }
+
+ @Override
+ public void commit() {
+ committed = true;
+ }
+
+ /**
+ * Whether {@link ProcessorContext#commit()} has been called in this context.
+ *
+ * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+ */
+ public boolean committed() {
+ return committed;
+ }
+
+ /**
+ * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+ */
+ public void resetCommit() {
+ committed = false;
+ }
+
+ @Override
+ public RecordCollector recordCollector() {
+ // This interface is assumed by state stores that add change-logging.
+ // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+ throw new UnsupportedOperationException(
+ "MockProcessorContext does not provide record collection. " +
+ "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+ "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+ );
+ }
+
+ /**
+ * Used to get a {@link StateStoreContext} for use with
+ * {@link StateStore#init(StateStoreContext, StateStore)}
+ * if you need to initialize a store for your tests.
+ * @return a {@link StateStoreContext} that delegates to this ProcessorContext.
+ */
+ public StateStoreContext getStateStoreContext() {
+ return new StateStoreContext() {
+ @Override
+ public String applicationId() {
+ return MockProcessorContext.this.applicationId();
+ }
+
+ @Override
+ public TaskId taskId() {
+ return MockProcessorContext.this.taskId();
+ }
+
+ @Override
+ public Serde<?> keySerde() {
+ return MockProcessorContext.this.keySerde();
+ }
+
+ @Override
+ public Serde<?> valueSerde() {
+ return MockProcessorContext.this.valueSerde();
+ }
+
+ @Override
+ public File stateDir() {
+ return MockProcessorContext.this.stateDir();
+ }
+
+ @Override
+ public StreamsMetrics metrics() {
+ return MockProcessorContext.this.metrics();
+ }
+
+ @Override
+ public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+ stateStores.put(store.name(), store);
+ }
+
+ @Override
+ public Map<String, Object> appConfigs() {
+ return MockProcessorContext.this.appConfigs();
+ }
+
+ @Override
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+ return MockProcessorContext.this.appConfigsWithPrefix(prefix);
+ }
+ };
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 6e2f4ed..85d0a8b 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -43,6 +43,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation") // this is a test of a deprecated API
public class MockProcessorContextTest {
@Test
public void shouldCaptureOutputRecords() {
@@ -160,7 +161,6 @@ public class MockProcessorContextTest {
@Test
public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
- @SuppressWarnings("deprecation")
@Override
public void process(final String key, final Long value) {
context().forward(key, value, 0);
@@ -182,7 +182,6 @@ public class MockProcessorContextTest {
@Test
public void shouldThrowIfForwardedWithDeprecatedChildName() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
- @SuppressWarnings("deprecation")
@Override
public void process(final String key, final Long value) {
context().forward(key, value, "child1");
@@ -231,14 +230,13 @@ public class MockProcessorContextTest {
assertFalse(context.committed());
}
- @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437
+
@Test
public void shouldStoreAndReturnStateStores() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
@Override
public void process(final String key, final Long value) {
- @SuppressWarnings("unchecked")
- final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
+ final KeyValueStore<String, Long> stateStore = context().getStateStore("my-state");
stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
index 4d7a277..927db6b 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -53,9 +54,9 @@ public class KeyValueStoreFacadeTest {
keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
}
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
+ @SuppressWarnings("deprecation") // test of deprecated method
@Test
- public void shouldForwardInit() {
+ public void shouldForwardDeprecatedInit() {
final ProcessorContext context = mock(ProcessorContext.class);
final StateStore store = mock(StateStore.class);
mockedKeyValueTimestampStore.init(context, store);
@@ -67,6 +68,18 @@ public class KeyValueStoreFacadeTest {
}
@Test
+ public void shouldForwardInit() {
+ final StateStoreContext context = mock(StateStoreContext.class);
+ final StateStore store = mock(StateStore.class);
+ mockedKeyValueTimestampStore.init(context, store);
+ expectLastCall();
+ replay(mockedKeyValueTimestampStore);
+
+ keyValueStoreFacade.init(context, store);
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
public void shouldPutWithUnknownTimestamp() {
mockedKeyValueTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
expectLastCall();
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
index 6a2c6bd..981e137 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.easymock.EasyMockRunner;
@@ -47,9 +48,9 @@ public class WindowStoreFacadeTest {
windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
}
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
+ @SuppressWarnings("deprecation") // test of deprecated method
@Test
- public void shouldForwardInit() {
+ public void shouldForwardDeprecatedInit() {
final ProcessorContext context = mock(ProcessorContext.class);
final StateStore store = mock(StateStore.class);
mockedWindowTimestampStore.init(context, store);
@@ -61,6 +62,18 @@ public class WindowStoreFacadeTest {
}
@Test
+ public void shouldForwardInit() {
+ final StateStoreContext context = mock(StateStoreContext.class);
+ final StateStore store = mock(StateStore.class);
+ mockedWindowTimestampStore.init(context, store);
+ expectLastCall();
+ replay(mockedWindowTimestampStore);
+
+ windowStoreFacade.init(context, store);
+ verify(mockedWindowTimestampStore);
+ }
+
+ @Test
@SuppressWarnings("deprecation")
public void shouldPutWithUnknownTimestamp() {
mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
new file mode 100644
index 0000000..7d2816a
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockProcessorContextAPITest {
+ @Test
+ public void shouldCaptureOutputRecords() {
+ final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+ private ProcessorContext<String, Long> context;
+
+ @Override
+ public void init(final ProcessorContext<String, Long> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Record<String, Long> record) {
+ final String key = record.key();
+ final Long value = record.value();
+ context.forward(record.withKey(key + value).withValue(key.length() + value));
+ }
+ };
+
+ final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+ processor.init(context);
+
+ processor.process(new Record<>("foo", 5L, 0L));
+ processor.process(new Record<>("barbaz", 50L, 0L));
+
+ final List<CapturedForward<? extends String, ? extends Long>> actual = context.forwarded();
+ final List<CapturedForward<String, Long>> expected = asList(
+ new CapturedForward<>(new Record<>("foo5", 8L, 0L)),
+ new CapturedForward<>(new Record<>("barbaz50", 56L, 0L))
+ );
+ assertThat(actual, is(expected));
+
+ context.resetForwards();
+
+ assertThat(context.forwarded(), empty());
+ }
+
+ @Test
+ public void shouldCaptureRecordsOutputToChildByName() {
+ final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+ private ProcessorContext<String, Long> context;
+
+ @Override
+ public void process(final Record<String, Long> record) {
+ final String key = record.key();
+ final Long value = record.value();
+ if (count == 0) {
+ context.forward(new Record<>("start", -1L, 0L)); // broadcast
+ }
+ final String toChild = count % 2 == 0 ? "george" : "pete";
+ context.forward(new Record<>(key + value, key.length() + value, 0L), toChild);
+ count++;
+ }
+
+ @Override
+ public void init(final ProcessorContext<String, Long> context) {
+ this.context = context;
+ }
+
+ private int count = 0;
+
+ };
+
+ final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ processor.process(new Record<>("foo", 5L, 0L));
+ processor.process(new Record<>("barbaz", 50L, 0L));
+
+ {
+ final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded();
+ final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+ new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
+ new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.of("george")),
+ new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.of("pete"))
+ );
+
+ assertThat(forwarded, is(expected));
+ }
+ {
+ final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("george");
+ final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+ new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
+ new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.of("george"))
+ );
+
+ assertThat(forwarded, is(expected));
+ }
+ {
+ final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("pete");
+ final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+ new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
+ new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.of("pete"))
+ );
+
+ assertThat(forwarded, is(expected));
+ }
+ {
+ final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("steve");
+ final List<CapturedForward<? extends String, ? extends Long>> expected = singletonList(
+ new CapturedForward<>(new Record<>("start", -1L, 0L))
+ );
+
+ assertThat(forwarded, is(expected));
+ }
+ }
+
+ @Test
+ public void shouldCaptureCommitsAndAllowReset() {
+ final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+ private ProcessorContext<Void, Void> context;
+ private int count = 0;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Record<String, Long> record) {
+ if (++count > 2) {
+ context.commit();
+ }
+ }
+ };
+
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ processor.process(new Record<>("foo", 5L, 0L));
+ processor.process(new Record<>("barbaz", 50L, 0L));
+
+ assertThat(context.committed(), is(false));
+
+ processor.process(new Record<>("foobar", 500L, 0L));
+
+ assertThat(context.committed(), is(true));
+
+ context.resetCommit();
+
+ assertThat(context.committed(), is(false));
+ }
+
+ @Test
+ public void shouldStoreAndReturnStateStores() {
+ final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+ private ProcessorContext<Void, Void> context;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Record<String, Long> record) {
+ final String key = record.key();
+ final Long value = record.value();
+ final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state");
+
+ stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
+ stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
+ }
+
+ };
+
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+ final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("my-state"),
+ Serdes.String(),
+ Serdes.Long()).withLoggingDisabled();
+
+ final KeyValueStore<String, Long> store = storeBuilder.build();
+
+ store.init(context.getStateStoreContext(), store);
+
+ processor.init(context);
+
+ processor.process(new Record<>("foo", 5L, 0L));
+ processor.process(new Record<>("bar", 50L, 0L));
+
+ assertThat(store.get("foo"), is(5L));
+ assertThat(store.get("bar"), is(50L));
+ assertThat(store.get("all"), is(55L));
+ }
+
+
+ @Test
+ public void shouldCaptureApplicationAndRecordMetadata() {
+ final Properties config = mkProperties(
+ mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+ )
+ );
+
+ final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() {
+ private ProcessorContext<String, Object> context;
+
+ @Override
+ public void init(final ProcessorContext<String, Object> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Record<String, Object> record) {
+ context.forward(new Record<String, Object>("appId", context.applicationId(), 0L));
+ context.forward(new Record<String, Object>("taskId", context.taskId(), 0L));
+
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata = context.recordMetadata().get();
+ context.forward(new Record<String, Object>("topic", recordMetadata.topic(), 0L));
+ context.forward(new Record<String, Object>("partition", recordMetadata.partition(), 0L));
+ context.forward(new Record<String, Object>("offset", recordMetadata.offset(), 0L));
+ }
+
+ context.forward(new Record<String, Object>("record", record, 0L));
+ }
+ };
+
+ final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config);
+ processor.init(context);
+
+ processor.process(new Record<>("foo", 5L, 0L));
+ {
+ final List<CapturedForward<? extends String, ?>> forwarded = context.forwarded();
+ final List<CapturedForward<? extends String, ?>> expected = asList(
+ new CapturedForward<>(new Record<>("appId", "testMetadata", 0L)),
+ new CapturedForward<>(new Record<>("taskId", new TaskId(0, 0), 0L)),
+ new CapturedForward<>(new Record<>("record", new Record<>("foo", 5L, 0L), 0L))
+ );
+ assertThat(forwarded, is(expected));
+ }
+ context.resetForwards();
+ context.setRecordMetadata("t1", 0, 0L);
+ processor.process(new Record<>("foo", 5L, 0L));
+ {
+ final List<CapturedForward<? extends String, ?>> forwarded = context.forwarded();
+ final List<CapturedForward<? extends String, ?>> expected = asList(
+ new CapturedForward<>(new Record<>("appId", "testMetadata", 0L)),
+ new CapturedForward<>(new Record<>("taskId", new TaskId(0, 0), 0L)),
+ new CapturedForward<>(new Record<>("topic", "t1", 0L)),
+ new CapturedForward<>(new Record<>("partition", 0, 0L)),
+ new CapturedForward<>(new Record<>("offset", 0L, 0L)),
+ new CapturedForward<>(new Record<>("record", new Record<>("foo", 5L, 0L), 0L))
+ );
+ assertThat(forwarded, is(expected));
+ }
+ }
+
+ @Test
+ public void shouldCapturePunctuator() {
+ final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ context.schedule(
+ Duration.ofSeconds(1L),
+ PunctuationType.WALL_CLOCK_TIME,
+ timestamp -> context.commit()
+ );
+ }
+
+ @Override
+ public void process(final Record<String, Long> record) {}
+ };
+
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+ processor.init(context);
+
+ final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+ assertThat(capturedPunctuator.getInterval(), is(Duration.ofMillis(1000L)));
+ assertThat(capturedPunctuator.getType(), is(PunctuationType.WALL_CLOCK_TIME));
+ assertThat(capturedPunctuator.cancelled(), is(false));
+
+ final Punctuator punctuator = capturedPunctuator.getPunctuator();
+ assertThat(context.committed(), is(false));
+ punctuator.punctuate(1234L);
+ assertThat(context.committed(), is(true));
+ }
+
+ @Test
+ public void fullConstructorShouldSetAllExpectedAttributes() {
+ final Properties config = new Properties();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
+ final File dummyFile = new File("");
+ final MockProcessorContext<Void, Void> context =
+ new MockProcessorContext<>(config, new TaskId(1, 1), dummyFile);
+
+ assertThat(context.applicationId(), is("testFullConstructor"));
+ assertThat(context.taskId(), is(new TaskId(1, 1)));
+ assertThat(context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG), is("testFullConstructor"));
+ assertThat(context.appConfigsWithPrefix("application.").get("id"), is("testFullConstructor"));
+ assertThat(context.keySerde().getClass(), is(Serdes.String().getClass()));
+ assertThat(context.valueSerde().getClass(), is(Serdes.Long().getClass()));
+ assertThat(context.stateDir(), is(dummyFile));
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java
new file mode 100644
index 0000000..4662ddf
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(value = Parameterized.class)
+public class MockProcessorContextStateStoreTest {
+ private final StoreBuilder<StateStore> builder;
+ private final boolean timestamped;
+ private final boolean caching;
+ private final boolean logging;
+
+ @Parameterized.Parameters(name = "builder = {0}, timestamped = {1}, caching = {2}, logging = {3}")
+ public static Collection<Object[]> data() {
+ final List<Boolean> booleans = asList(true, false);
+
+ final List<Object[]> values = new ArrayList<>();
+
+ for (final Boolean timestamped : booleans) {
+ for (final Boolean caching : booleans) {
+ for (final Boolean logging : booleans) {
+ final List<KeyValueBytesStoreSupplier> keyValueBytesStoreSuppliers = asList(
+ Stores.inMemoryKeyValueStore("kv" + timestamped + caching + logging),
+ Stores.persistentKeyValueStore("kv" + timestamped + caching + logging),
+ Stores.persistentTimestampedKeyValueStore("kv" + timestamped + caching + logging)
+ );
+ for (final KeyValueBytesStoreSupplier supplier : keyValueBytesStoreSuppliers) {
+ final StoreBuilder<? extends KeyValueStore<String, ?>> builder;
+ if (timestamped) {
+ builder = Stores.timestampedKeyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+ } else {
+ builder = Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+ }
+ if (caching) {
+ builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
+ }
+ if (logging) {
+ builder.withLoggingEnabled(Collections.emptyMap());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ values.add(new Object[] {builder, timestamped, caching, logging});
+ }
+ }
+ }
+ }
+
+ for (final Boolean timestamped : booleans) {
+ for (final Boolean caching : booleans) {
+ for (final Boolean logging : booleans) {
+ final List<WindowBytesStoreSupplier> windowBytesStoreSuppliers = asList(
+ Stores.inMemoryWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
+ Stores.persistentWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
+ Stores.persistentTimestampedWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false)
+ );
+
+ for (final WindowBytesStoreSupplier supplier : windowBytesStoreSuppliers) {
+ final StoreBuilder<? extends WindowStore<String, ?>> builder;
+ if (timestamped) {
+ builder = Stores.timestampedWindowStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+ } else {
+ builder = Stores.windowStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+ }
+ if (caching) {
+ builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
+ }
+ if (logging) {
+ builder.withLoggingEnabled(Collections.emptyMap());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ values.add(new Object[] {builder, timestamped, caching, logging});
+ }
+ }
+ }
+ }
+
+ for (final Boolean caching : booleans) {
+ for (final Boolean logging : booleans) {
+ final List<SessionBytesStoreSupplier> sessionBytesStoreSuppliers = asList(
+ Stores.inMemorySessionStore("s" + caching + logging, Duration.ofSeconds(1)),
+ Stores.persistentSessionStore("s" + caching + logging, Duration.ofSeconds(1))
+ );
+
+ for (final SessionBytesStoreSupplier supplier : sessionBytesStoreSuppliers) {
+ final StoreBuilder<? extends SessionStore<String, ?>> builder =
+ Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+ if (caching) {
+ builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
+ }
+ if (logging) {
+ builder.withLoggingEnabled(Collections.emptyMap());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ values.add(new Object[] {builder, false, caching, logging});
+ }
+ }
+ }
+
+ return values;
+ }
+
+ public MockProcessorContextStateStoreTest(final StoreBuilder<StateStore> builder,
+ final boolean timestamped,
+ final boolean caching,
+ final boolean logging) {
+
+ this.builder = builder;
+ this.timestamped = timestamped;
+ this.caching = caching;
+ this.logging = logging;
+ }
+
+ @Test
+ public void shouldEitherInitOrThrow() {
+ final File stateDir = TestUtils.tempDirectory();
+ try {
+ final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(
+ mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+ )),
+ new TaskId(0, 0),
+ stateDir
+ );
+ final StateStore store = builder.build();
+ if (caching || logging) {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> store.init(context.getStateStoreContext(), store)
+ );
+ } else {
+ store.init(context.getStateStoreContext(), store);
+ store.close();
+ }
+ } finally {
+ try {
+ Utils.delete(stateDir);
+ } catch (final IOException e) {
+ // Failed to clean up the state dir. The JVM hooks will try again later.
+ }
+ }
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorSupplier.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorSupplier.java
index 7318f4548..403e453 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorSupplier.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorSupplier.java
@@ -18,43 +18,41 @@ package org.apache.kafka.streams.test.wordcount;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Locale;
-public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String> {
+public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
@Override
- public Processor<String, String> get() {
- return new Processor<String, String>() {
- private ProcessorContext context;
+ public Processor<String, String, String, String> get() {
+ return new Processor<String, String, String, String>() {
private WindowStore<String, Integer> windowStore;
@Override
- @SuppressWarnings("unchecked")
- public void init(final ProcessorContext context) {
- this.context = context;
- this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
+ public void init(final ProcessorContext<String, String> context) {
+ context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<Windowed<String>, Integer> iter = windowStore.all()) {
while (iter.hasNext()) {
final KeyValue<Windowed<String>, Integer> entry = iter.next();
- context.forward(entry.key.toString(), entry.value.toString());
+ context.forward(new Record<>(entry.key.toString(), entry.value.toString(), timestamp));
}
}
});
- windowStore = (WindowStore<String, Integer>) context.getStateStore("WindowedCounts");
+ windowStore = context.getStateStore("WindowedCounts");
}
@Override
- public void process(final String key, final String value) {
- final String[] words = value.toLowerCase(Locale.getDefault()).split(" ");
- final long timestamp = context.timestamp();
+ public void process(final Record<String, String> record) {
+ final String[] words = record.value().toLowerCase(Locale.getDefault()).split(" ");
+ final long timestamp = record.timestamp();
// calculate the window as every 100 ms
// Note this has to be aligned with the configuration for the window store you register separately
@@ -70,9 +68,6 @@ public final class WindowedWordCountProcessorSupplier implements ProcessorSuppli
}
}
}
-
- @Override
- public void close() {}
};
}
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
index 00a9f8f..a61ead4 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
@@ -18,11 +18,12 @@ package org.apache.kafka.streams.test.wordcount;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.MockProcessorContext;
-import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.TestUtils;
@@ -31,19 +32,17 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
+import static java.util.Arrays.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThrows;
public class WindowedWordCountProcessorTest {
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void shouldWorkWithInMemoryStore() {
- final MockProcessorContext context = new MockProcessorContext();
+ final MockProcessorContext<String, String> context = new MockProcessorContext<>();
// Create, initialize, and register the state store.
final WindowStore<String, Integer> store =
@@ -56,20 +55,18 @@ public class WindowedWordCountProcessorTest {
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
.withCachingDisabled() // Caching is not supported by MockProcessorContext.
.build();
- store.init(context, store);
- context.register(store, null);
+ store.init(context.getStateStoreContext(), store);
+ context.getStateStoreContext().register(store, null);
// Create and initialize the processor under test
- final Processor<String, String> processor = new WindowedWordCountProcessorSupplier().get();
+ final Processor<String, String, String, String> processor = new WindowedWordCountProcessorSupplier().get();
processor.init(context);
// send a record to the processor
- context.setTimestamp(101);
- processor.process("key", "alpha beta gamma alpha");
+ processor.process(new Record<>("key", "alpha beta gamma alpha", 101L));
// send a record to the processor in a new window
- context.setTimestamp(221);
- processor.process("key", "gamma delta");
+ processor.process(new Record<>("key", "gamma delta", 221L));
// note that the processor does not forward during process()
assertThat(context.forwarded().isEmpty(), is(true));
@@ -78,16 +75,18 @@ public class WindowedWordCountProcessorTest {
context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
// finally, we can verify the output.
- final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
- assertThat(capturedForwards.hasNext(), is(false));
+ final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
+ final List<CapturedForward<? extends String, ? extends String>> expected = asList(
+ new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
+ new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
+ new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
+ new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
+ new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
+ );
+
+ assertThat(capturedForwards, is(expected));
}
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void shouldWorkWithPersistentStore() throws IOException {
final Properties properties = new Properties();
@@ -97,7 +96,7 @@ public class WindowedWordCountProcessorTest {
final File stateDir = TestUtils.tempDirectory();
try {
- final MockProcessorContext context = new MockProcessorContext(
+ final MockProcessorContext<String, String> context = new MockProcessorContext<>(
properties,
new TaskId(0, 0),
stateDir
@@ -114,20 +113,18 @@ public class WindowedWordCountProcessorTest {
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
.withCachingDisabled() // Caching is not supported by MockProcessorContext.
.build();
- store.init(context, store);
- context.register(store, null);
+ store.init(context.getStateStoreContext(), store);
+ context.getStateStoreContext().register(store, null);
// Create and initialize the processor under test
- final Processor<String, String> processor = new WindowedWordCountProcessorSupplier().get();
+ final Processor<String, String, String, String> processor = new WindowedWordCountProcessorSupplier().get();
processor.init(context);
// send a record to the processor
- context.setTimestamp(101);
- processor.process("key", "alpha beta gamma alpha");
+ processor.process(new Record<>("key", "alpha beta gamma alpha", 101L));
// send a record to the processor in a new window
- context.setTimestamp(221);
- processor.process("key", "gamma delta");
+ processor.process(new Record<>("key", "gamma delta", 221L));
// note that the processor does not forward during process()
assertThat(context.forwarded().isEmpty(), is(true));
@@ -136,54 +133,18 @@ public class WindowedWordCountProcessorTest {
context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
// finally, we can verify the output.
- final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
- assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
- assertThat(capturedForwards.hasNext(), is(false));
+ final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
+ final List<CapturedForward<? extends String, ? extends String>> expected = asList(
+ new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
+ new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
+ new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
+ new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
+ new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
+ );
+
+ assertThat(capturedForwards, is(expected));
} finally {
Utils.delete(stateDir);
}
}
-
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
- @Test
- public void shouldFailWithLogging() {
- final MockProcessorContext context = new MockProcessorContext();
-
- // Create, initialize, and register the state store.
- final WindowStore<String, Integer> store =
- Stores.windowStoreBuilder(Stores.inMemoryWindowStore("WindowedCounts",
- Duration.ofDays(24),
- Duration.ofMillis(100),
- false),
- Serdes.String(),
- Serdes.Integer())
- .withLoggingEnabled(new HashMap<>()) // Changelog is not supported by MockProcessorContext.
- .withCachingDisabled() // Caching is not supported by MockProcessorContext.
- .build();
- assertThrows(IllegalArgumentException.class, () -> store.init(context, store));
- }
-
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
- @Test
- public void shouldFailWithCaching() {
- final MockProcessorContext context = new MockProcessorContext();
-
- // Create, initialize, and register the state store.
- final WindowStore<String, Integer> store =
- Stores.windowStoreBuilder(Stores.inMemoryWindowStore("WindowedCounts",
- Duration.ofDays(24),
- Duration.ofMillis(100),
- false),
- Serdes.String(),
- Serdes.Integer())
- .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
- .withCachingEnabled() // Caching is not supported by MockProcessorContext.
- .build();
-
- assertThrows(IllegalArgumentException.class, () -> store.init(context, store));
- }
}