You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/09/13 20:04:17 UTC
[kafka] branch trunk updated: MINOR: Insure that KafkaStreams client is closed if test fails (#5618)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 466d893 MINOR: Insure that KafkaStreams client is closed if test fails (#5618)
466d893 is described below
commit 466d89306ea23d1f17be0914416944618ceb9f86
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Sep 13 13:04:09 2018 -0700
MINOR: Insure that KafkaStreams client is closed if test fails (#5618)
Reviewers: Guozhang Wang <gu...@confluent.io>, John Roessler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Eno Thereska <en...@amazon.com>
---
.../kstream/internals/InternalStreamsBuilder.java | 2 +-
.../internals/InternalTopologyBuilder.java | 70 ++--
.../org/apache/kafka/streams/KafkaStreamsTest.java | 381 ++++++++++-----------
.../org/apache/kafka/streams/TopologyTest.java | 4 +-
.../integration/RegexSourceIntegrationTest.java | 4 +-
.../internals/AbstractProcessorContextTest.java | 4 +-
.../internals/InternalTopologyBuilderTest.java | 62 +++-
.../internals/ProcessorStateManagerTest.java | 60 ++--
.../processor/internals/StandbyTaskTest.java | 12 +-
.../processor/internals/StreamTaskTest.java | 4 +-
.../internals/StreamsPartitionAssignorTest.java | 12 +-
...{MockStateStore.java => MockKeyValueStore.java} | 49 ++-
...eBuilder.java => MockKeyValueStoreBuilder.java} | 10 +-
13 files changed, 368 insertions(+), 306 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c1532af..49f49d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -149,7 +149,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
// explicitly disable logging for global stores
materialized.withLoggingDisabled();
final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
- final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String sourceName = newProcessorName(KTableImpl.SOURCE_NAME);
final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index edff470..7fa2851 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -390,6 +390,7 @@ public class InternalTopologyBuilder {
nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
nodeToSourceTopics.put(name, Arrays.asList(topics));
nodeGrouper.add(name);
+ nodeGroups = null;
}
public final void addSource(final Topology.AutoOffsetReset offsetReset,
@@ -428,6 +429,7 @@ public class InternalTopologyBuilder {
nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
nodeToSourcePatterns.put(name, topicPattern);
nodeGrouper.add(name);
+ nodeGroups = null;
}
public final <K, V> void addSink(final String name,
@@ -445,6 +447,7 @@ public class InternalTopologyBuilder {
addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
nodeToSinkTopic.put(name, topic);
+ nodeGroups = null;
}
public final <K, V> void addSink(final String name,
@@ -479,6 +482,7 @@ public class InternalTopologyBuilder {
nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
nodeGrouper.add(name);
nodeGrouper.unite(name, predecessorNames);
+ nodeGroups = null;
}
public final void addProcessor(final String name,
@@ -507,6 +511,7 @@ public class InternalTopologyBuilder {
nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
nodeGrouper.add(name);
nodeGrouper.unite(name, predecessorNames);
+ nodeGroups = null;
}
public final void addStateStore(final StoreBuilder storeBuilder,
@@ -530,6 +535,7 @@ public class InternalTopologyBuilder {
connectProcessorAndStateStore(processorName, storeBuilder.name());
}
}
+ nodeGroups = null;
}
public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
@@ -549,15 +555,29 @@ public class InternalTopologyBuilder {
storeBuilder.loggingEnabled());
validateTopicNotAlreadyRegistered(topic);
- addGlobalStore(sourceName,
- timestampExtractor,
- keyDeserializer,
- valueDeserializer,
- topic,
- processorName,
- stateUpdateSupplier,
- storeBuilder.name(),
- storeBuilder);
+ final String[] topics = {topic};
+ final String[] predecessors = {sourceName};
+
+ final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
+ predecessors,
+ stateUpdateSupplier);
+
+ globalTopics.add(topic);
+ nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
+ topics,
+ null,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer));
+ nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+ nodeGrouper.add(sourceName);
+ nodeFactory.addStateStore(storeBuilder.name());
+ nodeFactories.put(processorName, nodeFactory);
+ nodeGrouper.add(processorName);
+ nodeGrouper.unite(processorName, predecessors);
+ globalStateBuilders.put(storeBuilder.name(), storeBuilder);
+ connectSourceStoreAndTopic(storeBuilder.name(), topic);
+ nodeGroups = null;
}
private void validateTopicNotAlreadyRegistered(final String topic) {
@@ -583,6 +603,7 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(stateStoreName, "state store name must not be null");
connectProcessorAndStateStore(processorName, stateStoreName);
}
+ nodeGroups = null;
}
private void connectSourceStoreAndTopic(final String sourceStoreName,
@@ -637,37 +658,6 @@ public class InternalTopologyBuilder {
}
}
- private void addGlobalStore(final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier,
- final String name,
- final StoreBuilder<KeyValueStore> storeBuilder) {
- final String[] topics = {topic};
- final String[] predecessors = {sourceName};
- final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
- predecessors,
- stateUpdateSupplier);
- globalTopics.add(topic);
- nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
- topics,
- null,
- timestampExtractor,
- keyDeserializer,
- valueDeserializer));
- nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
- nodeGrouper.add(sourceName);
- nodeFactory.addStateStore(name);
- nodeFactories.put(processorName, nodeFactory);
- nodeGrouper.add(processorName);
- nodeGrouper.unite(processorName, predecessors);
- globalStateBuilders.put(name, storeBuilder);
- connectSourceStoreAndTopic(name, topic);
- }
-
private void connectProcessorAndStateStore(final String processorName,
final String stateStoreName) {
if (globalStateBuilders.containsKey(stateStoreName)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 481c860..dd5cff7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
@@ -30,8 +29,6 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -39,8 +36,8 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -62,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -70,12 +68,12 @@ public class KafkaStreamsTest {
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
- // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
- // quick enough)
+ // We need this to avoid the KafkaConsumer hanging on poll
+ // (this may occur if the test doesn't complete quickly enough)
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final StreamsBuilder builder = new StreamsBuilder();
- private KafkaStreams streams;
+ private KafkaStreams globalStreams;
private Properties props;
@Before
@@ -87,59 +85,64 @@ public class KafkaStreamsTest {
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
- streams = new KafkaStreams(builder.build(), props);
+ globalStreams = new KafkaStreams(builder.build(), props);
+ }
+
+ @After
+ public void cleanup() {
+ if (globalStreams != null) {
+ globalStreams.close();
+ }
}
@Test
public void testStateChanges() throws InterruptedException {
- final StreamsBuilder builder = new StreamsBuilder();
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-
final StateListenerStub stateListener = new StateListenerStub();
- streams.setStateListener(stateListener);
- Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED);
+ globalStreams.setStateListener(stateListener);
+
+ Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED);
Assert.assertEquals(stateListener.numChanges, 0);
- streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.RUNNING;
- }
- }, 10 * 1000, "Streams never started.");
- streams.close();
- Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+ globalStreams.start();
+ TestUtils.waitForCondition(
+ () -> globalStreams.state() == KafkaStreams.State.RUNNING,
+ 10 * 1000,
+ "Streams never started.");
+
+ globalStreams.close();
+
+ Assert.assertEquals(globalStreams.state(), KafkaStreams.State.NOT_RUNNING);
}
@Test
public void testStateCloseAfterCreate() {
- final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- final StateListenerStub stateListener = new StateListenerStub();
- streams.setStateListener(stateListener);
- streams.close();
+ try {
+ final StateListenerStub stateListener = new StateListenerStub();
+ streams.setStateListener(stateListener);
+ } finally {
+ streams.close();
+ }
+
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
}
@Test
public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
- final StreamsBuilder builder = new StreamsBuilder();
builder.globalTable("anyTopic");
final List<Node> nodes = Arrays.asList(new Node(0, "localhost", 8121));
final Cluster cluster = new Cluster("mockClusterId", nodes,
- Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
- Collections.<String>emptySet(), nodes.get(0));
+ Collections.emptySet(), Collections.<String>emptySet(),
+ Collections.emptySet(), nodes.get(0));
final MockClientSupplier clientSupplier = new MockClientSupplier();
clientSupplier.setClusterForAdminClient(cluster);
final KafkaStreams streams = new KafkaStreams(builder.build(), props, clientSupplier);
streams.close();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.NOT_RUNNING;
- }
- }, 10 * 1000, "Streams never stopped.");
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+ 10 * 1000,
+ "Streams never stopped.");
// Ensure that any created clients are closed
assertTrue(clientSupplier.consumer.closed());
@@ -151,88 +154,78 @@ public class KafkaStreamsTest {
@Test
public void testStateThreadClose() throws Exception {
- final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ try {
+ final java.lang.reflect.Field threadsField = streams.getClass().getDeclaredField("threads");
+ threadsField.setAccessible(true);
+ final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
+ assertEquals(NUM_THREADS, threads.length);
+ assertEquals(streams.state(), KafkaStreams.State.CREATED);
- final java.lang.reflect.Field threadsField = streams.getClass().getDeclaredField("threads");
- threadsField.setAccessible(true);
- final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
-
- assertEquals(NUM_THREADS, threads.length);
- assertEquals(streams.state(), KafkaStreams.State.CREATED);
-
- streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.RUNNING;
+ streams.start();
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.RUNNING,
+ 10 * 1000,
+ "Streams never started.");
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ final StreamThread tmpThread = threads[i];
+ tmpThread.shutdown();
+ TestUtils.waitForCondition(
+ () -> tmpThread.state() == StreamThread.State.DEAD,
+ 10 * 1000,
+ "Thread never stopped.");
+ threads[i].join();
}
- }, 10 * 1000, "Streams never started.");
-
- for (int i = 0; i < NUM_THREADS; i++) {
- final StreamThread tmpThread = threads[i];
- tmpThread.shutdown();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return tmpThread.state() == StreamThread.State.DEAD;
- }
- }, 10 * 1000, "Thread never stopped.");
- threads[i].join();
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.ERROR,
+ 10 * 1000,
+ "Streams never stopped.");
+ } finally {
+ streams.close();
}
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.ERROR;
- }
- }, 10 * 1000, "Streams never stopped.");
- streams.close();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.NOT_RUNNING;
- }
- }, 10 * 1000, "Streams never stopped.");
+
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+ 10 * 1000,
+ "Streams never stopped.");
final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
globalThreadField.setAccessible(true);
final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
- assertEquals(globalStreamThread, null);
+ assertNull(globalStreamThread);
}
@Test
public void testStateGlobalThreadClose() throws Exception {
- final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ try {
+ streams.start();
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.RUNNING,
+ 10 * 1000,
+ "Streams never started.");
+ final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
+ globalThreadField.setAccessible(true);
+ final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
+ globalStreamThread.shutdown();
+ TestUtils.waitForCondition(
+ () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
+ 10 * 1000,
+ "Thread never stopped.");
+ globalStreamThread.join();
+ assertEquals(streams.state(), KafkaStreams.State.ERROR);
+ } finally {
+ streams.close();
+ }
- streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.RUNNING;
- }
- }, 10 * 1000, "Streams never started.");
- final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
- globalThreadField.setAccessible(true);
- final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
- globalStreamThread.shutdown();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return globalStreamThread.state() == GlobalStreamThread.State.DEAD;
- }
- }, 10 * 1000, "Thread never stopped.");
- globalStreamThread.join();
- assertEquals(streams.state(), KafkaStreams.State.ERROR);
-
- streams.close();
assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
}
@@ -247,7 +240,6 @@ public class KafkaStreamsTest {
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 200);
- final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -256,6 +248,8 @@ public class KafkaStreamsTest {
fail("expected start() to time out and throw an exception.");
} catch (final StreamsException expected) {
// This is a result of not being able to connect to the broker.
+ } finally {
+ streams.close();
}
// There's nothing to assert... We're testing that this operation actually completes.
}
@@ -269,12 +263,14 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
- final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.table("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- streams.start();
- streams.close();
+ try {
+ streams.start();
+ } finally {
+ streams.close();
+ }
// There's nothing to assert... We're testing that this operation actually completes.
}
@@ -282,98 +278,97 @@ public class KafkaStreamsTest {
@Test
public void testInitializesAndDestroysMetricsReporters() {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
- final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
- final int initDiff = newInitCount - oldInitCount;
- assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
- streams.start();
- final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
- streams.close();
- assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
+ try {
+ final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
+ final int initDiff = newInitCount - oldInitCount;
+ assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
+
+ streams.start();
+ final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
+ streams.close();
+ assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
+ } finally {
+ streams.close();
+ }
}
@Test
public void testCloseIsIdempotent() {
- streams.close();
+ globalStreams.close();
final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
- streams.close();
+ globalStreams.close();
Assert.assertEquals("subsequent close() calls should do nothing",
closeCount, MockMetricsReporter.CLOSE_COUNT.get());
}
@Test
public void testCannotStartOnceClosed() {
- streams.start();
- streams.close();
+ globalStreams.start();
+ globalStreams.close();
try {
- streams.start();
+ globalStreams.start();
fail("Should have throw IllegalStateException");
} catch (final IllegalStateException expected) {
// this is ok
} finally {
- streams.close();
+ globalStreams.close();
}
}
@Test
public void testCannotStartTwice() {
- streams.start();
+ globalStreams.start();
try {
- streams.start();
+ globalStreams.start();
} catch (final IllegalStateException e) {
// this is ok
} finally {
- streams.close();
+ globalStreams.close();
}
}
@Test
public void shouldNotSetGlobalRestoreListenerAfterStarting() {
- streams.start();
+ globalStreams.start();
try {
- streams.setGlobalStateRestoreListener(new MockStateRestoreListener());
+ globalStreams.setGlobalStateRestoreListener(new MockStateRestoreListener());
fail("Should throw an IllegalStateException");
} catch (final IllegalStateException e) {
// expected
} finally {
- streams.close();
+ globalStreams.close();
}
}
@Test
public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
- streams.start();
+ globalStreams.start();
try {
- streams.setUncaughtExceptionHandler(null);
+ globalStreams.setUncaughtExceptionHandler(null);
fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
// expected
- } finally {
- streams.close();
}
}
@Test
public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
- streams.start();
+ globalStreams.start();
try {
- streams.setStateListener(null);
+ globalStreams.setStateListener(null);
fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
// expected
- } finally {
- streams.close();
}
}
@Test
public void testIllegalMetricsConfig() {
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
- final StreamsBuilder builder = new StreamsBuilder();
try {
new KafkaStreams(builder.build(), props);
@@ -384,86 +379,79 @@ public class KafkaStreamsTest {
@Test
public void testLegalMetricsConfig() {
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
- final StreamsBuilder builder1 = new StreamsBuilder();
- final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props);
- streams1.close();
+ new KafkaStreams(builder.build(), props).close();
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
- final StreamsBuilder builder2 = new StreamsBuilder();
- new KafkaStreams(builder2.build(), props);
+ new KafkaStreams(builder.build(), props).close();
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWhenNotRunning() {
- streams.allMetadata();
+ globalStreams.allMetadata();
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
- streams.allMetadataForStore("store");
+ globalStreams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
- streams.metadataForKey("store", "key", Serdes.String().serializer());
+ globalStreams.metadataForKey("store", "key", Serdes.String().serializer());
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
- streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
- @Override
- public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
- return 0;
- }
- });
+ globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
}
@Test
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ KafkaStreams streams = null;
try {
final StreamsBuilder builder = new StreamsBuilder();
final CountDownLatch latch = new CountDownLatch(1);
final String topic = "input";
- CLUSTER.createTopic(topic);
+ CLUSTER.createTopics(topic);
builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
- .foreach(new ForeachAction<String, String>() {
- @Override
- public void apply(final String key, final String value) {
- try {
- latch.countDown();
- while (keepRunning.get()) {
- Thread.sleep(10);
- }
- } catch (final InterruptedException e) {
- // no-op
+ .foreach((key, value) -> {
+ try {
+ latch.countDown();
+ while (keepRunning.get()) {
+ Thread.sleep(10);
}
+ } catch (final InterruptedException e) {
+ // no-op
}
});
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ streams = new KafkaStreams(builder.build(), props);
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
- Collections.singletonList(new KeyValue<>("A", "A")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- System.currentTimeMillis());
+ Collections.singletonList(new KeyValue<>("A", "A")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ System.currentTimeMillis());
assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS));
assertFalse(streams.close(10, TimeUnit.MILLISECONDS));
} finally {
// stop the thread so we don't interfere with other tests etc
keepRunning.set(false);
+ if (streams != null) {
+ streams.close();
+ }
}
}
@Test
public void shouldReturnThreadMetadata() {
- streams.start();
- final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+ globalStreams.start();
+ final Set<ThreadMetadata> threadMetadata = globalStreams.localThreadsMetadata();
assertNotNull(threadMetadata);
assertEquals(2, threadMetadata.size());
for (final ThreadMetadata metadata : threadMetadata) {
@@ -472,39 +460,32 @@ public class KafkaStreamsTest {
assertEquals(0, metadata.standbyTasks().size());
assertEquals(0, metadata.activeTasks().size());
}
- streams.close();
}
@Test
- public void testCleanup() {
- final StreamsBuilder builder = new StreamsBuilder();
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-
- streams.cleanUp();
- streams.start();
- streams.close();
- streams.cleanUp();
+ public void shouldAllowCleanupBeforeStartAndAfterClose() {
+ try {
+ globalStreams.cleanUp();
+ globalStreams.start();
+ } finally {
+ globalStreams.close();
+ }
+ globalStreams.cleanUp();
}
@Test
- public void testCannotCleanupWhileRunning() throws InterruptedException {
- final StreamsBuilder builder = new StreamsBuilder();
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
+ globalStreams.start();
+ TestUtils.waitForCondition(
+ () -> globalStreams.state() == KafkaStreams.State.RUNNING,
+ 10 * 1000,
+ "Streams never started.");
- streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.RUNNING;
- }
- }, 10 * 1000, "Streams never started.");
try {
- streams.cleanUp();
+ globalStreams.cleanUp();
fail("Should have thrown IllegalStateException");
} catch (final IllegalStateException expected) {
assertEquals("Cannot clean up while running.", expected.getMessage());
- } finally {
- streams.close();
}
}
@@ -520,19 +501,17 @@ public class KafkaStreamsTest {
builder.table(topic, consumed);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- final CountDownLatch latch = new CountDownLatch(1);
- streams.setStateListener(new KafkaStreams.StateListener() {
- @Override
- public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+ try {
+ final CountDownLatch latch = new CountDownLatch(1);
+ streams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
latch.countDown();
}
- }
- });
- final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
- final File oldTaskDir = new File(appDir, "10_1");
- assertTrue(oldTaskDir.mkdirs());
- try {
+ });
+ final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+ final File oldTaskDir = new File(appDir, "10_1");
+ assertTrue(oldTaskDir.mkdirs());
+
streams.start();
latch.await(30, TimeUnit.SECONDS);
verifyCleanupStateDir(appDir, oldTaskDir);
@@ -545,12 +524,10 @@ public class KafkaStreamsTest {
private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
final File taskDir = new File(appDir, "0_0");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return !oldTaskDir.exists() && taskDir.exists();
- }
- }, 30000, "cleanup has not successfully run");
+ TestUtils.waitForCondition(
+ () -> !oldTaskDir.exists() && taskDir.exists(),
+ 30000,
+ "cleanup has not successfully run");
assertTrue(taskDir.exists());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index caf2b10..02840b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
@@ -311,7 +311,7 @@ public class TopologyTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
mockStoreBuilder();
- EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStore("store", false));
+ EasyMock.expect(storeBuilder.build()).andReturn(new MockKeyValueStore("store", false));
EasyMock.replay(storeBuilder);
topology
.addSource(sourceNodeName, "topic")
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 7cfde61..be87eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -41,7 +41,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -239,7 +239,7 @@ public class RegexSourceIntegrationTest {
public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
- final StoreBuilder storeBuilder = new MockStoreBuilder("testStateStore", false);
+ final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
final long thirtySecondTimeout = 30 * 1000;
final TopologyWrapper topology = new TopologyWrapper();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index c4699ec..2869826 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.MockKeyValueStore;
import org.junit.Before;
import org.junit.Test;
@@ -47,7 +47,7 @@ public class AbstractProcessorContextTest {
private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
private final AbstractProcessorContext context = new TestProcessorContext(metrics);
- private final MockStateStore stateStore = new MockStateStore("store", false);
+ private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 7230e5f..daf1f33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -53,6 +53,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -61,7 +62,7 @@ public class InternalTopologyBuilderTest {
private final Serde<String> stringSerde = Serdes.String();
private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
- private final StoreBuilder storeBuilder = new MockStoreBuilder("store", false);
+ private final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false);
@Test
public void shouldAddSourceWithOffsetReset() {
@@ -374,14 +375,14 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
- builder.addStateStore(new MockStoreBuilder("store-1", false), "processor-1", "processor-2");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store-1", false), "processor-1", "processor-2");
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
- builder.addStateStore(new MockStoreBuilder("store-2", false), "processor-3", "processor-4");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store-2", false), "processor-3", "processor-4");
builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
- builder.addStateStore(new MockStoreBuilder("store-3", false));
+ builder.addStateStore(new MockKeyValueStoreBuilder("store-3", false));
builder.connectProcessorAndStateStores("processor-5", "store-3");
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
@@ -429,6 +430,57 @@ public class InternalTopologyBuilderTest {
assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
}
+ @Test
+ public void shouldAllowIncrementalBuilds() {
+ Map<Integer, Set<String>> oldNodeGroups, newNodeGroups;
+
+ oldNodeGroups = builder.nodeGroups();
+ builder.addSource(null, "source-1", null, null, null, "topic-1");
+ builder.addSource(null, "source-2", null, null, null, "topic-2");
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addSource(null, "source-3", null, null, null, Pattern.compile(""));
+ builder.addSource(null, "source-4", null, null, null, Pattern.compile(""));
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+ builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
+ builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addSink("sink-1", "sink-topic", null, null, null, "processor-1");
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addSink("sink-2", (k, v, ctx) -> "sink-topic", null, null, null, "processor-2");
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addStateStore(new MockKeyValueStoreBuilder("store-1", false), "processor-1", "processor-2");
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addStateStore(new MockKeyValueStoreBuilder("store-2", false));
+ builder.connectProcessorAndStateStores("processor-2", "store-2");
+ builder.connectProcessorAndStateStores("processor-3", "store-2");
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+
+ oldNodeGroups = newNodeGroups;
+ builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", new MockProcessorSupplier());
+ newNodeGroups = builder.nodeGroups();
+ assertNotEquals(oldNodeGroups, newNodeGroups);
+ }
+
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullNameWhenAddingSink() {
builder.addSink(null, "topic", null, null, null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 4287c77..cd95a68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -69,15 +69,15 @@ public class ProcessorStateManagerTest {
private final String nonPersistentStoreName = "nonPersistentStore";
private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
- private final MockStateStore persistentStore = new MockStateStore(persistentStoreName, true);
- private final MockStateStore nonPersistentStore = new MockStateStore(nonPersistentStoreName, false);
+ private final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true);
+ private final MockKeyValueStore nonPersistentStore = new MockKeyValueStore(nonPersistentStoreName, false);
private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);
- private final String storeName = "mockStateStore";
+ private final String storeName = "mockKeyValueStore";
private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);
private final TaskId taskId = new TaskId(0, 1);
private final MockChangelogReader changelogReader = new MockChangelogReader();
- private final MockStateStore mockStateStore = new MockStateStore(storeName, true);
+ private final MockKeyValueStore mockKeyValueStore = new MockKeyValueStore(storeName, true);
private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value);
@@ -115,7 +115,7 @@ public class ProcessorStateManagerTest {
final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(key, value);
- final MockStateStore persistentStore = getPersistentStore();
+ final MockKeyValueStore persistentStore = getPersistentStore();
final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
try {
@@ -137,7 +137,7 @@ public class ProcessorStateManagerTest {
final TaskId taskId = new TaskId(0, 2);
final Integer intKey = 1;
- final MockStateStore persistentStore = getPersistentStore();
+ final MockKeyValueStore persistentStore = getPersistentStore();
final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
try {
@@ -158,7 +158,7 @@ public class ProcessorStateManagerTest {
public void testRegisterPersistentStore() throws IOException {
final TaskId taskId = new TaskId(0, 2);
- final MockStateStore persistentStore = getPersistentStore();
+ final MockKeyValueStore persistentStore = getPersistentStore();
final ProcessorStateManager stateMgr = new ProcessorStateManager(
taskId,
noPartitions,
@@ -184,8 +184,8 @@ public class ProcessorStateManagerTest {
@Test
public void testRegisterNonPersistentStore() throws IOException {
- final MockStateStore nonPersistentStore
- = new MockStateStore(nonPersistentStoreName, false); // non persistent store
+ final MockKeyValueStore nonPersistentStore
+ = new MockKeyValueStore(nonPersistentStoreName, false); // non persistent store
final ProcessorStateManager stateMgr = new ProcessorStateManager(
new TaskId(0, 2),
noPartitions,
@@ -233,9 +233,9 @@ public class ProcessorStateManagerTest {
final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
- final MockStateStore store1 = new MockStateStore(storeName1, true);
- final MockStateStore store2 = new MockStateStore(storeName2, true);
- final MockStateStore store3 = new MockStateStore(storeName3, true);
+ final MockKeyValueStore store1 = new MockKeyValueStore(storeName1, true);
+ final MockKeyValueStore store2 = new MockKeyValueStore(storeName2, true);
+ final MockKeyValueStore store3 = new MockKeyValueStore(storeName3, true);
// if there is a source partition, inherit the partition id
final Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
@@ -272,7 +272,7 @@ public class ProcessorStateManagerTest {
@Test
public void testGetStore() throws IOException {
- final MockStateStore mockStateStore = new MockStateStore(nonPersistentStoreName, false);
+ final MockKeyValueStore mockKeyValueStore = new MockKeyValueStore(nonPersistentStoreName, false);
final ProcessorStateManager stateMgr = new ProcessorStateManager(
new TaskId(0, 1),
noPartitions,
@@ -283,10 +283,10 @@ public class ProcessorStateManagerTest {
false,
logContext);
try {
- stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
+ stateMgr.register(mockKeyValueStore, mockKeyValueStore.stateRestoreCallback);
assertNull(stateMgr.getStore("noSuchStore"));
- assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
+ assertEquals(mockKeyValueStore, stateMgr.getStore(nonPersistentStoreName));
} finally {
stateMgr.close(Collections.emptyMap());
@@ -361,7 +361,7 @@ public class ProcessorStateManagerTest {
final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L);
checkpoint.write(offsets);
- final MockStateStore persistentStore = new MockStateStore(persistentStoreName, true);
+ final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true);
final ProcessorStateManager stateMgr = new ProcessorStateManager(
taskId,
noPartitions,
@@ -476,7 +476,7 @@ public class ProcessorStateManagerTest {
logContext);
try {
- stateManager.register(new MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
+ stateManager.register(new MockKeyValueStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
fail("should have thrown illegal argument exception when store name same as checkpoint file");
} catch (final IllegalArgumentException e) {
//pass
@@ -495,10 +495,10 @@ public class ProcessorStateManagerTest {
false,
logContext);
- stateManager.register(mockStateStore, null);
+ stateManager.register(mockKeyValueStore, null);
try {
- stateManager.register(mockStateStore, null);
+ stateManager.register(mockKeyValueStore, null);
fail("should have thrown illegal argument exception when store with same name already registered");
} catch (final IllegalArgumentException e) {
// pass
@@ -519,7 +519,7 @@ public class ProcessorStateManagerTest {
false,
logContext);
- final MockStateStore stateStore = new MockStateStore(storeName, true) {
+ final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, true) {
@Override
public void flush() {
throw new RuntimeException("KABOOM!");
@@ -548,7 +548,7 @@ public class ProcessorStateManagerTest {
false,
logContext);
- final MockStateStore stateStore = new MockStateStore(storeName, true) {
+ final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, true) {
@Override
public void close() {
throw new RuntimeException("KABOOM!");
@@ -578,13 +578,13 @@ public class ProcessorStateManagerTest {
final AtomicBoolean flushedStore = new AtomicBoolean(false);
- final MockStateStore stateStore1 = new MockStateStore(storeName, true) {
+ final MockKeyValueStore stateStore1 = new MockKeyValueStore(storeName, true) {
@Override
public void flush() {
throw new RuntimeException("KABOOM!");
}
};
- final MockStateStore stateStore2 = new MockStateStore(storeName + "2", true) {
+ final MockKeyValueStore stateStore2 = new MockKeyValueStore(storeName + "2", true) {
@Override
public void flush() {
flushedStore.set(true);
@@ -613,13 +613,13 @@ public class ProcessorStateManagerTest {
final AtomicBoolean closedStore = new AtomicBoolean(false);
- final MockStateStore stateStore1 = new MockStateStore(storeName, true) {
+ final MockKeyValueStore stateStore1 = new MockKeyValueStore(storeName, true) {
@Override
public void close() {
throw new RuntimeException("KABOOM!");
}
};
- final MockStateStore stateStore2 = new MockStateStore(storeName + "2", true) {
+ final MockKeyValueStore stateStore2 = new MockKeyValueStore(storeName + "2", true) {
@Override
public void close() {
closedStore.set(true);
@@ -690,8 +690,8 @@ public class ProcessorStateManagerTest {
eosEnabled,
logContext);
- final MockStateStore stateStore = new MockStateStore(storeName, true);
- final MockStateStore stateStore2 = new MockStateStore(store2Name, true);
+ final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, true);
+ final MockKeyValueStore stateStore2 = new MockKeyValueStore(store2Name, true);
stateManager.register(stateStore, stateStore.stateRestoreCallback);
stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
@@ -726,8 +726,8 @@ public class ProcessorStateManagerTest {
logContext);
}
- private MockStateStore getPersistentStore() {
- return new MockStateStore("persistentStore", true);
+ private MockKeyValueStore getPersistentStore() {
+ return new MockKeyValueStore("persistentStore", true);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index c822fc3..820191d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -49,10 +49,10 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
+import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockStateStore;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -104,7 +104,7 @@ public class StandbyTaskTest {
private final Set<TopicPartition> topicPartitions = Collections.emptySet();
private final ProcessorTopology topology = ProcessorTopology.withLocalStores(
- mkList(new MockStoreBuilder(storeName1, false).build(), new MockStoreBuilder(storeName2, true).build()),
+ mkList(new MockKeyValueStoreBuilder(storeName1, false).build(), new MockKeyValueStoreBuilder(storeName2, true).build()),
mkMap(
mkEntry(storeName1, storeChangelogTopicName1),
mkEntry(storeName2, storeChangelogTopicName2)
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
private final TopicPartition globalTopicPartition = new TopicPartition(globalStoreName, 0);
private final Set<TopicPartition> ktablePartitions = Utils.mkSet(globalTopicPartition);
private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(
- singletonList(new MockStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
+ singletonList(new MockKeyValueStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
mkMap(
mkEntry(globalStoreName, globalTopicPartition.topic())
)
@@ -208,8 +208,8 @@ public class StandbyTaskTest {
task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
final StandbyContextImpl context = (StandbyContextImpl) task.context();
- final MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
- final MockStateStore store2 = (MockStateStore) context.getStateMgr().getStore(storeName2);
+ final MockKeyValueStore store1 = (MockKeyValueStore) context.getStateMgr().getStore(storeName1);
+ final MockKeyValueStore store2 = (MockKeyValueStore) context.getStateMgr().getStore(storeName2);
assertEquals(Collections.emptyList(), store1.keys);
assertEquals(mkList(1, 2, 3), store2.keys);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 834ab3e..d332b5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -50,7 +50,7 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
@@ -109,7 +109,7 @@ public class StreamTaskTest {
private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
private final String storeName = "store";
- private final StateStore stateStore = new MockStateStore(storeName, false);
+ private final StateStore stateStore = new MockKeyValueStore(storeName, false);
private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 0);
private final Long offset = 543L;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9693184..6102969 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -43,7 +43,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -333,10 +333,10 @@ public class StreamsPartitionAssignorTest {
public void testAssignWithPartialTopology() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
- builder.addStateStore(new MockStoreBuilder("store1", false), "processor1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
- builder.addStateStore(new MockStoreBuilder("store2", false), "processor2");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor2");
final List<String> topics = Utils.mkList("topic1", "topic2");
final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -474,11 +474,11 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
- builder.addStateStore(new MockStoreBuilder("store1", false), "processor-1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
- builder.addStateStore(new MockStoreBuilder("store2", false), "processor-2");
- builder.addStateStore(new MockStoreBuilder("store3", false), "processor-2");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor-2");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store3", false), "processor-2");
final List<String> topics = Utils.mkList("topic1", "topic2");
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
similarity index 70%
rename from streams/src/test/java/org/apache/kafka/test/MockStateStore.java
rename to streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index a2b0d21..8fd1f74 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -21,10 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.util.ArrayList;
+import java.util.List;
-public class MockStateStore implements StateStore {
+public class MockKeyValueStore implements KeyValueStore {
private final String name;
private final boolean persistent;
@@ -33,8 +36,8 @@ public class MockStateStore implements StateStore {
public boolean closed = true;
public final ArrayList<Integer> keys = new ArrayList<>();
- public MockStateStore(final String name,
- final boolean persistent) {
+ public MockKeyValueStore(final String name,
+ final boolean persistent) {
this.name = name;
this.persistent = persistent;
}
@@ -81,4 +84,44 @@ public class MockStateStore implements StateStore {
keys.add(deserializer.deserialize("", key));
}
};
+
+ @Override
+ public void put(final Object key, final Object value) {
+
+ }
+
+ @Override
+ public Object putIfAbsent(final Object key, final Object value) {
+ return null;
+ }
+
+ @Override
+ public Object delete(final Object key) {
+ return null;
+ }
+
+ @Override
+ public void putAll(final List entries) {
+
+ }
+
+ @Override
+ public Object get(final Object key) {
+ return null;
+ }
+
+ @Override
+ public KeyValueIterator range(final Object from, final Object to) {
+ return null;
+ }
+
+ @Override
+ public KeyValueIterator all() {
+ return null;
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return 0;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
similarity index 78%
rename from streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
rename to streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
index 41aa239..70b4e02 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
@@ -18,22 +18,22 @@ package org.apache.kafka.test;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
-public class MockStoreBuilder extends AbstractStoreBuilder<Integer, byte[], StateStore> {
+public class MockKeyValueStoreBuilder extends AbstractStoreBuilder<Integer, byte[], KeyValueStore> {
private final boolean persistent;
- public MockStoreBuilder(final String storeName, final boolean persistent) {
+ public MockKeyValueStoreBuilder(final String storeName, final boolean persistent) {
super(storeName, Serdes.Integer(), Serdes.ByteArray(), new MockTime());
this.persistent = persistent;
}
@Override
- public StateStore build() {
- return new MockStateStore(name, persistent);
+ public KeyValueStore build() {
+ return new MockKeyValueStore(name, persistent);
}
}