Posted to commits@kafka.apache.org by mj...@apache.org on 2018/09/13 20:04:17 UTC

[kafka] branch trunk updated: MINOR: Ensure that KafkaStreams client is closed if test fails (#5618)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 466d893  MINOR: Ensure that KafkaStreams client is closed if test fails (#5618)
466d893 is described below

commit 466d89306ea23d1f17be0914416944618ceb9f86
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Sep 13 13:04:09 2018 -0700

    MINOR: Ensure that KafkaStreams client is closed if test fails (#5618)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Eno Thereska <en...@amazon.com>
---
 .../kstream/internals/InternalStreamsBuilder.java  |   2 +-
 .../internals/InternalTopologyBuilder.java         |  70 ++--
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 381 ++++++++++-----------
 .../org/apache/kafka/streams/TopologyTest.java     |   4 +-
 .../integration/RegexSourceIntegrationTest.java    |   4 +-
 .../internals/AbstractProcessorContextTest.java    |   4 +-
 .../internals/InternalTopologyBuilderTest.java     |  62 +++-
 .../internals/ProcessorStateManagerTest.java       |  60 ++--
 .../processor/internals/StandbyTaskTest.java       |  12 +-
 .../processor/internals/StreamTaskTest.java        |   4 +-
 .../internals/StreamsPartitionAssignorTest.java    |  12 +-
 ...{MockStateStore.java => MockKeyValueStore.java} |  49 ++-
 ...eBuilder.java => MockKeyValueStoreBuilder.java} |  10 +-
 13 files changed, 368 insertions(+), 306 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c1532af..49f49d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -149,7 +149,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         // explicitly disable logging for global stores
         materialized.withLoggingDisabled();
         final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
-        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String sourceName = newProcessorName(KTableImpl.SOURCE_NAME);
         final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
         final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
 
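The one-line fix above changes only a generated node name: the source node of a global table now gets the KTable prefix rather than the KStream prefix. A minimal sketch of the name generator, assuming the usual index-based scheme (the real method lives in InternalStreamsBuilder; the class below is illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    class NameProvider {
        private final AtomicInteger index = new AtomicInteger(0);

        // Appends a zero-padded counter to the prefix, e.g.
        // newProcessorName("KTABLE-SOURCE-") -> "KTABLE-SOURCE-0000000000"
        String newProcessorName(final String prefix) {
            return prefix + String.format("%010d", index.getAndIncrement());
        }
    }

With KStreamImpl.SOURCE_NAME, the table's source node would have been named like a KStream source, which is misleading when reading topology descriptions.
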
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index edff470..7fa2851 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -390,6 +390,7 @@ public class InternalTopologyBuilder {
         nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourceTopics.put(name, Arrays.asList(topics));
         nodeGrouper.add(name);
+        nodeGroups = null;
     }
 
     public final void addSource(final Topology.AutoOffsetReset offsetReset,
@@ -428,6 +429,7 @@ public class InternalTopologyBuilder {
         nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourcePatterns.put(name, topicPattern);
         nodeGrouper.add(name);
+        nodeGroups = null;
     }
 
     public final <K, V> void addSink(final String name,
@@ -445,6 +447,7 @@ public class InternalTopologyBuilder {
 
         addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
         nodeToSinkTopic.put(name, topic);
+        nodeGroups = null;
     }
 
     public final <K, V> void addSink(final String name,
@@ -479,6 +482,7 @@ public class InternalTopologyBuilder {
         nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, predecessorNames);
+        nodeGroups = null;
     }
 
     public final void addProcessor(final String name,
@@ -507,6 +511,7 @@ public class InternalTopologyBuilder {
         nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, predecessorNames);
+        nodeGroups = null;
     }
 
     public final void addStateStore(final StoreBuilder storeBuilder,
@@ -530,6 +535,7 @@ public class InternalTopologyBuilder {
                 connectProcessorAndStateStore(processorName, storeBuilder.name());
             }
         }
+        nodeGroups = null;
     }
 
     public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
@@ -549,15 +555,29 @@ public class InternalTopologyBuilder {
                                      storeBuilder.loggingEnabled());
         validateTopicNotAlreadyRegistered(topic);
 
-        addGlobalStore(sourceName,
-                       timestampExtractor,
-                       keyDeserializer,
-                       valueDeserializer,
-                       topic,
-                       processorName,
-                       stateUpdateSupplier,
-                       storeBuilder.name(),
-                       storeBuilder);
+        final String[] topics = {topic};
+        final String[] predecessors = {sourceName};
+
+        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
+            predecessors,
+            stateUpdateSupplier);
+
+        globalTopics.add(topic);
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
+            topics,
+            null,
+            timestampExtractor,
+            keyDeserializer,
+            valueDeserializer));
+        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+        nodeGrouper.add(sourceName);
+        nodeFactory.addStateStore(storeBuilder.name());
+        nodeFactories.put(processorName, nodeFactory);
+        nodeGrouper.add(processorName);
+        nodeGrouper.unite(processorName, predecessors);
+        globalStateBuilders.put(storeBuilder.name(), storeBuilder);
+        connectSourceStoreAndTopic(storeBuilder.name(), topic);
+        nodeGroups = null;
     }
 
     private void validateTopicNotAlreadyRegistered(final String topic) {
@@ -583,6 +603,7 @@ public class InternalTopologyBuilder {
             Objects.requireNonNull(stateStoreName, "state store name must not be null");
             connectProcessorAndStateStore(processorName, stateStoreName);
         }
+        nodeGroups = null;
     }
 
     private void connectSourceStoreAndTopic(final String sourceStoreName,
@@ -637,37 +658,6 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private void addGlobalStore(final String sourceName,
-                                final TimestampExtractor timestampExtractor,
-                                final Deserializer keyDeserializer,
-                                final Deserializer valueDeserializer,
-                                final String topic,
-                                final String processorName,
-                                final ProcessorSupplier stateUpdateSupplier,
-                                final String name,
-                                final StoreBuilder<KeyValueStore> storeBuilder) {
-        final String[] topics = {topic};
-        final String[] predecessors = {sourceName};
-        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
-                                                                          predecessors,
-                                                                          stateUpdateSupplier);
-        globalTopics.add(topic);
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
-                                                            topics,
-                                                            null,
-                                                            timestampExtractor,
-                                                            keyDeserializer,
-                                                            valueDeserializer));
-        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
-        nodeGrouper.add(sourceName);
-        nodeFactory.addStateStore(name);
-        nodeFactories.put(processorName, nodeFactory);
-        nodeGrouper.add(processorName);
-        nodeGrouper.unite(processorName, predecessors);
-        globalStateBuilders.put(name, storeBuilder);
-        connectSourceStoreAndTopic(name, topic);
-    }
-
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
         if (globalStateBuilders.containsKey(stateStoreName)) {
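Two changes run through this file. First, every mutator now ends with nodeGroups = null: the builder memoizes the result of nodeGroups(), so each mutation must drop the cached value and force the next call to recompute. Second, the private addGlobalStore overload is inlined into the public method so that it participates in the same invalidation. A minimal sketch of the memoize-and-invalidate pattern, using illustrative names rather than the actual Kafka internals:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class GroupingBuilder {
        private final Set<String> nodes = new HashSet<>();
        private Map<Integer, Set<String>> nodeGroups = null; // memoized result

        Map<Integer, Set<String>> nodeGroups() {
            if (nodeGroups == null) {
                nodeGroups = computeNodeGroups(); // recompute only when stale
            }
            return nodeGroups;
        }

        void addNode(final String name) {
            nodes.add(name);
            nodeGroups = null; // invalidate: the next nodeGroups() call recomputes
        }

        // Stand-in for the real (expensive) grouping computation.
        private Map<Integer, Set<String>> computeNodeGroups() {
            final Map<Integer, Set<String>> groups = new HashMap<>();
            int groupId = 0;
            for (final String node : nodes) {
                groups.put(groupId++, Collections.singleton(node));
            }
            return groups;
        }
    }

Without the reset, a topology mutated after the first nodeGroups() call would keep serving a stale grouping; the new shouldAllowIncrementalBuilds test further below asserts exactly this recomputation.
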
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 481c860..dd5cff7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
@@ -30,8 +29,6 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -39,8 +36,8 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -70,12 +68,12 @@ public class KafkaStreamsTest {
 
     private static final int NUM_BROKERS = 1;
     private static final int NUM_THREADS = 2;
-    // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
-    // quick enough)
+    // We need this to avoid the KafkaConsumer hanging on poll
+    // (this may occur if the test doesn't complete quickly enough)
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final StreamsBuilder builder = new StreamsBuilder();
-    private KafkaStreams streams;
+    private KafkaStreams globalStreams;
     private Properties props;
 
     @Before
@@ -87,59 +85,64 @@ public class KafkaStreamsTest {
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
         props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        streams = new KafkaStreams(builder.build(), props);
+        globalStreams = new KafkaStreams(builder.build(), props);
+    }
+
+    @After
+    public void cleanup() {
+        if (globalStreams != null) {
+            globalStreams.close();
+        }
     }
 
     @Test
     public void testStateChanges() throws InterruptedException {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-
         final StateListenerStub stateListener = new StateListenerStub();
-        streams.setStateListener(stateListener);
-        Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED);
+        globalStreams.setStateListener(stateListener);
+
+        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED);
         Assert.assertEquals(stateListener.numChanges, 0);
 
-        streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.RUNNING;
-            }
-        }, 10 * 1000, "Streams never started.");
-        streams.close();
-        Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        globalStreams.start();
+        TestUtils.waitForCondition(
+            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
+            10 * 1000,
+            "Streams never started.");
+
+        globalStreams.close();
+
+        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.NOT_RUNNING);
     }
 
     @Test
     public void testStateCloseAfterCreate() {
-        final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
-        final StateListenerStub stateListener = new StateListenerStub();
-        streams.setStateListener(stateListener);
-        streams.close();
+        try {
+            final StateListenerStub stateListener = new StateListenerStub();
+            streams.setStateListener(stateListener);
+        } finally {
+            streams.close();
+        }
+
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
     @Test
     public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
-        final StreamsBuilder builder = new StreamsBuilder();
         builder.globalTable("anyTopic");
         final List<Node> nodes = Arrays.asList(new Node(0, "localhost", 8121));
         final Cluster cluster = new Cluster("mockClusterId", nodes,
-                                            Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
-                                            Collections.<String>emptySet(), nodes.get(0));
+                                            Collections.emptySet(), Collections.<String>emptySet(),
+                                            Collections.emptySet(), nodes.get(0));
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.setClusterForAdminClient(cluster);
         final KafkaStreams streams = new KafkaStreams(builder.build(), props, clientSupplier);
         streams.close();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.NOT_RUNNING;
-            }
-        }, 10 * 1000, "Streams never stopped.");
+        TestUtils.waitForCondition(
+            () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+            10 * 1000,
+            "Streams never stopped.");
 
         // Ensure that any created clients are closed
         assertTrue(clientSupplier.consumer.closed());
@@ -151,88 +154,78 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateThreadClose() throws Exception {
-        final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
+        try {
+            final java.lang.reflect.Field threadsField = streams.getClass().getDeclaredField("threads");
+            threadsField.setAccessible(true);
+            final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
 
+            assertEquals(NUM_THREADS, threads.length);
+            assertEquals(streams.state(), KafkaStreams.State.CREATED);
 
-        final java.lang.reflect.Field threadsField = streams.getClass().getDeclaredField("threads");
-        threadsField.setAccessible(true);
-        final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
-
-        assertEquals(NUM_THREADS, threads.length);
-        assertEquals(streams.state(), KafkaStreams.State.CREATED);
-
-        streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.RUNNING;
+            streams.start();
+            TestUtils.waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                10 * 1000,
+                "Streams never started.");
+
+            for (int i = 0; i < NUM_THREADS; i++) {
+                final StreamThread tmpThread = threads[i];
+                tmpThread.shutdown();
+                TestUtils.waitForCondition(
+                    () -> tmpThread.state() == StreamThread.State.DEAD,
+                    10 * 1000,
+                    "Thread never stopped.");
+                threads[i].join();
             }
-        }, 10 * 1000, "Streams never started.");
-
-        for (int i = 0; i < NUM_THREADS; i++) {
-            final StreamThread tmpThread = threads[i];
-            tmpThread.shutdown();
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return tmpThread.state() == StreamThread.State.DEAD;
-                }
-            }, 10 * 1000, "Thread never stopped.");
-            threads[i].join();
+            TestUtils.waitForCondition(
+                () -> streams.state() == KafkaStreams.State.ERROR,
+                10 * 1000,
+                "Streams never stopped.");
+        } finally {
+            streams.close();
         }
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.ERROR;
-            }
-        }, 10 * 1000, "Streams never stopped.");
-        streams.close();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.NOT_RUNNING;
-            }
-        }, 10 * 1000, "Streams never stopped.");
+
+        TestUtils.waitForCondition(
+            () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+            10 * 1000,
+            "Streams never stopped.");
 
         final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
         globalThreadField.setAccessible(true);
         final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
-        assertEquals(globalStreamThread, null);
+        assertNull(globalStreamThread);
     }
 
     @Test
     public void testStateGlobalThreadClose() throws Exception {
-        final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
+        try {
+            streams.start();
+            TestUtils.waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                10 * 1000,
+                "Streams never started.");
+            final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
+            globalThreadField.setAccessible(true);
+            final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
+            globalStreamThread.shutdown();
+            TestUtils.waitForCondition(
+                () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
+                10 * 1000,
+                "Thread never stopped.");
+            globalStreamThread.join();
+            assertEquals(streams.state(), KafkaStreams.State.ERROR);
+        } finally {
+            streams.close();
+        }
 
-        streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.RUNNING;
-            }
-        }, 10 * 1000, "Streams never started.");
-        final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
-        globalThreadField.setAccessible(true);
-        final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
-        globalStreamThread.shutdown();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return globalStreamThread.state() == GlobalStreamThread.State.DEAD;
-            }
-        }, 10 * 1000, "Thread never stopped.");
-        globalStreamThread.join();
-        assertEquals(streams.state(), KafkaStreams.State.ERROR);
-
-        streams.close();
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
     }
 
@@ -247,7 +240,6 @@ public class KafkaStreamsTest {
 
         props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 200);
 
-        final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -256,6 +248,8 @@ public class KafkaStreamsTest {
             fail("expected start() to time out and throw an exception.");
         } catch (final StreamsException expected) {
             // This is a result of not being able to connect to the broker.
+        } finally {
+            streams.close();
         }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
@@ -269,12 +263,14 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
-        final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.table("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-        streams.close();
+        try {
+            streams.start();
+        } finally {
+            streams.close();
+        }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
 
@@ -282,98 +278,97 @@ public class KafkaStreamsTest {
     @Test
     public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final int initDiff = newInitCount - oldInitCount;
-        assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
 
-        streams.start();
-        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
-        streams.close();
-        assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
+        try {
+            final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
+            final int initDiff = newInitCount - oldInitCount;
+            assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
+
+            streams.start();
+            final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
+            streams.close();
+            assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
+        } finally {
+            streams.close();
+        }
     }
 
     @Test
     public void testCloseIsIdempotent() {
-        streams.close();
+        globalStreams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        streams.close();
+        globalStreams.close();
         Assert.assertEquals("subsequent close() calls should do nothing",
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
     @Test
     public void testCannotStartOnceClosed() {
-        streams.start();
-        streams.close();
+        globalStreams.start();
+        globalStreams.close();
         try {
-            streams.start();
+            globalStreams.start();
             fail("Should have throw IllegalStateException");
         } catch (final IllegalStateException expected) {
             // this is ok
         } finally {
-            streams.close();
+            globalStreams.close();
         }
     }
 
     @Test
     public void testCannotStartTwice() {
-        streams.start();
+        globalStreams.start();
 
         try {
-            streams.start();
+            globalStreams.start();
         } catch (final IllegalStateException e) {
             // this is ok
         } finally {
-            streams.close();
+            globalStreams.close();
         }
     }
 
     @Test
     public void shouldNotSetGlobalRestoreListenerAfterStarting() {
-        streams.start();
+        globalStreams.start();
         try {
-            streams.setGlobalStateRestoreListener(new MockStateRestoreListener());
+            globalStreams.setGlobalStateRestoreListener(new MockStateRestoreListener());
             fail("Should throw an IllegalStateException");
         } catch (final IllegalStateException e) {
             // expected
         } finally {
-            streams.close();
+            globalStreams.close();
         }
     }
 
     @Test
     public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
-        streams.start();
+        globalStreams.start();
         try {
-            streams.setUncaughtExceptionHandler(null);
+            globalStreams.setUncaughtExceptionHandler(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
             // expected
-        } finally {
-            streams.close();
         }
     }
 
     @Test
     public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
-        streams.start();
+        globalStreams.start();
         try {
-            streams.setStateListener(null);
+            globalStreams.setStateListener(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
             // expected
-        } finally {
-            streams.close();
         }
     }
 
     @Test
     public void testIllegalMetricsConfig() {
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
-        final StreamsBuilder builder = new StreamsBuilder();
 
         try {
             new KafkaStreams(builder.build(), props);
@@ -384,86 +379,79 @@ public class KafkaStreamsTest {
     @Test
     public void testLegalMetricsConfig() {
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-        final StreamsBuilder builder1 = new StreamsBuilder();
-        final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props);
-        streams1.close();
+        new KafkaStreams(builder.build(), props).close();
 
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
-        final StreamsBuilder builder2 = new StreamsBuilder();
-        new KafkaStreams(builder2.build(), props);
+        new KafkaStreams(builder.build(), props).close();
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWhenNotRunning() {
-        streams.allMetadata();
+        globalStreams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
-        streams.allMetadataForStore("store");
+        globalStreams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
-        streams.metadataForKey("store", "key", Serdes.String().serializer());
+        globalStreams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
-        streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
-            @Override
-            public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
-                return 0;
-            }
-        });
+        globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
     }
 
     @Test
     public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
         final AtomicBoolean keepRunning = new AtomicBoolean(true);
+        KafkaStreams streams = null;
         try {
             final StreamsBuilder builder = new StreamsBuilder();
             final CountDownLatch latch = new CountDownLatch(1);
             final String topic = "input";
-            CLUSTER.createTopic(topic);
+            CLUSTER.createTopics(topic);
 
             builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-                    .foreach(new ForeachAction<String, String>() {
-                        @Override
-                        public void apply(final String key, final String value) {
-                            try {
-                                latch.countDown();
-                                while (keepRunning.get()) {
-                                    Thread.sleep(10);
-                                }
-                            } catch (final InterruptedException e) {
-                                // no-op
+                    .foreach((key, value) -> {
+                        try {
+                            latch.countDown();
+                            while (keepRunning.get()) {
+                                Thread.sleep(10);
                             }
+                        } catch (final InterruptedException e) {
+                            // no-op
                         }
                     });
-            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+            streams = new KafkaStreams(builder.build(), props);
             streams.start();
             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
-                                                                            Collections.singletonList(new KeyValue<>("A", "A")),
-                                                                            TestUtils.producerConfig(
-                                                                                    CLUSTER.bootstrapServers(),
-                                                                                    StringSerializer.class,
-                                                                                    StringSerializer.class,
-                                                                                    new Properties()),
-                                                                                    System.currentTimeMillis());
+                Collections.singletonList(new KeyValue<>("A", "A")),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    StringSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                System.currentTimeMillis());
 
             assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS));
             assertFalse(streams.close(10, TimeUnit.MILLISECONDS));
         } finally {
             // stop the thread so we don't interfere with other tests etc
             keepRunning.set(false);
+            if (streams != null) {
+                streams.close();
+            }
         }
     }
 
     @Test
     public void shouldReturnThreadMetadata() {
-        streams.start();
-        final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+        globalStreams.start();
+        final Set<ThreadMetadata> threadMetadata = globalStreams.localThreadsMetadata();
         assertNotNull(threadMetadata);
         assertEquals(2, threadMetadata.size());
         for (final ThreadMetadata metadata : threadMetadata) {
@@ -472,39 +460,32 @@ public class KafkaStreamsTest {
             assertEquals(0, metadata.standbyTasks().size());
             assertEquals(0, metadata.activeTasks().size());
         }
-        streams.close();
     }
 
     @Test
-    public void testCleanup() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-
-        streams.cleanUp();
-        streams.start();
-        streams.close();
-        streams.cleanUp();
+    public void shouldAllowCleanupBeforeStartAndAfterClose() {
+        try {
+            globalStreams.cleanUp();
+            globalStreams.start();
+        } finally {
+            globalStreams.close();
+        }
+        globalStreams.cleanUp();
     }
 
     @Test
-    public void testCannotCleanupWhileRunning() throws InterruptedException {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
+        globalStreams.start();
+        TestUtils.waitForCondition(
+            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
+            10 * 1000,
+            "Streams never started.");
 
-        streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.RUNNING;
-            }
-        }, 10 * 1000, "Streams never started.");
         try {
-            streams.cleanUp();
+            globalStreams.cleanUp();
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException expected) {
             assertEquals("Cannot clean up while running.", expected.getMessage());
-        } finally {
-            streams.close();
         }
     }
 
@@ -520,19 +501,17 @@ public class KafkaStreamsTest {
         builder.table(topic, consumed);
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final CountDownLatch latch = new CountDownLatch(1);
-        streams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+        try {
+            final CountDownLatch latch = new CountDownLatch(1);
+            streams.setStateListener((newState, oldState) -> {
                 if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                     latch.countDown();
                 }
-            }
-        });
-        final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
-        final File oldTaskDir = new File(appDir, "10_1");
-        assertTrue(oldTaskDir.mkdirs());
-        try {
+            });
+            final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+            final File oldTaskDir = new File(appDir, "10_1");
+            assertTrue(oldTaskDir.mkdirs());
+
             streams.start();
             latch.await(30, TimeUnit.SECONDS);
             verifyCleanupStateDir(appDir, oldTaskDir);
@@ -545,12 +524,10 @@ public class KafkaStreamsTest {
 
     private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return !oldTaskDir.exists() && taskDir.exists();
-            }
-        }, 30000, "cleanup has not successfully run");
+        TestUtils.waitForCondition(
+            () -> !oldTaskDir.exists() && taskDir.exists(),
+            30000,
+            "cleanup has not successfully run");
         assertTrue(taskDir.exists());
     }
 
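The rewrite of this test class follows a single rule: every KafkaStreams instance is closed even when an assertion throws, either in the new @After method (for the shared globalStreams instance) or in a try/finally around the test body, so a failing test cannot leak running client threads into later tests. A minimal sketch of the per-test pattern (builder, props, and the topology are stand-ins from the test fixture):

    @Test
    public void someStreamsTest() {
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        try {
            streams.start();
            // ... assertions that may throw ...
        } finally {
            streams.close(); // runs even if an assertion above failed
        }
    }

The remaining churn (lambdas replacing anonymous TestCondition and ForeachAction classes, the streams-to-globalStreams rename) falls out of that same cleanup.
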
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index caf2b10..02840b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -311,7 +311,7 @@ public class TopologyTest {
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         mockStoreBuilder();
-        EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStore("store", false));
+        EasyMock.expect(storeBuilder.build()).andReturn(new MockKeyValueStore("store", false));
         EasyMock.replay(storeBuilder);
         topology
             .addSource(sourceNodeName, "topic")
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 7cfde61..be87eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -41,7 +41,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -239,7 +239,7 @@ public class RegexSourceIntegrationTest {
     public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
 
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        final StoreBuilder storeBuilder = new MockStoreBuilder("testStateStore", false);
+        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
         final long thirtySecondTimeout = 30 * 1000;
 
         final TopologyWrapper topology = new TopologyWrapper();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index c4699ec..2869826 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class AbstractProcessorContextTest {
 
     private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
     private final AbstractProcessorContext context = new TestProcessorContext(metrics);
-    private final MockStateStore stateStore = new MockStateStore("store", false);
+    private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 7230e5f..daf1f33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -53,6 +53,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -61,7 +62,7 @@ public class InternalTopologyBuilderTest {
 
     private final Serde<String> stringSerde = Serdes.String();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
-    private final StoreBuilder storeBuilder = new MockStoreBuilder("store", false);
+    private final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false);
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
@@ -374,14 +375,14 @@ public class InternalTopologyBuilderTest {
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStoreBuilder("store-1", false), "processor-1", "processor-2");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store-1", false), "processor-1", "processor-2");
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
         builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStoreBuilder("store-2", false), "processor-3", "processor-4");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store-2", false), "processor-3", "processor-4");
 
         builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
-        builder.addStateStore(new MockStoreBuilder("store-3", false));
+        builder.addStateStore(new MockKeyValueStoreBuilder("store-3", false));
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
@@ -429,6 +430,57 @@ public class InternalTopologyBuilderTest {
         assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
     }
 
+    @Test
+    public void shouldAllowIncrementalBuilds() {
+        Map<Integer, Set<String>> oldNodeGroups, newNodeGroups;
+
+        oldNodeGroups = builder.nodeGroups();
+        builder.addSource(null, "source-1", null, null, null, "topic-1");
+        builder.addSource(null, "source-2", null, null, null, "topic-2");
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addSource(null, "source-3", null, null, null, Pattern.compile(""));
+        builder.addSource(null, "source-4", null, null, null, Pattern.compile(""));
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addSink("sink-1", "sink-topic", null, null, null, "processor-1");
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addSink("sink-2", (k, v, ctx) -> "sink-topic", null, null, null, "processor-2");
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addStateStore(new MockKeyValueStoreBuilder("store-1", false), "processor-1", "processor-2");
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addStateStore(new MockKeyValueStoreBuilder("store-2", false));
+        builder.connectProcessorAndStateStores("processor-2", "store-2");
+        builder.connectProcessorAndStateStores("processor-3", "store-2");
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+
+        oldNodeGroups = newNodeGroups;
+        builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", new MockProcessorSupplier());
+        newNodeGroups = builder.nodeGroups();
+        assertNotEquals(oldNodeGroups, newNodeGroups);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingSink() {
         builder.addSink(null, "topic", null, null, null);
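
The shouldAllowIncrementalBuilds test added above pins down the invalidation behavior introduced in InternalTopologyBuilder: after each kind of mutation, nodeGroups() must return a grouping that differs from the previous one. The core idiom, reduced to a single step (illustrative):

    final Map<Integer, Set<String>> oldNodeGroups = builder.nodeGroups();
    builder.addSource(null, "source-1", null, null, null, "topic-1");
    assertNotEquals(oldNodeGroups, builder.nodeGroups()); // memo was dropped and recomputed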
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 4287c77..cd95a68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -69,15 +69,15 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
-    private final MockStateStore persistentStore = new MockStateStore(persistentStoreName, true);
-    private final MockStateStore nonPersistentStore = new MockStateStore(nonPersistentStoreName, false);
+    private final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true);
+    private final MockKeyValueStore nonPersistentStore = new MockKeyValueStore(nonPersistentStoreName, false);
     private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);
-    private final String storeName = "mockStateStore";
+    private final String storeName = "mockKeyValueStore";
     private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
     private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);
     private final TaskId taskId = new TaskId(0, 1);
     private final MockChangelogReader changelogReader = new MockChangelogReader();
-    private final MockStateStore mockStateStore = new MockStateStore(storeName, true);
+    private final MockKeyValueStore mockKeyValueStore = new MockKeyValueStore(storeName, true);
     private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
     private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
     private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value);
@@ -115,7 +115,7 @@ public class ProcessorStateManagerTest {
 
         final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(key, value);
 
-        final MockStateStore persistentStore = getPersistentStore();
+        final MockKeyValueStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
@@ -137,7 +137,7 @@ public class ProcessorStateManagerTest {
         final TaskId taskId = new TaskId(0, 2);
         final Integer intKey = 1;
 
-        final MockStateStore persistentStore = getPersistentStore();
+        final MockKeyValueStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
@@ -158,7 +158,7 @@ public class ProcessorStateManagerTest {
     public void testRegisterPersistentStore() throws IOException {
         final TaskId taskId = new TaskId(0, 2);
 
-        final MockStateStore persistentStore = getPersistentStore();
+        final MockKeyValueStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -184,8 +184,8 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testRegisterNonPersistentStore() throws IOException {
-        final MockStateStore nonPersistentStore
-            = new MockStateStore(nonPersistentStoreName, false); // non persistent store
+        final MockKeyValueStore nonPersistentStore
+            = new MockKeyValueStore(nonPersistentStoreName, false); // non persistent store
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 2),
             noPartitions,
@@ -233,9 +233,9 @@ public class ProcessorStateManagerTest {
         final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
         final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
 
-        final MockStateStore store1 = new MockStateStore(storeName1, true);
-        final MockStateStore store2 = new MockStateStore(storeName2, true);
-        final MockStateStore store3 = new MockStateStore(storeName3, true);
+        final MockKeyValueStore store1 = new MockKeyValueStore(storeName1, true);
+        final MockKeyValueStore store2 = new MockKeyValueStore(storeName2, true);
+        final MockKeyValueStore store3 = new MockKeyValueStore(storeName3, true);
 
         // if there is a source partition, inherit the partition id
         final Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
@@ -272,7 +272,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testGetStore() throws IOException {
-        final MockStateStore mockStateStore = new MockStateStore(nonPersistentStoreName, false);
+        final MockKeyValueStore mockKeyValueStore = new MockKeyValueStore(nonPersistentStoreName, false);
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 1),
             noPartitions,
@@ -283,10 +283,10 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
         try {
-            stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
+            stateMgr.register(mockKeyValueStore, mockKeyValueStore.stateRestoreCallback);
 
             assertNull(stateMgr.getStore("noSuchStore"));
-            assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
+            assertEquals(mockKeyValueStore, stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
             stateMgr.close(Collections.emptyMap());
@@ -361,7 +361,7 @@ public class ProcessorStateManagerTest {
         final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L);
         checkpoint.write(offsets);
 
-        final MockStateStore persistentStore = new MockStateStore(persistentStoreName, true);
+        final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true);
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -476,7 +476,7 @@ public class ProcessorStateManagerTest {
             logContext);
 
         try {
-            stateManager.register(new MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
+            stateManager.register(new MockKeyValueStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
             fail("should have thrown illegal argument exception when store name same as checkpoint file");
         } catch (final IllegalArgumentException e) {
             //pass
@@ -495,10 +495,10 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
 
-        stateManager.register(mockStateStore, null);
+        stateManager.register(mockKeyValueStore, null);
 
         try {
-            stateManager.register(mockStateStore, null);
+            stateManager.register(mockKeyValueStore, null);
             fail("should have thrown illegal argument exception when store with same name already registered");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -519,7 +519,7 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
 
-        final MockStateStore stateStore = new MockStateStore(storeName, true) {
+        final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, true) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
@@ -548,7 +548,7 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
 
-        final MockStateStore stateStore = new MockStateStore(storeName, true) {
+        final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, true) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
@@ -578,13 +578,13 @@ public class ProcessorStateManagerTest {
 
         final AtomicBoolean flushedStore = new AtomicBoolean(false);
 
-        final MockStateStore stateStore1 = new MockStateStore(storeName, true) {
+        final MockKeyValueStore stateStore1 = new MockKeyValueStore(storeName, true) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final MockStateStore stateStore2 = new MockStateStore(storeName + "2", true) {
+        final MockKeyValueStore stateStore2 = new MockKeyValueStore(storeName + "2", true) {
             @Override
             public void flush() {
                 flushedStore.set(true);
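
Overriding a single method in an anonymous subclass is the failure-injection hook these tests rely on: exactly one store throws on flush() (or close(), in the next hunk), and the test then asserts via the AtomicBoolean that its sibling was still flushed. A minimal sketch of the idiom as it would appear inside a test method, assuming the MockKeyValueStore introduced by this commit is on the test classpath:

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.test.MockKeyValueStore;

    final AtomicBoolean flushedSibling = new AtomicBoolean(false);

    // This store simulates a broken flush.
    final MockKeyValueStore failing = new MockKeyValueStore("store", true) {
        @Override
        public void flush() {
            throw new RuntimeException("KABOOM!");
        }
    };
    // This one records that it was still reached despite the sibling's failure.
    final MockKeyValueStore sibling = new MockKeyValueStore("store2", true) {
        @Override
        public void flush() {
            flushedSibling.set(true);
            super.flush();
        }
    };
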
@@ -613,13 +613,13 @@ public class ProcessorStateManagerTest {
 
         final AtomicBoolean closedStore = new AtomicBoolean(false);
 
-        final MockStateStore stateStore1 = new MockStateStore(storeName, true) {
+        final MockKeyValueStore stateStore1 = new MockKeyValueStore(storeName, true) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final MockStateStore stateStore2 = new MockStateStore(storeName + "2", true) {
+        final MockKeyValueStore stateStore2 = new MockKeyValueStore(storeName + "2", true) {
             @Override
             public void close() {
                 closedStore.set(true);
@@ -690,8 +690,8 @@ public class ProcessorStateManagerTest {
             eosEnabled,
             logContext);
 
-        final MockStateStore stateStore = new MockStateStore(storeName, true);
-        final MockStateStore stateStore2 = new MockStateStore(store2Name, true);
+        final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, true);
+        final MockKeyValueStore stateStore2 = new MockKeyValueStore(store2Name, true);
 
         stateManager.register(stateStore, stateStore.stateRestoreCallback);
         stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
@@ -726,8 +726,8 @@ public class ProcessorStateManagerTest {
             logContext);
     }
 
-    private MockStateStore getPersistentStore() {
-        return new MockStateStore("persistentStore", true);
+    private MockKeyValueStore getPersistentStore() {
+        return new MockKeyValueStore("persistentStore", true);
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index c822fc3..820191d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -49,10 +49,10 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.WindowKeySchema;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockRestoreConsumer;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockStateStore;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -104,7 +104,7 @@ public class StandbyTaskTest {
 
     private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = ProcessorTopology.withLocalStores(
-        mkList(new MockStoreBuilder(storeName1, false).build(), new MockStoreBuilder(storeName2, true).build()),
+        mkList(new MockKeyValueStoreBuilder(storeName1, false).build(), new MockKeyValueStoreBuilder(storeName2, true).build()),
         mkMap(
             mkEntry(storeName1, storeChangelogTopicName1),
             mkEntry(storeName2, storeChangelogTopicName2)
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
     private final TopicPartition globalTopicPartition = new TopicPartition(globalStoreName, 0);
     private final Set<TopicPartition> ktablePartitions = Utils.mkSet(globalTopicPartition);
     private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(
-        singletonList(new MockStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
+        singletonList(new MockKeyValueStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
         mkMap(
             mkEntry(globalStoreName, globalTopicPartition.topic())
         )
@@ -208,8 +208,8 @@ public class StandbyTaskTest {
         task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
 
         final StandbyContextImpl context = (StandbyContextImpl) task.context();
-        final MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
-        final MockStateStore store2 = (MockStateStore) context.getStateMgr().getStore(storeName2);
+        final MockKeyValueStore store1 = (MockKeyValueStore) context.getStateMgr().getStore(storeName1);
+        final MockKeyValueStore store2 = (MockKeyValueStore) context.getStateMgr().getStore(storeName2);
 
         assertEquals(Collections.emptyList(), store1.keys);
         assertEquals(mkList(1, 2, 3), store2.keys);
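
These assertions hold because MockKeyValueStore's public stateRestoreCallback (shown in full further down in this diff) deserializes every restored key into the store's public keys list, so a standby task's restore progress can be inspected directly. A hedged sketch of exercising that hook in isolation, inside a test method:

    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.test.MockKeyValueStore;

    final MockKeyValueStore store = new MockKeyValueStore("store2", true);
    final IntegerSerializer serializer = new IntegerSerializer();

    // Simulate two changelog records being restored; the callback records each key.
    store.stateRestoreCallback.restore(serializer.serialize("", 1), new byte[0]);
    store.stateRestoreCallback.restore(serializer.serialize("", 2), new byte[0]);
    // store.keys now contains [1, 2]
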
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 834ab3e..d332b5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -50,7 +50,7 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -109,7 +109,7 @@ public class StreamTaskTest {
     private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
 
     private final String storeName = "store";
-    private final StateStore stateStore = new MockStateStore(storeName, false);
+    private final StateStore stateStore = new MockKeyValueStore(storeName, false);
     private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 0);
     private final Long offset = 543L;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9693184..6102969 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -43,7 +43,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStoreBuilder;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -333,10 +333,10 @@ public class StreamsPartitionAssignorTest {
     public void testAssignWithPartialTopology() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(new MockStoreBuilder("store1", false), "processor1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(new MockStoreBuilder("store2", false), "processor2");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor2");
         final List<String> topics = Utils.mkList("topic1", "topic2");
         final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
@@ -474,11 +474,11 @@ public class StreamsPartitionAssignorTest {
         builder.addSource(null, "source2", null, null, null, "topic2");
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(new MockStoreBuilder("store1", false), "processor-1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor-1");
 
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(new MockStoreBuilder("store2", false), "processor-2");
-        builder.addStateStore(new MockStoreBuilder("store3", false), "processor-2");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor-2");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store3", false), "processor-2");
 
         final List<String> topics = Utils.mkList("topic1", "topic2");
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
similarity index 70%
rename from streams/src/test/java/org/apache/kafka/test/MockStateStore.java
rename to streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index a2b0d21..8fd1f74 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -21,10 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.ArrayList;
+import java.util.List;
 
-public class MockStateStore implements StateStore {
+public class MockKeyValueStore implements KeyValueStore {
     private final String name;
     private final boolean persistent;
 
@@ -33,8 +36,8 @@ public class MockStateStore implements StateStore {
     public boolean closed = true;
     public final ArrayList<Integer> keys = new ArrayList<>();
 
-    public MockStateStore(final String name,
-                          final boolean persistent) {
+    public MockKeyValueStore(final String name,
+                             final boolean persistent) {
         this.name = name;
         this.persistent = persistent;
     }
@@ -81,4 +84,44 @@ public class MockStateStore implements StateStore {
             keys.add(deserializer.deserialize("", key));
         }
     };
+
+    @Override
+    public void put(final Object key, final Object value) {
+
+    }
+
+    @Override
+    public Object putIfAbsent(final Object key, final Object value) {
+        return null;
+    }
+
+    @Override
+    public Object delete(final Object key) {
+        return null;
+    }
+
+    @Override
+    public void putAll(final List entries) {
+
+    }
+
+    @Override
+    public Object get(final Object key) {
+        return null;
+    }
+
+    @Override
+    public KeyValueIterator range(final Object from, final Object to) {
+        return null;
+    }
+
+    @Override
+    public KeyValueIterator all() {
+        return null;
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return 0;
+    }
 }
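
Note that the KeyValueStore methods added above are deliberate no-ops: put() discards its arguments, the read methods return null or 0, and the class keeps the raw KeyValueStore interface. The mock exists to track lifecycle (its public flags and restored keys), not to store data. A short demonstration of that contract:

    import org.apache.kafka.test.MockKeyValueStore;

    final MockKeyValueStore store = new MockKeyValueStore("store", false);
    store.put(1, new byte[0]);                          // silently discarded
    System.out.println(store.get(1));                   // null: nothing is retained
    System.out.println(store.approximateNumEntries());  // 0
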
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
similarity index 78%
rename from streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
rename to streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
index 41aa239..70b4e02 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
@@ -18,22 +18,22 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
 
-public class MockStoreBuilder extends AbstractStoreBuilder<Integer, byte[], StateStore> {
+public class MockKeyValueStoreBuilder extends AbstractStoreBuilder<Integer, byte[], KeyValueStore> {
 
     private final boolean persistent;
 
-    public MockStoreBuilder(final String storeName, final boolean persistent) {
+    public MockKeyValueStoreBuilder(final String storeName, final boolean persistent) {
         super(storeName, Serdes.Integer(), Serdes.ByteArray(), new MockTime());
 
         this.persistent = persistent;
     }
 
     @Override
-    public StateStore build() {
-        return new MockStateStore(name, persistent);
+    public KeyValueStore build() {
+        return new MockKeyValueStore(name, persistent);
     }
 }
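
For completeness, a minimal usage sketch of the renamed builder as the tests above wire it in: build() now yields a KeyValueStore, which is what lets test topologies pass the mock anywhere a real key-value store is expected. This assumes the mock exposes the usual StateStore name() and persistent() accessors; run with -ea to enable the asserts:

    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.test.MockKeyValueStoreBuilder;

    // A non-persistent mock store, built the same way StreamsPartitionAssignorTest does.
    final KeyValueStore store = new MockKeyValueStoreBuilder("store1", false).build();
    assert "store1".equals(store.name());
    assert !store.persistent();
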