Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:28 UTC

[2/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
deleted file mode 100644
index 909df13..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-public class StreamThreadTest {
-
-    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
-    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
-    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
-    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
-    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
-    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
-
-    // task0 is unused
-    private final TaskId task1 = new TaskId(0, 1);
-    private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-    private final TaskId task4 = new TaskId(1, 1);
-    private final TaskId task5 = new TaskId(1, 2);
-
-    private Properties configProps() {
-        return new Properties() {
-            {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-            }
-        };
-    }
-
-    private static class TestStreamTask extends StreamTask {
-        public boolean committed = false;
-
-        public TestStreamTask(TaskId id,
-                              Consumer<byte[], byte[]> consumer,
-                              Producer<byte[], byte[]> producer,
-                              Consumer<byte[], byte[]> restoreConsumer,
-                              Collection<TopicPartition> partitions,
-                              ProcessorTopology topology,
-                              StreamingConfig config) {
-            super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
-        }
-
-        @Override
-        public void commit() {
-            super.commit();
-            committed = true;
-        }
-    }
-
-    private ByteArraySerializer serializer = new ByteArraySerializer();
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPartitionAssignmentChange() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addSource("source3", "topic3");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
-
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
-            @Override
-            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
-            }
-        };
-
-        initPartitionGrouper(thread);
-
-        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
-        assertTrue(thread.tasks().isEmpty());
-
-        List<TopicPartition> revokedPartitions;
-        List<TopicPartition> assignedPartitions;
-        Set<TopicPartition> expectedGroup1;
-        Set<TopicPartition> expectedGroup2;
-
-        revokedPartitions = Collections.emptyList();
-        assignedPartitions = Collections.singletonList(t1p1);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-        assertTrue(thread.tasks().containsKey(task1));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(1, thread.tasks().size());
-
-        revokedPartitions = assignedPartitions;
-        assignedPartitions = Collections.singletonList(t1p2);
-        expectedGroup2 = new HashSet<>(Arrays.asList(t1p2));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-        assertTrue(thread.tasks().containsKey(task2));
-        assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
-        assertEquals(1, thread.tasks().size());
-
-        revokedPartitions = assignedPartitions;
-        assignedPartitions = Arrays.asList(t1p1, t1p2);
-        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-        assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task2));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        revokedPartitions = assignedPartitions;
-        assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-        expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-        assertTrue(thread.tasks().containsKey(task4));
-        assertTrue(thread.tasks().containsKey(task5));
-        assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        revokedPartitions = assignedPartitions;
-        assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
-        expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-        assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        revokedPartitions = assignedPartitions;
-        assignedPartitions = Collections.emptyList();
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-        assertTrue(thread.tasks().isEmpty());
-    }
-
-    @Test
-    public void testMaybeClean() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final long cleanupDelay = 1000L;
-            Properties props = configProps();
-            props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
-            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-
-            StreamingConfig config = new StreamingConfig(props);
-
-            File stateDir1 = new File(baseDir, task1.toString());
-            File stateDir2 = new File(baseDir, task2.toString());
-            File stateDir3 = new File(baseDir, task3.toString());
-            File extraDir = new File(baseDir, "X");
-            stateDir1.mkdir();
-            stateDir2.mkdir();
-            stateDir3.mkdir();
-            extraDir.mkdir();
-
-            MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-            MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-            final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-            MockTime mockTime = new MockTime();
-
-            TopologyBuilder builder = new TopologyBuilder();
-            builder.addSource("source1", "topic1");
-
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
-                @Override
-                public void maybeClean() {
-                    super.maybeClean();
-                }
-
-                @Override
-                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                    ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
-                }
-            };
-
-            initPartitionGrouper(thread);
-
-            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
-            assertTrue(thread.tasks().isEmpty());
-            mockTime.sleep(cleanupDelay);
-
-            // all directories exist since no assignment has happened yet
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertTrue(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            List<TopicPartition> revokedPartitions;
-            List<TopicPartition> assignedPartitions;
-            Map<TaskId, StreamTask> prevTasks;
-
-            //
-            // Assign t1p1 and t1p2. This should create task1 & task2
-            //
-            revokedPartitions = Collections.emptyList();
-            assignedPartitions = Arrays.asList(t1p1, t1p2);
-            prevTasks = new HashMap<>(thread.tasks());
-
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            // there shouldn't be any previous task
-            assertTrue(prevTasks.isEmpty());
-
-            // task 1 & 2 are created
-            assertEquals(2, thread.tasks().size());
-
-            // all directories should still exist before the cleanup delay time
-            mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean();
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertTrue(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // all state directories except those for the assigned task1 & task2 will be removed; the extra directory should still exist
-            mockTime.sleep(11L);
-            thread.maybeClean();
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            //
-            // Revoke t1p1 and t1p2. This should remove task1 & task2
-            //
-            revokedPartitions = assignedPartitions;
-            assignedPartitions = Collections.emptyList();
-            prevTasks = new HashMap<>(thread.tasks());
-
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            // previous tasks should be committed
-            assertEquals(2, prevTasks.size());
-            for (StreamTask task : prevTasks.values()) {
-                assertTrue(((TestStreamTask) task).committed);
-                ((TestStreamTask) task).committed = false;
-            }
-
-            // no task
-            assertTrue(thread.tasks().isEmpty());
-
-            // the state directories for task1 & task2 still exist before the cleanup delay time
-            mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean();
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // the state directories for task1 & task2 are removed
-            mockTime.sleep(11L);
-            thread.maybeClean();
-            assertFalse(stateDir1.exists());
-            assertFalse(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testMaybeCommit() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final long commitInterval = 1000L;
-            Properties props = configProps();
-            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-            props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
-
-            StreamingConfig config = new StreamingConfig(props);
-
-            MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-            MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-            final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-            MockTime mockTime = new MockTime();
-
-            TopologyBuilder builder = new TopologyBuilder();
-            builder.addSource("source1", "topic1");
-
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
-                @Override
-                public void maybeCommit() {
-                    super.maybeCommit();
-                }
-
-                @Override
-                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                    ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
-                }
-            };
-
-            initPartitionGrouper(thread);
-
-            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
-            List<TopicPartition> revokedPartitions;
-            List<TopicPartition> assignedPartitions;
-
-            //
-            // Assign t1p1 and t1p2. This should create task1 & task2
-            //
-            revokedPartitions = Collections.emptyList();
-            assignedPartitions = Arrays.asList(t1p1, t1p2);
-
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            assertEquals(2, thread.tasks().size());
-
-            // no task is committed before the commit interval
-            mockTime.sleep(commitInterval - 10L);
-            thread.maybeCommit();
-            for (StreamTask task : thread.tasks().values()) {
-                assertFalse(((TestStreamTask) task).committed);
-            }
-
-            // all tasks are committed after the commit interval
-            mockTime.sleep(11L);
-            thread.maybeCommit();
-            for (StreamTask task : thread.tasks().values()) {
-                assertTrue(((TestStreamTask) task).committed);
-                ((TestStreamTask) task).committed = false;
-            }
-
-            // no task is committed before the commit interval, again
-            mockTime.sleep(commitInterval - 10L);
-            thread.maybeCommit();
-            for (StreamTask task : thread.tasks().values()) {
-                assertFalse(((TestStreamTask) task).committed);
-            }
-
-            // all tasks are committed after the commit interval, again
-            mockTime.sleep(11L);
-            thread.maybeCommit();
-            for (StreamTask task : thread.tasks().values()) {
-                assertTrue(((TestStreamTask) task).committed);
-                ((TestStreamTask) task).committed = false;
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    private void initPartitionGrouper(StreamThread thread) {
-        PartitionGrouper partitionGrouper = thread.partitionGrouper();
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-
-        partitionAssignor.configure(
-                Collections.singletonMap(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper)
-        );
-
-        Map<String, PartitionAssignor.Assignment> assignments =
-                partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
-
-        partitionAssignor.onAssignment(assignments.get("client"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
deleted file mode 100644
index 209f3c9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-public abstract class AbstractKeyValueStoreTest {
-
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config,
-                                                                      ProcessorContext context,
-                                                                      Class<K> keyClass, Class<V> valueClass,
-                                                                      boolean useContextSerdes);
-
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-                while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
-            }
-
-            // Check range iteration with an upper bound beyond the largest stored key ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-                while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
-            }
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(0, "zero");
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testRestoreWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(0, "zero");
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
deleted file mode 100644
index b3fe98c..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            StreamingConfig config,
-            ProcessorContext context,
-            Class<K> keyClass, Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
-        } else {
-            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
deleted file mode 100644
index dddb9c7..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.junit.Test;
-
-public class InMemoryLRUCacheStoreTest {
-
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
-                                                     .withIntegerKeys().withStringValues()
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 3 added (maxEntries is 3) ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-
-        Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
-        Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
-        Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
-        Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
-                                                     .withKeys(keySer, keyDeser)
-                                                     .withValues(valSer, valDeser)
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 3 added (maxEntries is 3) ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
-
-    @Test
-    public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
-                                                     .withIntegerKeys().withStringValues()
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store's contents were properly restored ...
-        assertEquals(0, driver.checkForRestoredEntries(store));
-
-        // and there are no other entries ...
-        assertEquals(3, driver.sizeOf(store));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
deleted file mode 100644
index 8bab1c9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.test.MockProcessorContext;
-import org.apache.kafka.test.MockTimestampExtractor;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * A component that provides a {@link #context() ProcessorContext} that can be supplied to a {@link KeyValueStore} so that
- * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
- * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
- * {@link MeteredKeyValueStore} to monitor and write their entries to the Kafka topic.
- * <p>
- * <h2>Basic usage</h2>
- * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
- *
- * <pre>
- * // Create the test driver ...
- * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
- * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
- *                                              .withIntegerKeys().withStringValues()
- *                                              .inMemory().build();
- *
- * // Verify that the store reads and writes correctly ...
- * store.put(0, "zero");
- * store.put(1, "one");
- * store.put(2, "two");
- * store.put(4, "four");
- * store.put(5, "five");
- * assertEquals(5, driver.sizeOf(store));
- * assertEquals("zero", store.get(0));
- * assertEquals("one", store.get(1));
- * assertEquals("two", store.get(2));
- * assertEquals("four", store.get(4));
- * assertEquals("five", store.get(5));
- * assertNull(store.get(3));
- * store.delete(5);
- *
- * // Flush the store and verify all current entries were properly flushed ...
- * store.flush();
- * assertEquals("zero", driver.flushedEntryStored(0));
- * assertEquals("one", driver.flushedEntryStored(1));
- * assertEquals("two", driver.flushedEntryStored(2));
- * assertEquals("four", driver.flushedEntryStored(4));
- * assertEquals(null, driver.flushedEntryStored(5));
- *
- * assertEquals(false, driver.flushedEntryRemoved(0));
- * assertEquals(false, driver.flushedEntryRemoved(1));
- * assertEquals(false, driver.flushedEntryRemoved(2));
- * assertEquals(false, driver.flushedEntryRemoved(4));
- * assertEquals(true, driver.flushedEntryRemoved(5));
- * </pre>
- *
- * <p>
- * <h2>Restoring a store</h2>
- * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
- * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
- * <p>
- * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
- * passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store
- * using this driver's {@link #context() ProcessorContext}:
- *
- * <pre>
- * // Create the test driver ...
- * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- *
- * // Add any entries that will be restored to any store that uses the driver's context ...
- * driver.addEntryToRestoreLog(0, "zero");
- * driver.addEntryToRestoreLog(1, "one");
- * driver.addEntryToRestoreLog(2, "two");
- * driver.addEntryToRestoreLog(4, "four");
- *
- * // Create the store, which should register with the context and automatically
- * // receive the restore entries ...
- * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
- *                                              .withIntegerKeys().withStringValues()
- *                                              .inMemory().build();
- *
- * // Verify that the store's contents were properly restored ...
- * assertEquals(0, driver.checkForRestoredEntries(store));
- *
- * // and there are no other entries ...
- * assertEquals(4, driver.sizeOf(store));
- * </pre>
- *
- * @param <K> the type of keys placed in the store
- * @param <V> the type of values placed in the store
- */
-public class KeyValueStoreTestDriver<K, V> {
-
-    private static <T> Serializer<T> unusableSerializer() {
-        return new Serializer<T>() {
-            @Override
-            public void configure(Map<String, ?> configs, boolean isKey) {
-            }
-
-            @Override
-            public byte[] serialize(String topic, T data) {
-                throw new UnsupportedOperationException("This serializer should not be used");
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-    }
-
-    private static <T> Deserializer<T> unusableDeserializer() {
-        return new Deserializer<T>() {
-            @Override
-            public void configure(Map<String, ?> configs, boolean isKey) {
-            }
-
-            @Override
-            public T deserialize(String topic, byte[] data) {
-                throw new UnsupportedOperationException("This deserializer should not be used");
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-    }
-
-    /**
-     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides <em>unusable</em> default key and
-     * value serializers and deserializers. This can be used when the actual serializers and deserializers are supplied to the
-     * store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and
-     * value serializers and deserializers.
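-     * <p>
-     * For example (an illustrative sketch mirroring the store tests; the serializer classes are the
-     * built-ins from {@code org.apache.kafka.common.serialization}):
-     * <pre>
-     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
-     * // supply the serdes to the store directly, since the driver's context serdes are unusable ...
-     * StateStoreSupplier supplier = Stores.create("my-store", driver.config())
-     *                                     .withKeys(new IntegerSerializer(), new IntegerDeserializer())
-     *                                     .withValues(new StringSerializer(), new StringDeserializer())
-     *                                     .inMemory().build();
-     * KeyValueStore&lt;Integer, String> store = (KeyValueStore&lt;Integer, String>) supplier.get();
-     * store.init(driver.context());
-     * </pre>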
-     *
-     * @return the test driver; never null
-     */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create() {
-        Serializer<K> keySerializer = unusableSerializer();
-        Deserializer<K> keyDeserializer = unusableDeserializer();
-        Serializer<V> valueSerializer = unusableSerializer();
-        Deserializer<V> valueDeserializer = unusableDeserializer();
-        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
-        return new KeyValueStoreTestDriver<K, V>(serdes);
-    }
-
-    /**
-     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
-     * deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class},
-     * {@code Long.class}, and {@code byte[].class}). This can be used when the store is created to rely upon the
-     * ProcessorContext's default key and value serializers and deserializers.
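-     * <p>
-     * For example (illustrative):
-     * <pre>
-     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-     * </pre>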
-     *
-     * @param keyClass the class for the keys; must be one of {@code String.class}, {@code Integer.class},
-     *            {@code Long.class}, or {@code byte[].class}
-     * @param valueClass the class for the values; must be one of {@code String.class}, {@code Integer.class},
-     *            {@code Long.class}, or {@code byte[].class}
-     * @return the test driver; never null
-     */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
-        Serdes<K, V> serdes = Serdes.withBuiltinTypes("unexpected", keyClass, valueClass);
-        return new KeyValueStoreTestDriver<K, V>(serdes);
-    }
-
-    /**
-     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and
-     * deserializers. This can be used when the store is created to rely upon the ProcessorContext's default key and value
-     * serializers and deserializers.
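-     * <p>
-     * For example (illustrative, using the built-in serializers from {@code org.apache.kafka.common.serialization}):
-     * <pre>
-     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(
-     *         new IntegerSerializer(), new IntegerDeserializer(),
-     *         new StringSerializer(), new StringDeserializer());
-     * </pre>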
-     *
-     * @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null
-     * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null
-     * @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null
-     * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
-     * @return the test driver; never null
-     */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer,
-                                                              Deserializer<K> keyDeserializer,
-                                                              Serializer<V> valueSerializer,
-                                                              Deserializer<V> valueDeserializer) {
-        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
-        return new KeyValueStoreTestDriver<K, V>(serdes);
-    }
-
-    private final Serdes<K, V> serdes;
-    private final Map<K, V> flushedEntries = new HashMap<>();
-    private final Set<K> flushedRemovals = new HashSet<>();
-    private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
-    private final StreamingConfig config;
-    private final MockProcessorContext context;
-    private final Map<String, StateStore> storeMap = new HashMap<>();
-    private final StreamingMetrics metrics = new StreamingMetrics() {
-        @Override
-        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
-            return null;
-        }
-
-        @Override
-        public void recordLatency(Sensor sensor, long startNs, long endNs) {
-        }
-    };
-    private final RecordCollector recordCollector;
-    private File stateDir = new File("build/data").getAbsoluteFile();
-
-    protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
-        this.serdes = serdes;
-        ByteArraySerializer rawSerializer = new ByteArraySerializer();
-        Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, rawSerializer, rawSerializer);
-        this.recordCollector = new RecordCollector(producer) {
-            @Override
-            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                recordFlushed(record.key(), record.value());
-            }
-        };
-        Properties props = new Properties();
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
-        this.config = new StreamingConfig(props);
-
-        this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
-                serdes.valueDeserializer(), recordCollector) {
-            @Override
-            public TaskId id() {
-                return new TaskId(0, 1);
-            }
-
-            @Override
-            public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
-                forward(key, value);
-            }
-
-            @Override
-            public void register(StateStore store, StateRestoreCallback func) {
-                storeMap.put(store.name(), store);
-                restoreEntries(func);
-            }
-
-            @Override
-            public StateStore getStateStore(String name) {
-                return storeMap.get(name);
-            }
-
-            @Override
-            public StreamingMetrics metrics() {
-                return metrics;
-            }
-
-            @Override
-            public File stateDir() {
-                if (stateDir == null) {
-                    stateDir = StateUtils.tempDir();
-                }
-                stateDir.mkdirs();
-                return stateDir;
-            }
-        };
-    }
-
-    /**
-     * Set the directory that should be used by the store for local disk storage.
-     *
-     * @param dir the directory; may be null if no local storage is allowed
-     */
-    public void useStateDir(File dir) {
-        this.stateDir = dir;
-    }
-
-    @SuppressWarnings("unchecked")
-    protected <K1, V1> void recordFlushed(K1 key, V1 value) {
-        K k = (K) key;
-        if (value == null) {
-            // This is a removal ...
-            flushedRemovals.add(k);
-            flushedEntries.remove(k);
-        } else {
-            // This is a normal add
-            flushedEntries.put(k, (V) value);
-            flushedRemovals.remove(k);
-        }
-    }
-
-    private void restoreEntries(StateRestoreCallback func) {
-        for (Entry<K, V> entry : restorableEntries) {
-            if (entry != null) {
-                byte[] rawKey = serdes.rawKey(entry.key());
-                byte[] rawValue = serdes.rawValue(entry.value());
-                func.restore(rawKey, rawValue);
-            }
-        }
-    }
-
-    /**
-     * This method adds an entry to the "restore log" for the {@link KeyValueStore}, and is used <em>only</em> when testing the
-     * restore functionality of a {@link KeyValueStore} implementation.
-     * <p>
-     * To create such a test, create the test driver, call this method one or more times, and then create the
-     * {@link KeyValueStore}. Your tests can then check whether the store contains the entries from the log.
-     *
-     * <pre>
-     * // Set up the driver and pre-populate the log ...
-     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
-     * driver.addEntryToRestoreLog(1, "value1");
-     * driver.addEntryToRestoreLog(2, "value2");
-     * driver.addEntryToRestoreLog(3, "value3");
-     *
-     * // Create the store using the driver's context ...
-     * ProcessorContext context = driver.context();
-     * KeyValueStore&lt;Integer, String> store = ...
-     *
-     * // Verify that the store's contents were properly restored from the log ...
-     * assertEquals(0, driver.checkForRestoredEntries(store));
-     *
-     * // and there are no other entries ...
-     * assertEquals(3, driver.sizeOf(store));
-     * </pre>
-     *
-     * @param key the key for the entry
-     * @param value the value for the entry
-     * @see #checkForRestoredEntries(KeyValueStore)
-     */
-    public void addEntryToRestoreLog(K key, V value) {
-        restorableEntries.add(new Entry<K, V>(key, value));
-    }
-
-    /**
-     * Get the streaming config that should be supplied to a {@link Serdes}'s constructor.
-     *
-     * @return the streaming config; never null
-     */
-    public StreamingConfig config() {
-        return config;
-    }
-
-    /**
-     * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
-     * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
-     * {@link #flushedEntryRemoved(Object)} methods.
-     * <p>
-     * If the {@link KeyValueStore} is to be restored upon startup, be sure to {@link #addEntryToRestoreLog(Object, Object)
-     * add the restore entries} before creating the store with the {@link ProcessorContext} returned by this method.
-     *
-     * @return the processing context; never null
-     * @see #addEntryToRestoreLog(Object, Object)
-     */
-    public ProcessorContext context() {
-        return context;
-    }
-
-    /**
-     * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context()
-     * ProcessorContext}.
-     *
-     * @return the restore entries; never null but possibly empty
-     */
-    public Iterable<Entry<K, V>> restoredEntries() {
-        return restorableEntries;
-    }
-
-    /**
-     * Utility method that will count the number of {@link #addEntryToRestoreLog(Object, Object) restore entries} missing from the
-     * supplied store.
-     *
-     * @param store the store that is expected to contain all of the {@link #restoredEntries() restore entries}
-     * @return the number of restore entries missing from the store, or 0 if all restore entries were found
-     * @see #addEntryToRestoreLog(Object, Object)
-     */
-    public int checkForRestoredEntries(KeyValueStore<K, V> store) {
-        int missing = 0;
-        for (Entry<K, V> entry : restorableEntries) {
-            if (entry != null) {
-                V value = store.get(entry.key());
-                if (!Objects.equals(value, entry.value())) {
-                    ++missing;
-                }
-            }
-        }
-        return missing;
-    }
-
-    /**
-     * Utility method to compute the number of entries within the store.
-     *
-     * @param store the key-value store created with this driver's {@link #context()}
-     * @return the number of entries
-     */
-    public int sizeOf(KeyValueStore<K, V> store) {
-        int size = 0;
-        for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
-            iterator.next();
-            ++size;
-        }
-        return size;
-    }
-
-    /**
-     * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key.
-     *
-     * @param key the key
-     * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this
-     *         key was {@link #flushedEntryRemoved(Object) removed} upon flush
-     */
-    public V flushedEntryStored(K key) {
-        return flushedEntries.get(key);
-    }
-
-    /**
-     * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key.
-     *
-     * @param key the key
-     * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not
-     *         removed when last flushed
-     */
-    public boolean flushedEntryRemoved(K key) {
-        return flushedRemovals.contains(key);
-    }
-
-    /**
-     * Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals},
-     * and {@link #restoredEntries() restore-log entries}.
-     */
-    public void clear() {
-        restorableEntries.clear();
-        flushedEntries.clear();
-        flushedRemovals.clear();
-    }
-}
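
For reference, the removed driver's restore-testing pattern, per its own Javadoc, reduces to the
following minimal sketch (the static create() factory is defined earlier in this file; createStore()
stands in for whatever KeyValueStore implementation a test exercises):

    // Pre-populate the simulated restore log before creating the store ...
    KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
    driver.addEntryToRestoreLog(1, "value1");
    driver.addEntryToRestoreLog(2, "value2");
    driver.addEntryToRestoreLog(3, "value3");

    // Creating the store via the driver's context replays the restore log ...
    ProcessorContext context = driver.context();
    KeyValueStore<Integer, String> store = createStore(context);

    // Every restore entry should be present in the store, and nothing else ...
    assertEquals(0, driver.checkForRestoredEntries(store));
    assertEquals(3, driver.sizeOf(store));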

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
deleted file mode 100644
index 37a12f9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            StreamingConfig config,
-            ProcessorContext context,
-            Class<K> keyClass,
-            Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
-        } else {
-            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-    }
-}
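
The fluent Stores builder exercised above can also be driven directly; a minimal sketch, assuming a
StreamingConfig and a ProcessorContext such as the KeyValueStoreTestDriver provides:

    StateStoreSupplier supplier = Stores.create("my-store", config)
            .withKeys(String.class)
            .withValues(Long.class)
            .localDatabase()
            .build();

    @SuppressWarnings("unchecked")
    KeyValueStore<String, Long> store = (KeyValueStore<String, Long>) supplier.get();
    store.init(context); // registers the store and replays any restore log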

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
deleted file mode 100644
index c7ea748..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.test.TestUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A utility for tests to create and manage unique and isolated directories on the file system for local state.
- */
-public class StateUtils {
-
-    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();
-
-    /**
-     * Create a new temporary directory that will be cleaned up automatically upon shutdown.
-     * @return the new directory that will exist; never null
-     */
-    public static File tempDir() {
-        final File dir = new File(TestUtils.IO_TMP_DIR, "kafka-" + INSTANCE_COUNTER.incrementAndGet());
-        dir.mkdirs();
-        dir.deleteOnExit();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                deleteDirectory(dir);
-            }
-        });
-        return dir;
-    }
-
-    private static void deleteDirectory(File dir) {
-        if (dir != null && dir.exists()) {
-            try {
-                Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
-                    @Override
-                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                        Files.delete(file);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                    @Override
-                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                        Files.delete(dir);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                });
-            } catch (IOException e) {
-                // best-effort cleanup during shutdown; ignore failures
-            }
-        }
-    }
-}
\ No newline at end of file
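
tempDir() hands each caller a unique directory under the JVM temp dir and registers a shutdown hook
that deletes it recursively, so tests neither share nor leak local state. Paired with the
KeyValueStoreTestDriver above, usage reduces to:

    File localState = StateUtils.tempDir(); // e.g. <tmp>/kafka-1, removed at JVM exit
    driver.useStateDir(localState);         // keep the store's local data in the throwaway dir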

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
deleted file mode 100644
index ca5f33d..0000000
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-
-import java.util.List;
-
-public class KStreamTestDriver {
-
-    private final ProcessorTopology topology;
-    private final MockProcessorContext context;
-    private ProcessorNode currNode;
-
-    public KStreamTestDriver(KStreamBuilder builder) {
-        this(builder, null, null);
-    }
-
-    public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
-        this.topology = builder.build(null);
-        this.context = new MockProcessorContext(this, serializer, deserializer);
-
-        for (ProcessorNode node : topology.processors()) {
-            currNode = node;
-            try {
-                node.init(context);
-            } finally {
-                currNode = null;
-            }
-        }
-    }
-
-    public void process(String topicName, Object key, Object value) {
-        currNode = topology.source(topicName);
-        try {
-            forward(key, value);
-        } finally {
-            currNode = null;
-        }
-    }
-
-    public void setTime(long timestamp) {
-        context.setTime(timestamp);
-    }
-
-    public StateStore getStateStore(String name) {
-        return context.getStateStore(name);
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value) {
-        ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currNode = childNode;
-            try {
-                childNode.process(key, value);
-            } finally {
-                currNode = thisNode;
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, int childIndex) {
-        ProcessorNode thisNode = currNode;
-        ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
-        currNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currNode = thisNode;
-        }
-    }
-
-}
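
A minimal sketch of piping one record through a topology with this driver, assuming the era's
TopologyBuilder-style addSource()/addProcessor() methods on KStreamBuilder and the
MockProcessorSupplier defined later in this patch:

    KStreamBuilder builder = new KStreamBuilder();
    builder.addSource("source", "topic1");
    MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
    builder.addProcessor("processor", supplier, "source");

    KStreamTestDriver driver = new KStreamTestDriver(builder);
    driver.setTime(0L);
    driver.process("topic1", "key", "value");             // forwarded from the source node
    assertEquals("key:value", supplier.processed.get(0)); // recorded by the mock processor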

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
deleted file mode 100644
index 40f11a0..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
-
-    private final KStreamTestDriver driver;
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
-    private final Deserializer keyDeserializer;
-    private final Deserializer valueDeserializer;
-    private final RecordCollector.Supplier recordCollectorSupplier;
-
-    private Map<String, StateStore> storeMap = new HashMap<>();
-
-    long timestamp = -1L;
-
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) {
-        this(driver, serializer, deserializer, serializer, deserializer, (RecordCollector.Supplier) null);
-    }
-
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-            final RecordCollector collector) {
-        this(driver, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
-                collector == null ? null : new RecordCollector.Supplier() {
-                    @Override
-                    public RecordCollector recordCollector() {
-                        return collector;
-                    }
-                });
-    }
-
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-            RecordCollector.Supplier collectorSupplier) {
-        this.driver = driver;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
-        this.recordCollectorSupplier = collectorSupplier;
-    }
-
-    @Override
-    public RecordCollector recordCollector() {
-        if (recordCollectorSupplier == null) {
-            throw new UnsupportedOperationException("No RecordCollector specified");
-        }
-        return recordCollectorSupplier.recordCollector();
-    }
-
-    public void setTime(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public TaskId id() {
-        return new TaskId(0, 0);
-    }
-
-    @Override
-    public Serializer<?> keySerializer() {
-        return keySerializer;
-    }
-
-    @Override
-    public Serializer<?> valueSerializer() {
-        return valueSerializer;
-    }
-
-    @Override
-    public Deserializer<?> keyDeserializer() {
-        return keyDeserializer;
-    }
-
-    @Override
-    public Deserializer<?> valueDeserializer() {
-        return valueDeserializer;
-    }
-
-    @Override
-    public File stateDir() {
-        throw new UnsupportedOperationException("stateDir() not supported.");
-    }
-
-    @Override
-    public StreamingMetrics metrics() {
-        throw new UnsupportedOperationException("metrics() not supported.");
-    }
-
-    @Override
-    public void register(StateStore store, StateRestoreCallback func) {
-        if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
-        storeMap.put(store.name(), store);
-    }
-
-    @Override
-    public StateStore getStateStore(String name) {
-        return storeMap.get(name);
-    }
-
-    @Override
-    public void schedule(long interval) {
-        throw new UnsupportedOperationException("schedule() not supported");
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value) {
-        driver.forward(key, value);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, int childIndex) {
-        driver.forward(key, value, childIndex);
-    }
-
-    @Override
-    public void commit() {
-        throw new UnsupportedOperationException("commit() not supported.");
-    }
-
-    @Override
-    public String topic() {
-        throw new UnsupportedOperationException("topic() not supported.");
-    }
-
-    @Override
-    public int partition() {
-        throw new UnsupportedOperationException("partition() not supported.");
-    }
-
-    @Override
-    public long offset() {
-        throw new UnsupportedOperationException("offset() not supported.");
-    }
-
-    @Override
-    public long timestamp() {
-        return this.timestamp;
-    }
-
-}
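
Outside the driver, the mock context can be constructed directly; a minimal sketch using the
standard kafka-clients StringSerializer/StringDeserializer:

    MockProcessorContext context = new MockProcessorContext(
            null, new StringSerializer(), new StringDeserializer());
    context.setTime(100L);
    assertEquals(100L, context.timestamp());
    // With no RecordCollector supplied, recordCollector() throws UnsupportedOperationException.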

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
deleted file mode 100644
index f1aa167..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.ArrayList;
-
-public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
-
-    public final ArrayList<String> processed = new ArrayList<>();
-    public final ArrayList<Long> punctuated = new ArrayList<>();
-
-    @Override
-    public Processor<K, V> get() {
-        return new MockProcessor();
-    }
-
-    public class MockProcessor implements Processor<K, V> {
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do nothing
-        }
-
-        @Override
-        public void process(K key, V value) {
-            processed.add(key + ":" + value);
-        }
-
-        @Override
-        public void punctuate(long streamTime) {
-            punctuated.add(streamTime);
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
-    }
-}
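
The supplier records each processed record as a "key:value" string and each punctuation timestamp,
so assertions stay simple:

    MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
    Processor<String, Integer> processor = supplier.get();
    processor.process("a", 1);
    processor.punctuate(10L);
    assertEquals(Collections.singletonList("a:1"), supplier.processed);
    assertEquals(Collections.singletonList(10L), supplier.punctuated);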

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
deleted file mode 100644
index cf0202e..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MockSourceNode<K, V> extends SourceNode<K, V> {
-
-    public static final String NAME = "MOCK-SOURCE-";
-    public static final AtomicInteger INDEX = new AtomicInteger(1);
-
-    public int numReceived = 0;
-    public final ArrayList<K> keys = new ArrayList<>();
-    public final ArrayList<V> values = new ArrayList<>();
-
-    public MockSourceNode(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
-        super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
-    }
-
-    @Override
-    public void process(K key, V value) {
-        this.numReceived++;
-        this.keys.add(key);
-        this.values.add(value);
-    }
-}
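
Each instance takes a unique name from the shared counter and simply records what it receives:

    MockSourceNode<String, String> source =
            new MockSourceNode<>(new StringDeserializer(), new StringDeserializer());
    source.process("key", "value");
    assertEquals(1, source.numReceived);
    assertEquals("key", source.keys.get(0));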

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
deleted file mode 100644
index 16635b7..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.ArrayList;
-
-public class MockStateStoreSupplier implements StateStoreSupplier {
-    private final String name;
-    private final boolean persistent;
-
-    public MockStateStoreSupplier(String name, boolean persistent) {
-        this.name = name;
-        this.persistent = persistent;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public StateStore get() {
-        return new MockStateStore(name, persistent);
-    }
-
-    public static class MockStateStore implements StateStore {
-        private final String name;
-        private final boolean persistent;
-
-        public boolean initialized = false;
-        public boolean flushed = false;
-        public boolean closed = false;
-        public final ArrayList<Integer> keys = new ArrayList<>();
-
-        public MockStateStore(String name, boolean persistent) {
-            this.name = name;
-            this.persistent = persistent;
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            context.register(this, stateRestoreCallback);
-            initialized = true;
-        }
-
-        @Override
-        public void flush() {
-            flushed = true;
-        }
-
-        @Override
-        public void close() {
-            closed = true;
-        }
-
-        @Override
-        public boolean persistent() {
-            return persistent;
-        }
-
-        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
-            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
-
-            @Override
-            public void restore(byte[] key, byte[] value) {
-                keys.add(deserializer.deserialize("", key));
-            }
-        };
-    }
-}
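
Because the inner MockStateStore exposes its restore callback as a public field, restore behavior
can be checked without a full topology (IntegerSerializer is the standard kafka-clients counterpart
of the deserializer used above):

    MockStateStoreSupplier.MockStateStore store =
            new MockStateStoreSupplier.MockStateStore("store", false);
    byte[] rawKey = new IntegerSerializer().serialize("", 42);
    store.stateRestoreCallback.restore(rawKey, rawKey); // replay one serialized entry
    assertEquals(Collections.singletonList(42), store.keys);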

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
deleted file mode 100644
index 274e7b5..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-/* A timestamp extractor that uses each record's offset as its timestamp */
-public class MockTimestampExtractor implements TimestampExtractor {
-
-    @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        return record.offset();
-    }
-}
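
Mapping each record's offset to its timestamp gives tests deterministic, monotonically increasing
timestamps with no wall-clock dependence; with the 0.9-era ConsumerRecord constructor:

    TimestampExtractor extractor = new MockTimestampExtractor();
    ConsumerRecord<Object, Object> record =
            new ConsumerRecord<Object, Object>("topic1", 0, 5L, "key", "value");
    assertEquals(5L, extractor.extract(record));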