You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:28 UTC
[2/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
deleted file mode 100644
index 909df13..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-public class StreamThreadTest {
-
- private TopicPartition t1p1 = new TopicPartition("topic1", 1);
- private TopicPartition t1p2 = new TopicPartition("topic1", 2);
- private TopicPartition t2p1 = new TopicPartition("topic2", 1);
- private TopicPartition t2p2 = new TopicPartition("topic2", 2);
- private TopicPartition t3p1 = new TopicPartition("topic3", 1);
- private TopicPartition t3p2 = new TopicPartition("topic3", 2);
-
- private List<PartitionInfo> infos = Arrays.asList(
- new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
- );
-
- private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
- PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
-
- // task0 is unused
- private final TaskId task1 = new TaskId(0, 1);
- private final TaskId task2 = new TaskId(0, 2);
- private final TaskId task3 = new TaskId(0, 3);
- private final TaskId task4 = new TaskId(1, 1);
- private final TaskId task5 = new TaskId(1, 2);
-
- private Properties configProps() {
- return new Properties() {
- {
- setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
- }
- };
- }
-
- private static class TestStreamTask extends StreamTask {
- public boolean committed = false;
-
- public TestStreamTask(TaskId id,
- Consumer<byte[], byte[]> consumer,
- Producer<byte[], byte[]> producer,
- Consumer<byte[], byte[]> restoreConsumer,
- Collection<TopicPartition> partitions,
- ProcessorTopology topology,
- StreamingConfig config) {
- super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
- }
-
- @Override
- public void commit() {
- super.commit();
- committed = true;
- }
- }
-
- private ByteArraySerializer serializer = new ByteArraySerializer();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPartitionAssignmentChange() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addSource("source3", "topic3");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
-
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
- @Override
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
- }
- };
-
- initPartitionGrouper(thread);
-
- ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
- assertTrue(thread.tasks().isEmpty());
-
- List<TopicPartition> revokedPartitions;
- List<TopicPartition> assignedPartitions;
- Set<TopicPartition> expectedGroup1;
- Set<TopicPartition> expectedGroup2;
-
- revokedPartitions = Collections.emptyList();
- assignedPartitions = Collections.singletonList(t1p1);
- expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertTrue(thread.tasks().containsKey(task1));
- assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
- assertEquals(1, thread.tasks().size());
-
- revokedPartitions = assignedPartitions;
- assignedPartitions = Collections.singletonList(t1p2);
- expectedGroup2 = new HashSet<>(Arrays.asList(t1p2));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertTrue(thread.tasks().containsKey(task2));
- assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
- assertEquals(1, thread.tasks().size());
-
- revokedPartitions = assignedPartitions;
- assignedPartitions = Arrays.asList(t1p1, t1p2);
- expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
- expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertTrue(thread.tasks().containsKey(task1));
- assertTrue(thread.tasks().containsKey(task2));
- assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
- assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
- assertEquals(2, thread.tasks().size());
-
- revokedPartitions = assignedPartitions;
- assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
- expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1));
- expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertTrue(thread.tasks().containsKey(task4));
- assertTrue(thread.tasks().containsKey(task5));
- assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
- assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
- assertEquals(2, thread.tasks().size());
-
- revokedPartitions = assignedPartitions;
- assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
- expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
- expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertTrue(thread.tasks().containsKey(task1));
- assertTrue(thread.tasks().containsKey(task4));
- assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
- assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
- assertEquals(2, thread.tasks().size());
-
- revokedPartitions = assignedPartitions;
- assignedPartitions = Collections.emptyList();
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertTrue(thread.tasks().isEmpty());
- }
-
- @Test
- public void testMaybeClean() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final long cleanupDelay = 1000L;
- Properties props = configProps();
- props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
- props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-
- StreamingConfig config = new StreamingConfig(props);
-
- File stateDir1 = new File(baseDir, task1.toString());
- File stateDir2 = new File(baseDir, task2.toString());
- File stateDir3 = new File(baseDir, task3.toString());
- File extraDir = new File(baseDir, "X");
- stateDir1.mkdir();
- stateDir2.mkdir();
- stateDir3.mkdir();
- extraDir.mkdir();
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
- MockTime mockTime = new MockTime();
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
-
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
- @Override
- public void maybeClean() {
- super.maybeClean();
- }
-
- @Override
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
- }
- };
-
- initPartitionGrouper(thread);
-
- ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
- assertTrue(thread.tasks().isEmpty());
- mockTime.sleep(cleanupDelay);
-
- // all directories exist since an assignment didn't happen
- assertTrue(stateDir1.exists());
- assertTrue(stateDir2.exists());
- assertTrue(stateDir3.exists());
- assertTrue(extraDir.exists());
-
- List<TopicPartition> revokedPartitions;
- List<TopicPartition> assignedPartitions;
- Map<Integer, StreamTask> prevTasks;
-
- //
- // Assign t1p1 and t1p2. This should create task1 & task2
- //
- revokedPartitions = Collections.emptyList();
- assignedPartitions = Arrays.asList(t1p1, t1p2);
- prevTasks = new HashMap(thread.tasks());
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- // there shouldn't be any previous task
- assertTrue(prevTasks.isEmpty());
-
- // task 1 & 2 are created
- assertEquals(2, thread.tasks().size());
-
- // all directories should still exist before the cleanup delay time
- mockTime.sleep(cleanupDelay - 10L);
- thread.maybeClean();
- assertTrue(stateDir1.exists());
- assertTrue(stateDir2.exists());
- assertTrue(stateDir3.exists());
- assertTrue(extraDir.exists());
-
- // all state directories except those for task1 & task2 will be removed. the extra directory should still exist
- mockTime.sleep(11L);
- thread.maybeClean();
- assertTrue(stateDir1.exists());
- assertTrue(stateDir2.exists());
- assertFalse(stateDir3.exists());
- assertTrue(extraDir.exists());
-
- //
- // Revoke t1p1 and t1p2. This should remove task1 & task2
- //
- revokedPartitions = assignedPartitions;
- assignedPartitions = Collections.emptyList();
- prevTasks = new HashMap(thread.tasks());
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- // previous tasks should be committed
- assertEquals(2, prevTasks.size());
- for (StreamTask task : prevTasks.values()) {
- assertTrue(((TestStreamTask) task).committed);
- ((TestStreamTask) task).committed = false;
- }
-
- // no task
- assertTrue(thread.tasks().isEmpty());
-
- // the state directories for task1 & task2 still exist before the cleanup delay time
- mockTime.sleep(cleanupDelay - 10L);
- thread.maybeClean();
- assertTrue(stateDir1.exists());
- assertTrue(stateDir2.exists());
- assertFalse(stateDir3.exists());
- assertTrue(extraDir.exists());
-
- // the state directories for task1 & task2 are removed
- mockTime.sleep(11L);
- thread.maybeClean();
- assertFalse(stateDir1.exists());
- assertFalse(stateDir2.exists());
- assertFalse(stateDir3.exists());
- assertTrue(extraDir.exists());
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testMaybeCommit() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final long commitInterval = 1000L;
- Properties props = configProps();
- props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
- props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
-
- StreamingConfig config = new StreamingConfig(props);
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockTime mockTime = new MockTime();
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
-
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
- @Override
- public void maybeCommit() {
- super.maybeCommit();
- }
-
- @Override
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
- }
- };
-
- initPartitionGrouper(thread);
-
- ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
- List<TopicPartition> revokedPartitions;
- List<TopicPartition> assignedPartitions;
-
- //
- // Assign t1p1 and t1p2. This should create Task 1 & 2
- //
- revokedPartitions = Collections.emptyList();
- assignedPartitions = Arrays.asList(t1p1, t1p2);
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
- rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
- assertEquals(2, thread.tasks().size());
-
- // no task is committed before the commit interval
- mockTime.sleep(commitInterval - 10L);
- thread.maybeCommit();
- for (StreamTask task : thread.tasks().values()) {
- assertFalse(((TestStreamTask) task).committed);
- }
-
- // all tasks are committed after the commit interval
- mockTime.sleep(11L);
- thread.maybeCommit();
- for (StreamTask task : thread.tasks().values()) {
- assertTrue(((TestStreamTask) task).committed);
- ((TestStreamTask) task).committed = false;
- }
-
- // no task is committed before the commit interval, again
- mockTime.sleep(commitInterval - 10L);
- thread.maybeCommit();
- for (StreamTask task : thread.tasks().values()) {
- assertFalse(((TestStreamTask) task).committed);
- }
-
- // all tasks are committed after the commit interval, again
- mockTime.sleep(11L);
- thread.maybeCommit();
- for (StreamTask task : thread.tasks().values()) {
- assertTrue(((TestStreamTask) task).committed);
- ((TestStreamTask) task).committed = false;
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- private void initPartitionGrouper(StreamThread thread) {
- PartitionGrouper partitionGrouper = thread.partitionGrouper();
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-
- partitionAssignor.configure(
- Collections.singletonMap(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper)
- );
-
- Map<String, PartitionAssignor.Assignment> assignments =
- partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
-
- partitionAssignor.onAssignment(assignments.get("client"));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
deleted file mode 100644
index 209f3c9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-public abstract class AbstractKeyValueStoreTest {
-
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config,
- ProcessorContext context,
- Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes);
-
- @Test
- public void testPutGetRange() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
- try {
-
- // Verify that the store reads and writes correctly ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(4, "four");
- store.put(5, "five");
- assertEquals(5, driver.sizeOf(store));
- assertEquals("zero", store.get(0));
- assertEquals("one", store.get(1));
- assertEquals("two", store.get(2));
- assertNull(store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5);
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
- assertEquals("zero", driver.flushedEntryStored(0));
- assertEquals("one", driver.flushedEntryStored(1));
- assertEquals("two", driver.flushedEntryStored(2));
- assertEquals("four", driver.flushedEntryStored(4));
- assertEquals(null, driver.flushedEntryStored(5));
-
- assertEquals(false, driver.flushedEntryRemoved(0));
- assertEquals(false, driver.flushedEntryRemoved(1));
- assertEquals(false, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
-
- // Check range iteration ...
- try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
- while (iter.hasNext()) {
- Entry<Integer, String> entry = iter.next();
- if (entry.key().equals(2))
- assertEquals("two", entry.value());
- else if (entry.key().equals(4))
- assertEquals("four", entry.value());
- else
- fail("Unexpected entry: " + entry);
- }
- }
-
- // Check range iteration ...
- try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
- while (iter.hasNext()) {
- Entry<Integer, String> entry = iter.next();
- if (entry.key().equals(2))
- assertEquals("two", entry.value());
- else if (entry.key().equals(4))
- assertEquals("four", entry.value());
- else
- fail("Unexpected entry: " + entry);
- }
- }
- } finally {
- store.close();
- }
- }
-
- @Test
- public void testPutGetRangeWithDefaultSerdes() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
- try {
-
- // Verify that the store reads and writes correctly ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(4, "four");
- store.put(5, "five");
- assertEquals(5, driver.sizeOf(store));
- assertEquals("zero", store.get(0));
- assertEquals("one", store.get(1));
- assertEquals("two", store.get(2));
- assertNull(store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5);
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
- assertEquals("zero", driver.flushedEntryStored(0));
- assertEquals("one", driver.flushedEntryStored(1));
- assertEquals("two", driver.flushedEntryStored(2));
- assertEquals("four", driver.flushedEntryStored(4));
- assertEquals(null, driver.flushedEntryStored(5));
-
- assertEquals(false, driver.flushedEntryRemoved(0));
- assertEquals(false, driver.flushedEntryRemoved(1));
- assertEquals(false, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
- } finally {
- store.close();
- }
- }
-
- @Test
- public void testRestore() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
- // Add any entries that will be restored to any store
- // that uses the driver's context ...
- driver.addEntryToRestoreLog(0, "zero");
- driver.addEntryToRestoreLog(1, "one");
- driver.addEntryToRestoreLog(2, "two");
- driver.addEntryToRestoreLog(4, "four");
-
- // Create the store, which should register with the context and automatically
- // receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
- try {
- // Verify that the store's contents were properly restored ...
- assertEquals(0, driver.checkForRestoredEntries(store));
-
- // and there are no other entries ...
- assertEquals(4, driver.sizeOf(store));
- } finally {
- store.close();
- }
- }
-
- @Test
- public void testRestoreWithDefaultSerdes() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
- // Add any entries that will be restored to any store
- // that uses the driver's context ...
- driver.addEntryToRestoreLog(0, "zero");
- driver.addEntryToRestoreLog(1, "one");
- driver.addEntryToRestoreLog(2, "two");
- driver.addEntryToRestoreLog(4, "four");
-
- // Create the store, which should register with the context and automatically
- // receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
- try {
- // Verify that the store's contents were properly restored ...
- assertEquals(0, driver.checkForRestoredEntries(store));
-
- // and there are no other entries ...
- assertEquals(4, driver.sizeOf(store));
- } finally {
- store.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
deleted file mode 100644
index b3fe98c..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
- @SuppressWarnings("unchecked")
- @Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- StreamingConfig config,
- ProcessorContext context,
- Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes) {
-
- StateStoreSupplier supplier;
- if (useContextSerdes) {
- Serializer<K> keySer = (Serializer<K>) context.keySerializer();
- Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
- Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
- Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
- } else {
- supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
- }
-
- KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
- store.init(context);
- return store;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
deleted file mode 100644
index dddb9c7..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.junit.Test;
-
-public class InMemoryLRUCacheStoreTest {
-
- @Test
- public void testPutGetRange() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- StateStoreSupplier supplier = Stores.create("my-store", driver.config())
- .withIntegerKeys().withStringValues()
- .inMemory().maxEntries(3)
- .build();
- KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- store.init(driver.context());
-
- // Verify that the store reads and writes correctly, keeping only the last 3 entries (maxEntries is 3) ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(3, "three");
- store.put(4, "four");
- store.put(5, "five");
-
- // It should only keep the last 3 added ...
- assertEquals(3, driver.sizeOf(store));
- assertNull(store.get(0));
- assertNull(store.get(1));
- assertNull(store.get(2));
- assertEquals("three", store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5); // the removal of key 5 is checked after the flush below
-
- // Flush the store and verify that the remaining entries (3, 4) were flushed as stored and that
- // the keys no longer in the store (0-2 and the deleted 5) were flushed as removals ...
- store.flush();
- assertNull(driver.flushedEntryStored(0));
- assertNull(driver.flushedEntryStored(1));
- assertNull(driver.flushedEntryStored(2));
- assertEquals("three", driver.flushedEntryStored(3));
- assertEquals("four", driver.flushedEntryStored(4));
- assertNull(driver.flushedEntryStored(5));
-
- assertEquals(true, driver.flushedEntryRemoved(0));
- assertEquals(true, driver.flushedEntryRemoved(1));
- assertEquals(true, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(3));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPutGetRangeWithDefaultSerdes() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-
- Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
- Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
- Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
- Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
- StateStoreSupplier supplier = Stores.create("my-store", driver.config())
- .withKeys(keySer, keyDeser)
- .withValues(valSer, valDeser)
- .inMemory().maxEntries(3)
- .build();
- KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- store.init(driver.context());
-
- // Verify that the store reads and writes correctly, keeping only the last 3 entries (maxEntries is 3) ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(3, "three");
- store.put(4, "four");
- store.put(5, "five");
-
- // It should only keep the last 3 added ...
- assertEquals(3, driver.sizeOf(store));
- assertNull(store.get(0));
- assertNull(store.get(1));
- assertNull(store.get(2));
- assertEquals("three", store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5); // the removal of key 5 is checked after the flush below
-
- // Flush the store and verify that the remaining entries (3, 4) were flushed as stored and that
- // the keys no longer in the store (0-2 and the deleted 5) were flushed as removals ...
- store.flush();
- assertNull(driver.flushedEntryStored(0));
- assertNull(driver.flushedEntryStored(1));
- assertNull(driver.flushedEntryStored(2));
- assertEquals("three", driver.flushedEntryStored(3));
- assertEquals("four", driver.flushedEntryStored(4));
- assertNull(driver.flushedEntryStored(5));
-
- assertEquals(true, driver.flushedEntryRemoved(0));
- assertEquals(true, driver.flushedEntryRemoved(1));
- assertEquals(true, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(3));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
- }
-
- @Test
- public void testRestore() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
- // Add any entries that will be restored to any store
- // that uses the driver's context ...
- driver.addEntryToRestoreLog(1, "one");
- driver.addEntryToRestoreLog(2, "two");
- driver.addEntryToRestoreLog(4, "four");
-
- // Create the store, which should register with the context and automatically
- // receive the restore entries ...
- StateStoreSupplier supplier = Stores.create("my-store", driver.config())
- .withIntegerKeys().withStringValues()
- .inMemory().maxEntries(3)
- .build();
- KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- store.init(driver.context());
-
- // Verify that the store's contents were properly restored (0 means no restore entry is missing) ...
- assertEquals(0, driver.checkForRestoredEntries(store));
-
- // and there are no other entries (the three restored entries exactly fill maxEntries) ...
- assertEquals(3, driver.sizeOf(store));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
deleted file mode 100644
index 8bab1c9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.test.MockProcessorContext;
-import org.apache.kafka.test.MockTimestampExtractor;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * A component that provides a {@link #context() ProcessorContext} that can be supplied to a {@link KeyValueStore} so that
- * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
- * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
- * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
- * <p>
- * <h2>Basic usage</h2>
- * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
- *
- * <pre>
- * // Create the test driver ...
- * KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
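- * StateStoreSupplier supplier = Stores.create("my-store", driver.config())
- * .withIntegerKeys().withStringValues()
- * .inMemory().build();
- * KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- * store.init(driver.context());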
- *
- * // Verify that the store reads and writes correctly ...
- * store.put(0, "zero");
- * store.put(1, "one");
- * store.put(2, "two");
- * store.put(4, "four");
- * store.put(5, "five");
- * assertEquals(5, driver.sizeOf(store));
- * assertEquals("zero", store.get(0));
- * assertEquals("one", store.get(1));
- * assertEquals("two", store.get(2));
- * assertEquals("four", store.get(4));
- * assertEquals("five", store.get(5));
- * assertNull(store.get(3));
- * store.delete(5);
- *
- * // Flush the store and verify all current entries were properly flushed ...
- * store.flush();
- * assertEquals("zero", driver.flushedEntryStored(0));
- * assertEquals("one", driver.flushedEntryStored(1));
- * assertEquals("two", driver.flushedEntryStored(2));
- * assertEquals("four", driver.flushedEntryStored(4));
- * assertEquals(null, driver.flushedEntryStored(5));
- *
- * assertEquals(false, driver.flushedEntryRemoved(0));
- * assertEquals(false, driver.flushedEntryRemoved(1));
- * assertEquals(false, driver.flushedEntryRemoved(2));
- * assertEquals(false, driver.flushedEntryRemoved(4));
- * assertEquals(true, driver.flushedEntryRemoved(5));
- * </pre>
- *
- * <p>
- * <h2>Restoring a store</h2>
- * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
- * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
- * <p>
- * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
- * passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store
- * using this driver's {@link #context() ProcessorContext}:
- *
- * <pre>
- * // Create the test driver ...
- * KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- *
- * // Add any entries that will be restored to any store that uses the driver's context ...
- * driver.addEntryToRestoreLog(0, "zero");
- * driver.addEntryToRestoreLog(1, "one");
- * driver.addEntryToRestoreLog(2, "two");
- * driver.addEntryToRestoreLog(4, "four");
- *
- * // Create the store, which should register with the context and automatically
- * // receive the restore entries ...
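- * StateStoreSupplier supplier = Stores.create("my-store", driver.config())
- * .withIntegerKeys().withStringValues()
- * .inMemory().build();
- * KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- * store.init(driver.context());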
- *
- * // Verify that the store's contents were properly restored ...
- * assertEquals(0, driver.checkForRestoredEntries(store));
- *
- * // and there are no other entries ...
- * assertEquals(4, driver.sizeOf(store));
- * </pre>
- *
- * @param <K> the type of keys placed in the store
- * @param <V> the type of values placed in the store
- */
-public class KeyValueStoreTestDriver<K, V> {
-
- private static <T> Serializer<T> unusableSerializer() { // returns a serializer whose serialize() always throws
- return new Serializer<T>() {
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- }
-
- @Override
- public byte[] serialize(String topic, T data) {
- throw new UnsupportedOperationException("This serializer should not be used");
- }
-
- @Override
- public void close() {
- }
- };
- }; // NOTE(review): the ';' after the method body is a redundant empty declaration
-
- private static <T> Deserializer<T> unusableDeserializer() { // returns a deserializer whose deserialize() always throws
- return new Deserializer<T>() {
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- }
-
- @Override
- public T deserialize(String topic, byte[] data) {
- throw new UnsupportedOperationException("This deserializer should not be used");
- }
-
- @Override
- public void close() {
- }
- };
- }; // NOTE(review): the ';' after the method body is a redundant empty declaration
-
- /**
- * Create a driver object that will have a {@link #context()} that records messages
- * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides <em>unusable</em> default key and
- * value serializers and deserializers: each of them throws {@link UnsupportedOperationException} when asked to
- * serialize or deserialize. This can be used when the actual serializers and deserializers are supplied to the
- * store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and
- * value serializers and deserializers.
- *
- * @param <K> the type of keys placed in the store
- * @param <V> the type of values placed in the store
- * @return the test driver; never null
- */
- public static <K, V> KeyValueStoreTestDriver<K, V> create() {
- Serializer<K> keySerializer = unusableSerializer();
- Deserializer<K> keyDeserializer = unusableDeserializer();
- Serializer<V> valueSerializer = unusableSerializer();
- Deserializer<V> valueDeserializer = unusableDeserializer();
- Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
- return new KeyValueStoreTestDriver<K, V>(serdes);
- }
-
- /**
- * Create a driver object that will have a {@link #context()} that records messages
- * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
- * deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class},
- * {@code Long.class}, and {@code byte[].class}). This can be used when the store is created to rely upon the
- * ProcessorContext's default key and value serializers and deserializers.
- * <p>
- * The serdes are obtained from {@code Serdes.withBuiltinTypes("unexpected", keyClass, valueClass)}.
- *
- * @param keyClass the class for the keys; must be one of {@code String.class}, {@code Integer.class},
- * {@code Long.class}, or {@code byte[].class}
- * @param valueClass the class for the values; must be one of {@code String.class}, {@code Integer.class},
- * {@code Long.class}, or {@code byte[].class}
- * @param <K> the type of keys placed in the store
- * @param <V> the type of values placed in the store
- * @return the test driver; never null
- */
- public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
- Serdes<K, V> serdes = Serdes.withBuiltinTypes("unexpected", keyClass, valueClass);
- return new KeyValueStoreTestDriver<K, V>(serdes);
- }
-
- /**
- * Create a driver object that will have a {@link #context()} that records messages
- * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and
- * deserializers. This can be used when the store is created to rely upon the ProcessorContext's default key and value
- * serializers and deserializers.
- *
- * @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null
- * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null
- * @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null
- * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
- * @param <K> the type of keys placed in the store
- * @param <V> the type of values placed in the store
- * @return the test driver; never null
- */
- public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
- Serializer<V> valueSerializer,
- Deserializer<V> valueDeserializer) {
- Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
- return new KeyValueStoreTestDriver<K, V>(serdes);
- }
-
- private final Serdes<K, V> serdes;
- private final Map<K, V> flushedEntries = new HashMap<>(); // last non-null value recorded per key
- private final Set<K> flushedRemovals = new HashSet<>(); // keys whose last recorded value was null
- private final List<Entry<K, V>> restorableEntries = new LinkedList<>(); // the "restore log" replayed to a store when it registers
- private final StreamingConfig config;
- private final MockProcessorContext context;
- private final Map<String, StateStore> storeMap = new HashMap<>(); // registered stores, keyed by store name
- private final StreamingMetrics metrics = new StreamingMetrics() { // no-op metrics handed out by the context
- @Override
- public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
- return null; // no sensor is created
- }
-
- @Override
- public void recordLatency(Sensor sensor, long startNs, long endNs) {
- }
- };
- private final RecordCollector recordCollector;
- private File stateDir = new File("build/data").getAbsoluteFile(); // default state directory; replaceable via useStateDir
-
- protected KeyValueStoreTestDriver(Serdes<K, V> serdes) { // wires up the capturing record collector, the config and the mock context
- this.serdes = serdes;
- ByteArraySerializer rawSerializer = new ByteArraySerializer();
- Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, rawSerializer, rawSerializer);
- this.recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- recordFlushed(record.key(), record.value()); // capture the record in the driver; the passed serializers are not used
- }
- };
- Properties props = new Properties();
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass()); // the config gets the serde classes, the context below gets the instances
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
- this.config = new StreamingConfig(props);
-
- this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
- serdes.valueDeserializer(), recordCollector) {
- @Override
- public TaskId id() {
- return new TaskId(0, 1); // a new TaskId(0, 1) on every call
- }
-
- @Override
- public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
- forward(key, value); // childIndex is ignored
- }
-
- @Override
- public void register(StateStore store, StateRestoreCallback func) {
- storeMap.put(store.name(), store);
- restoreEntries(func); // replay the restore log into the store as soon as it registers
- }
-
- @Override
- public StateStore getStateStore(String name) {
- return storeMap.get(name);
- }
-
- @Override
- public StreamingMetrics metrics() {
- return metrics;
- }
-
- @Override
- public File stateDir() {
- if (stateDir == null) { // only the case after useStateDir(null); the field starts out non-null
- stateDir = StateUtils.tempDir();
- }
- stateDir.mkdirs(); // make sure the directory exists before handing it out
- return stateDir;
- }
- };
- }
-
- /**
- * Set the directory that should be used by the store for local disk storage. Until this is called the driver uses
- * the absolute form of {@code build/data}.
- *
- * @param dir the directory; may be null, in which case the context's {@code stateDir()} falls back to a new
- * temporary directory obtained from {@link StateUtils#tempDir()} the next time it is called
- */
- public void useStateDir(File dir) {
- this.stateDir = dir;
- }
-
- @SuppressWarnings("unchecked") // key and value arrive with the collector's own type parameters and are cast to K and V
- protected <K1, V1> void recordFlushed(K1 key, V1 value) {
- K k = (K) key;
- if (value == null) {
- // This is a removal: a null value marks the key as removed and drops any previously recorded value ...
- flushedRemovals.add(k);
- flushedEntries.remove(k);
- } else {
- // This is a normal add: record the value and clear any earlier removal marker for the key
- flushedEntries.put(k, (V) value);
- flushedRemovals.remove(k);
- }
- }
-
- private void restoreEntries(StateRestoreCallback func) { // feeds every restore-log entry to the callback in raw (byte[]) form
- for (Entry<K, V> entry : restorableEntries) {
- if (entry != null) {
- byte[] rawKey = serdes.rawKey(entry.key());
- byte[] rawValue = serdes.rawValue(entry.value());
- func.restore(rawKey, rawValue);
- }
- }
- }
-
- /**
- * This method adds an entry to the "restore log" for the {@link KeyValueStore}, and is used <em>only</em> when testing the
- * restore functionality of a {@link KeyValueStore} implementation.
- * <p>
- * To create such a test, create the test driver, call this method one or more times, and then create the
- * {@link KeyValueStore}. Your tests can then check whether the store contains the entries from the log.
- *
- * <pre>
- * // Set up the driver and pre-populate the log ...
- * KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- * driver.addEntryToRestoreLog(1,"value1");
- * driver.addEntryToRestoreLog(2,"value2");
- * driver.addEntryToRestoreLog(3,"value3");
- *
- * // Create the store using the driver's context ...
- * ProcessorContext context = driver.context();
- * KeyValueStore<Integer, String> store = ...
- *
- * // Verify that the store's contents were properly restored from the log ...
- * assertEquals(0, driver.checkForRestoredEntries(store));
- *
- * // and there are no other entries ...
- * assertEquals(3, driver.sizeOf(store));
- * </pre>
- *
- * @param key the key for the entry
- * @param value the value for the entry
- * @see #checkForRestoredEntries(KeyValueStore)
- */
- public void addEntryToRestoreLog(K key, V value) {
- restorableEntries.add(new Entry<K, V>(key, value));
- }
-
- /**
- * Get the streaming config built by this driver, for example to pass to {@code Stores.create(name, config)} when
- * building a store for a test.
- *
- * @return the streaming config; never null
- */
- public StreamingConfig config() {
- return config;
- }
-
- /**
- * Get the context that should be supplied to a {@link KeyValueStore} when it is initialized (for example via
- * {@code store.init(driver.context())}). This context records any messages
- * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
- * {@link #flushedEntryRemoved(Object)} methods.
- * <p>
- * If the {@link KeyValueStore} is to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object)
- * add the restore entries} before initializing the store with the {@link ProcessorContext} returned by this method.
- *
- * @return the processing context; never null
- * @see #addEntryToRestoreLog(Object, Object)
- */
- public ProcessorContext context() {
- return context;
- }
-
- /**
- * Get the entries that are restored to a KeyValueStore when it registers with this driver's {@link #context()
- * ProcessorContext}. The returned object is the driver's own restore-log list, not a copy.
- *
- * @return the restore entries; never null but possibly empty
- */
- public Iterable<Entry<K, V>> restoredEntries() {
- return restorableEntries;
- }
-
- /**
- * Utility method that will count the number of {@link #addEntryToRestoreLog(Object, Object) restore entries} missing from the
- * supplied store. An entry counts as missing when the value the store returns for its key is not equal to the
- * logged value.
- *
- * @param store the store that is to have all of the {@link #restoredEntries() restore entries}
- * @return the number of restore entries missing from the store, or 0 if all restore entries were found
- * @see #addEntryToRestoreLog(Object, Object)
- */
- public int checkForRestoredEntries(KeyValueStore<K, V> store) {
- int missing = 0;
- for (Entry<K, V> entry : restorableEntries) {
- if (entry != null) {
- V value = store.get(entry.key());
- if (!Objects.equals(value, entry.value())) { // null-safe comparison of stored and logged value
- ++missing;
- }
- }
- }
- return missing;
- }
-
- /**
- * Utility method to compute the number of entries within the store by walking the iterator returned by
- * {@code store.all()} and counting its elements.
- * <p>
- * NOTE(review): the iterator is not closed here; confirm whether {@code KeyValueIterator} needs to be closed.
- *
- * @param store the key value store using this {@link #context()}.
- * @return the number of entries
- */
- public int sizeOf(KeyValueStore<K, V> store) {
- int size = 0;
- for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
- iterator.next();
- ++size;
- }
- return size;
- }
-
- /**
- * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key.
- *
- * @param key the key
- * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this
- * key was {@link #flushedEntryRemoved(Object) removed} upon flush
- */
- public V flushedEntryStored(K key) {
- return flushedEntries.get(key);
- }
-
- /**
- * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key.
- *
- * @param key the key
- * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not
- * removed when last flushed (including when the key was never flushed at all)
- */
- public boolean flushedEntryRemoved(K key) {
- return flushedRemovals.contains(key);
- }
-
- /**
- * Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals},
- * and {@link #addEntryToRestoreLog(Object, Object) restore-log entries} recorded by this driver.
- */
- public void clear() {
- restorableEntries.clear();
- flushedEntries.clear();
- flushedRemovals.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
deleted file mode 100644
index 37a12f9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { // supplies a localDatabase() store through createKeyValueStore
-
- @SuppressWarnings("unchecked") // the casts of the context's serdes and of supplier.get() are unchecked
- @Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- StreamingConfig config,
- ProcessorContext context,
- Class<K> keyClass,
- Class<V> valueClass,
- boolean useContextSerdes) {
-
- StateStoreSupplier supplier;
- if (useContextSerdes) { // build the store with the serializers/deserializers exposed by the context
- Serializer<K> keySer = (Serializer<K>) context.keySerializer();
- Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
- Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
- Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
- } else { // otherwise hand the key and value classes to the builder
- supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
- }
-
- KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
- store.init(context); // initialize against the supplied context before returning the store
- return store;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
deleted file mode 100644
index c7ea748..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.test.TestUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A utility for tests to create and manage unique and isolated directories on the file system for local state.
- */
-public class StateUtils {
-
- private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(); // gives each temp directory a distinct numeric suffix
-
- /**
- * Create a new temporary directory that will be cleaned up automatically upon shutdown. The directory is named
- * {@code kafka-<n>} under {@code TestUtils.IO_TMP_DIR}, where {@code <n>} is the next value of an internal counter,
- * and a JVM shutdown hook is registered that deletes the directory together with everything inside it.
- *
- * @return the new directory that will exist; never null
- */
- public static File tempDir() {
- final File dir = new File(TestUtils.IO_TMP_DIR, "kafka-" + INSTANCE_COUNTER.incrementAndGet());
- dir.mkdirs();
- dir.deleteOnExit(); // can only remove the directory if it is empty at exit; the hook below also removes its contents
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- deleteDirectory(dir);
- }
- });
- return dir;
- }
-
- private static void deleteDirectory(File dir) { // recursively deletes dir; does nothing for null or non-existent directories
- if (dir != null && dir.exists()) {
- try {
- Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
- Files.delete(dir); // runs after the directory's entries were visited, so it should be empty by now
- return FileVisitResult.CONTINUE;
- }
-
- });
- } catch (IOException e) {
- // deliberately ignored: cleanup is best-effort, and a failed delete stops the walk and leaves the remaining files behind
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
deleted file mode 100644
index ca5f33d..0000000
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-
-import java.util.List;
-
-public class KStreamTestDriver {
-
- private final ProcessorTopology topology;
- private final MockProcessorContext context;
- private ProcessorNode currNode;
-
- public KStreamTestDriver(KStreamBuilder builder) {
- this(builder, null, null);
- }
-
- public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
- this.topology = builder.build(null);
- this.context = new MockProcessorContext(this, serializer, deserializer);
-
- for (ProcessorNode node : topology.processors()) {
- currNode = node;
- try {
- node.init(context);
- } finally {
- currNode = null;
- }
- }
- }
-
- public void process(String topicName, Object key, Object value) {
- currNode = topology.source(topicName);
- try {
- forward(key, value);
- } finally {
- currNode = null;
- }
- }
-
- public void setTime(long timestamp) {
- context.setTime(timestamp);
- }
-
- public StateStore getStateStore(String name) {
- return context.getStateStore(name);
- }
-
- @SuppressWarnings("unchecked")
- public <K, V> void forward(K key, V value) {
- ProcessorNode thisNode = currNode;
- for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
- currNode = childNode;
- try {
- childNode.process(key, value);
- } finally {
- currNode = thisNode;
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- public <K, V> void forward(K key, V value, int childIndex) {
- ProcessorNode thisNode = currNode;
- ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
- currNode = childNode;
- try {
- childNode.process(key, value);
- } finally {
- currNode = thisNode;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
deleted file mode 100644
index 40f11a0..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
-
- private final KStreamTestDriver driver;
- private final Serializer keySerializer;
- private final Serializer valueSerializer;
- private final Deserializer keyDeserializer;
- private final Deserializer valueDeserializer;
- private final RecordCollector.Supplier recordCollectorSupplier;
-
- private Map<String, StateStore> storeMap = new HashMap<>();
-
- long timestamp = -1L;
-
- public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) {
- this(driver, serializer, deserializer, serializer, deserializer, (RecordCollector.Supplier) null);
- }
-
- public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
- Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
- final RecordCollector collector) {
- this(driver, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
- collector == null ? null : new RecordCollector.Supplier() {
- @Override
- public RecordCollector recordCollector() {
- return collector;
- }
- });
- }
-
- public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
- Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
- RecordCollector.Supplier collectorSupplier) {
- this.driver = driver;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.keyDeserializer = keyDeserializer;
- this.valueDeserializer = valueDeserializer;
- this.recordCollectorSupplier = collectorSupplier;
- }
-
- @Override
- public RecordCollector recordCollector() {
- if (recordCollectorSupplier == null) {
- throw new UnsupportedOperationException("No RecordCollector specified");
- }
- return recordCollectorSupplier.recordCollector();
- }
-
- public void setTime(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @Override
- public TaskId id() {
- return new TaskId(0, 0);
- }
-
- @Override
- public Serializer<?> keySerializer() {
- return keySerializer;
- }
-
- @Override
- public Serializer<?> valueSerializer() {
- return valueSerializer;
- }
-
- @Override
- public Deserializer<?> keyDeserializer() {
- return keyDeserializer;
- }
-
- @Override
- public Deserializer<?> valueDeserializer() {
- return valueDeserializer;
- }
-
- @Override
- public File stateDir() {
- throw new UnsupportedOperationException("stateDir() not supported.");
- }
-
- @Override
- public StreamingMetrics metrics() {
- throw new UnsupportedOperationException("metrics() not supported.");
- }
-
- @Override
- public void register(StateStore store, StateRestoreCallback func) {
- if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
- storeMap.put(store.name(), store);
- }
-
- @Override
- public StateStore getStateStore(String name) {
- return storeMap.get(name);
- }
-
- @Override
- public void schedule(long interval) {
- throw new UnsupportedOperationException("schedule() not supported");
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> void forward(K key, V value) {
- driver.forward(key, value);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> void forward(K key, V value, int childIndex) {
- driver.forward(key, value, childIndex);
- }
-
- @Override
- public void commit() {
- throw new UnsupportedOperationException("commit() not supported.");
- }
-
- @Override
- public String topic() {
- throw new UnsupportedOperationException("topic() not supported.");
- }
-
- @Override
- public int partition() {
- throw new UnsupportedOperationException("partition() not supported.");
- }
-
- @Override
- public long offset() {
- throw new UnsupportedOperationException("offset() not supported.");
- }
-
- @Override
- public long timestamp() {
- return this.timestamp;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
deleted file mode 100644
index f1aa167..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.ArrayList;
-
-public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
-
- public final ArrayList<String> processed = new ArrayList<>();
- public final ArrayList<Long> punctuated = new ArrayList<>();
-
- @Override
- public Processor<K, V> get() {
- return new MockProcessor();
- }
-
- public class MockProcessor implements Processor<K, V> {
-
- @Override
- public void init(ProcessorContext context) {
- // do nothing
- }
-
- @Override
- public void process(K key, V value) {
- processed.add(key + ":" + value);
- }
-
- @Override
- public void punctuate(long streamTime) {
- punctuated.add(streamTime);
- }
-
- @Override
- public void close() {
- // do nothing
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
deleted file mode 100644
index cf0202e..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MockSourceNode<K, V> extends SourceNode<K, V> {
-
- public static final String NAME = "MOCK-SOURCE-";
- public static final AtomicInteger INDEX = new AtomicInteger(1);
-
- public int numReceived = 0;
- public final ArrayList<K> keys = new ArrayList<>();
- public final ArrayList<V> values = new ArrayList<>();
-
- public MockSourceNode(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
- super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
- }
-
- @Override
- public void process(K key, V value) {
- this.numReceived++;
- this.keys.add(key);
- this.values.add(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
deleted file mode 100644
index 16635b7..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.ArrayList;
-
-public class MockStateStoreSupplier implements StateStoreSupplier {
- private final String name;
- private final boolean persistent;
-
- public MockStateStoreSupplier(String name, boolean persistent) {
- this.name = name;
- this.persistent = persistent;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public StateStore get() {
- return new MockStateStore(name, persistent);
- }
-
- public static class MockStateStore implements StateStore {
- private final String name;
- private final boolean persistent;
-
- public boolean initialized = false;
- public boolean flushed = false;
- public boolean closed = false;
- public final ArrayList<Integer> keys = new ArrayList<>();
-
- public MockStateStore(String name, boolean persistent) {
- this.name = name;
- this.persistent = persistent;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- context.register(this, stateRestoreCallback);
- initialized = true;
- }
-
- @Override
- public void flush() {
- flushed = true;
- }
-
- @Override
- public void close() {
- closed = true;
- }
-
- @Override
- public boolean persistent() {
- return persistent;
- }
-
- public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
- private final Deserializer<Integer> deserializer = new IntegerDeserializer();
-
- @Override
- public void restore(byte[] key, byte[] value) {
- keys.add(deserializer.deserialize("", key));
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
deleted file mode 100644
index 274e7b5..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-/* Extract the timestamp as the offset of the record */
-public class MockTimestampExtractor implements TimestampExtractor {
-
- @Override
- public long extract(ConsumerRecord<Object, Object> record) {
- return record.offset();
- }
-}