Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/02 22:19:14 UTC
[1/2] kafka git commit: KAFKA-2706: make state stores first class citizens in the processor topology
Repository: kafka
Updated Branches:
refs/heads/trunk 6383593fe -> 758272267
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 8ec73fd..c5f040f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,7 +26,8 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
/**
* Factory for creating key-value stores.
@@ -34,13 +35,12 @@ import org.apache.kafka.streams.processor.ProcessorContext;
public class Stores {
/**
- * Begin to create a new {@link org.apache.kafka.streams.processor.StateStore} instance.
- *
+ * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
+ *
* @param name the name of the store
- * @param context the processor context
* @return the factory that can be used to specify other options or configurations for the store; never null
*/
- public static StoreFactory create(final String name, final ProcessorContext context) {
+ public static StoreFactory create(final String name, final StreamingConfig config) {
return new StoreFactory() {
@Override
public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
@@ -48,8 +48,8 @@ public class Stores {
@Override
public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) {
- final Serdes<K, V> serdes = new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
- context);
+ final Serdes<K, V> serdes =
+ new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, config);
return new KeyValueFactory<K, V>() {
@Override
public InMemoryKeyValueFactory<K, V> inMemory() {
@@ -64,11 +64,11 @@ public class Stores {
}
@Override
- public KeyValueStore<K, V> build() {
+ public StateStoreSupplier build() {
if (capacity < Integer.MAX_VALUE) {
- return InMemoryLRUCacheStore.create(name, capacity, context, serdes, null);
+ return new InMemoryLRUCacheStoreSupplier<>(name, capacity, serdes, null);
}
- return new InMemoryKeyValueStore<>(name, context, serdes, null);
+ return new InMemoryKeyValueStoreSupplier<>(name, serdes, null);
}
};
}
@@ -77,8 +77,8 @@ public class Stores {
public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
return new LocalDatabaseKeyValueFactory<K, V>() {
@Override
- public KeyValueStore<K, V> build() {
- return new RocksDBKeyValueStore<>(name, context, serdes, null);
+ public StateStoreSupplier build() {
+ return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
}
};
}
@@ -92,7 +92,7 @@ public class Stores {
public static abstract class StoreFactory {
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
- *
+ *
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<String> withStringKeys() {
@@ -101,7 +101,7 @@ public class Stores {
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
- *
+ *
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<Integer> withIntegerKeys() {
@@ -110,7 +110,7 @@ public class Stores {
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
- *
+ *
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<Long> withLongKeys() {
@@ -119,7 +119,7 @@ public class Stores {
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
- *
+ *
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<byte[]> withByteArrayKeys() {
@@ -129,7 +129,7 @@ public class Stores {
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be either {@link String}, {@link Integer},
* {@link Long}, or {@code byte[]}.
- *
+ *
* @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and
* deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
* {@code byte[].class})
@@ -141,7 +141,7 @@ public class Stores {
/**
* Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
- *
+ *
* @param keySerializer the serializer for keys; may not be null
* @param keyDeserializer the deserializer for keys; may not be null
* @return the interface used to specify the type of values; never null
@@ -151,13 +151,13 @@ public class Stores {
/**
* The factory for creating off-heap key-value stores.
- *
+ *
* @param <K> the type of keys
*/
public static abstract class ValueFactory<K> {
/**
* Use {@link String} values.
- *
+ *
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, String> withStringValues() {
@@ -166,7 +166,7 @@ public class Stores {
/**
* Use {@link Integer} values.
- *
+ *
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, Integer> withIntegerValues() {
@@ -175,7 +175,7 @@ public class Stores {
/**
* Use {@link Long} values.
- *
+ *
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, Long> withLongValues() {
@@ -184,7 +184,7 @@ public class Stores {
/**
* Use byte arrays for values.
- *
+ *
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, byte[]> withByteArrayValues() {
@@ -194,7 +194,7 @@ public class Stores {
/**
* Use values of the specified type, which must be either {@link String}, {@link Integer}, {@link Long}, or {@code byte[]}
* .
- *
+ *
* @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and
* deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
* {@code byte[].class})
@@ -206,7 +206,7 @@ public class Stores {
/**
* Use the specified serializer and deserializer for the values.
- *
+ *
* @param valueSerializer the serializer for value; may not be null
* @param valueDeserializer the deserializer for values; may not be null
* @return the interface used to specify the remaining key-value store options; never null
@@ -224,7 +224,7 @@ public class Stores {
/**
* Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
* read to restore the entries if they are lost.
- *
+ *
* @return the factory to create in-memory key-value stores; never null
*/
InMemoryKeyValueFactory<K, V> inMemory();
@@ -232,7 +232,7 @@ public class Stores {
/**
* Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
* topic that can be read to restore the entries if they are lost.
- *
+ *
* @return the factory to create in-memory key-value stores; never null
*/
LocalDatabaseKeyValueFactory<K, V> localDatabase();
@@ -248,7 +248,7 @@ public class Stores {
/**
* Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
* equivalent to not placing a limit on the number of entries.
- *
+ *
* @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
* @return this factory
* @throws IllegalArgumentException if the capacity is not positive
@@ -256,10 +256,10 @@ public class Stores {
InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
/**
- * Return the new key-value store.
- * @return the key-value store; never null
+ * Return the StateStoreSupplier for the new key-value store.
+ * @return the state store supplier; never null
*/
- KeyValueStore<K, V> build();
+ StateStoreSupplier build();
}
/**
@@ -270,9 +270,9 @@ public class Stores {
*/
public static interface LocalDatabaseKeyValueFactory<K, V> {
/**
- * Return the new key-value store.
+ * Return the StateStoreSupplier for the new key-value store.
* @return the key-value store; never null
*/
- KeyValueStore<K, V> build();
+ StateStoreSupplier build();
}
}
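To illustrate the API change above: Stores.create() now takes a StreamingConfig instead of a ProcessorContext, and build() returns a StateStoreSupplier rather than a live KeyValueStore. A minimal sketch of the new call pattern (the store and processor names here are illustrative):

    StateStoreSupplier supplier = Stores.create("my-store", config)
        .withStringKeys()
        .withStringValues()
        .inMemory()
        .build();

    // The supplier is attached to the topology; the runtime calls get() and
    // init() later, once per task, instead of the store being built eagerly.
    builder.addStateStore(supplier, "my-processor");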
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b77c253..de1328e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.apache.kafka.common.utils.Utils.mkSet;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
import java.util.Arrays;
@@ -106,6 +107,55 @@ public class TopologyBuilderTest {
assertEquals(3, builder.sourceTopics().size());
}
+ @Test(expected = TopologyException.class)
+ public void testAddStateStoreWithNonExistingProcessor() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+ }
+
+ @Test(expected = TopologyException.class)
+ public void testAddStateStoreWithSource() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("source-1", "topic-1");
+ builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+ }
+
+ @Test(expected = TopologyException.class)
+ public void testAddStateStoreWithSink() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSink("sink-1", "topic-1");
+ builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+ }
+
+ @Test(expected = TopologyException.class)
+ public void testAddStateStoreWithDuplicates() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addStateStore(new MockStateStoreSupplier("store", false));
+ builder.addStateStore(new MockStateStoreSupplier("store", false));
+ }
+
+ @Test
+ public void testAddStateStore() {
+ final TopologyBuilder builder = new TopologyBuilder();
+ List<StateStoreSupplier> suppliers;
+
+ StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
+ builder.addStateStore(supplier);
+ suppliers = builder.build().stateStoreSuppliers();
+ assertEquals(0, suppliers.size());
+
+ builder.addSource("source-1", "topic-1");
+ builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+ builder.connectProcessorAndStateStores("processor-1", "store-1");
+ suppliers = builder.build().stateStoreSuppliers();
+ assertEquals(1, suppliers.size());
+ assertEquals(supplier.name(), suppliers.get(0).name());
+ }
+
@Test
public void testTopicGroups() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -138,6 +188,35 @@ public class TopologyBuilderTest {
assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
}
+ @Test
+ public void testTopicGroupsByStateStore() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("source-1", "topic-1", "topic-1x");
+ builder.addSource("source-2", "topic-2");
+ builder.addSource("source-3", "topic-3");
+ builder.addSource("source-4", "topic-4");
+ builder.addSource("source-5", "topic-5");
+
+ builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+ builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
+ builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2");
+
+ builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
+ builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
+ builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4");
+
+ Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+
+ Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
+ expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
+ expectedTopicGroups.put(1, set("topic-3", "topic-4"));
+ expectedTopicGroups.put(2, set("topic-5"));
+
+ assertEquals(3, topicGroups.size());
+ assertEquals(expectedTopicGroups, topicGroups);
+ }
+
private <T> Set<T> set(T... items) {
Set<T> set = new HashSet<>();
for (T item : items) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index eb33dc3..c447f99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -24,14 +24,11 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
import java.io.File;
@@ -53,45 +50,6 @@ import static org.junit.Assert.assertFalse;
public class ProcessorStateManagerTest {
- private static class MockStateStore implements StateStore {
- private final String name;
- private final boolean persistent;
-
- public boolean flushed = false;
- public boolean closed = false;
- public final ArrayList<Integer> keys = new ArrayList<>();
-
- public MockStateStore(String name, boolean persistent) {
- this.name = name;
- this.persistent = persistent;
- }
- @Override
- public String name() {
- return name;
- }
- @Override
- public void flush() {
- flushed = true;
- }
- @Override
- public void close() {
- closed = true;
- }
- @Override
- public boolean persistent() {
- return persistent;
- }
-
- public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
- private final Deserializer<Integer> deserializer = new IntegerDeserializer();
-
- @Override
- public void restore(byte[] key, byte[] value) {
- keys.add(deserializer.deserialize("", key));
- }
- };
- }
-
private class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
private final Serializer<Integer> serializer = new IntegerSerializer();
@@ -255,7 +213,7 @@ public class ProcessorStateManagerTest {
public void testNoTopic() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+ MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
try {
@@ -283,7 +241,7 @@ public class ProcessorStateManagerTest {
));
restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
- MockStateStore persistentStore = new MockStateStore("persistentStore", true); // persistent store
+ MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
try {
@@ -331,7 +289,7 @@ public class ProcessorStateManagerTest {
));
restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
- MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false); // non persistent store
+ MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
try {
@@ -371,7 +329,7 @@ public class ProcessorStateManagerTest {
new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
));
- MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+ MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
try {
@@ -411,8 +369,8 @@ public class ProcessorStateManagerTest {
ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
- MockStateStore persistentStore = new MockStateStore("persistentStore", true);
- MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false);
+ MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
+ MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 5f8ca46..803d4b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -83,7 +83,7 @@ public class ProcessorTopologyTest {
}
driver = null;
}
-
+
@Test
public void testTopologyMetadata() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -203,6 +203,10 @@ public class ProcessorTopologyTest {
protected TopologyBuilder createStatefulTopology(String storeName) {
return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
.addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+ .addStateStore(
+ Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
+ "processor"
+ )
.addSink("counts", OUTPUT_TOPIC_1, "processor");
}
@@ -262,9 +266,10 @@ public class ProcessorTopologyTest {
}
@Override
+ @SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
super.init(context);
- store = Stores.create(storeName, context).withStringKeys().withStringValues().inMemory().build();
+ store = (KeyValueStore<String, String>) context.getStateStore(storeName);
}
@Override
@@ -281,7 +286,7 @@ public class ProcessorTopologyTest {
}
context().forward(Long.toString(streamTime), count);
}
-
+
@Override
public void close() {
store.close();
@@ -303,4 +308,4 @@ public class ProcessorTopologyTest {
return timestamp;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index b1403bd..2c7aaeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -30,7 +30,7 @@ public class PunctuationQueueTest {
@Test
public void testPunctuationInterval() {
TestProcessor processor = new TestProcessor();
- ProcessorNode<String, String> node = new ProcessorNode<>("test", processor);
+ ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
PunctuationQueue queue = new PunctuationQueue();
PunctuationSchedule sched = new PunctuationSchedule(node, 100L);
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0b828f7..d80e98c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.MockSourceNode;
import org.junit.Test;
@@ -37,6 +38,7 @@ import org.junit.Before;
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
@@ -57,13 +59,15 @@ public class StreamTaskTest {
private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final ProcessorTopology topology = new ProcessorTopology(
- Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
- new HashMap<String, SourceNode>() {
- {
- put("topic1", source1);
- put("topic2", source2);
- }
- });
+ Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+ new HashMap<String, SourceNode>() {
+ {
+ put("topic1", source1);
+ put("topic2", source2);
+ }
+ },
+ Collections.<StateStoreSupplier>emptyList()
+ );
private StreamingConfig createConfig(final File baseDir) throws Exception {
return new StreamingConfig(new Properties() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
index d8f06ea..209f3c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -21,20 +21,22 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
+import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Test;
public abstract class AbstractKeyValueStoreTest {
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
+ protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config,
+ ProcessorContext context,
Class<K> keyClass, Class<V> valueClass,
boolean useContextSerdes);
-
+
@Test
public void testPutGetRange() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
try {
// Verify that the store reads and writes correctly ...
@@ -100,7 +102,7 @@ public abstract class AbstractKeyValueStoreTest {
public void testPutGetRangeWithDefaultSerdes() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
try {
// Verify that the store reads and writes correctly ...
@@ -150,7 +152,7 @@ public abstract class AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
try {
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
@@ -176,7 +178,7 @@ public abstract class AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
try {
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
@@ -188,4 +190,4 @@ public abstract class AbstractKeyValueStoreTest {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
index bee9967..b3fe98c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -18,21 +18,33 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+ StreamingConfig config,
+ ProcessorContext context,
+ Class<K> keyClass, Class<V> valueClass,
+ boolean useContextSerdes) {
+
+ StateStoreSupplier supplier;
if (useContextSerdes) {
Serializer<K> keySer = (Serializer<K>) context.keySerializer();
Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+ supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+ } else {
+ supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
}
- return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).inMemory().build();
+
+ KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ store.init(context);
+ return store;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
index 6b96d3a..dddb9c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertNull;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.junit.Test;
public class InMemoryLRUCacheStoreTest {
@@ -29,10 +30,12 @@ public class InMemoryLRUCacheStoreTest {
public void testPutGetRange() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+ StateStoreSupplier supplier = Stores.create("my-store", driver.config())
.withIntegerKeys().withStringValues()
.inMemory().maxEntries(3)
.build();
+ KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+ store.init(driver.context());
// Verify that the store reads and writes correctly, keeping only the last 2 entries ...
store.put(0, "zero");
@@ -79,11 +82,13 @@ public class InMemoryLRUCacheStoreTest {
Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
- KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+ StateStoreSupplier supplier = Stores.create("my-store", driver.config())
.withKeys(keySer, keyDeser)
.withValues(valSer, valDeser)
.inMemory().maxEntries(3)
.build();
+ KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+ store.init(driver.context());
// Verify that the store reads and writes correctly, keeping only the last 2 entries ...
store.put(0, "zero");
@@ -133,10 +138,12 @@ public class InMemoryLRUCacheStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+ StateStoreSupplier supplier = Stores.create("my-store", driver.config())
.withIntegerKeys().withStringValues()
.inMemory().maxEntries(3)
.build();
+ KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+ store.init(driver.context());
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
@@ -145,4 +152,4 @@ public class InMemoryLRUCacheStoreTest {
assertEquals(3, driver.sizeOf(store));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 7e1512a..8bab1c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.MockTimestampExtractor;
import java.io.File;
import java.util.HashMap;
@@ -38,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
import java.util.Set;
/**
@@ -218,6 +221,7 @@ public class KeyValueStoreTestDriver<K, V> {
private final Map<K, V> flushedEntries = new HashMap<>();
private final Set<K> flushedRemovals = new HashSet<>();
private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+ private final StreamingConfig config;
private final MockProcessorContext context;
private final Map<String, StateStore> storeMap = new HashMap<>();
private final StreamingMetrics metrics = new StreamingMetrics() {
@@ -243,6 +247,15 @@ public class KeyValueStoreTestDriver<K, V> {
recordFlushed(record.key(), record.value());
}
};
+ Properties props = new Properties();
+ props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+ props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
+ props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
+ props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
+ props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+ this.config = new StreamingConfig(props);
+
this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
serdes.valueDeserializer(), recordCollector) {
@Override
@@ -349,6 +362,15 @@ public class KeyValueStoreTestDriver<K, V> {
}
/**
+ * Get the streaming config that should be supplied to a {@link Serdes}'s constructor.
+ *
+ * @return the streaming config; never null
+ */
+ public StreamingConfig config() {
+ return config;
+ }
+
+ /**
* Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
* written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
* {@link #flushedEntryRemoved(Object)} methods.
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
index 9ac1740..37a12f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -18,21 +18,35 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+ StreamingConfig config,
+ ProcessorContext context,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ boolean useContextSerdes) {
+
+ StateStoreSupplier supplier;
if (useContextSerdes) {
Serializer<K> keySer = (Serializer<K>) context.keySerializer();
Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+ supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+ } else {
+ supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
}
- return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+
+ KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ store.init(context);
+ return store;
+
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
new file mode 100644
index 0000000..16635b7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.ArrayList;
+
+public class MockStateStoreSupplier implements StateStoreSupplier {
+ private final String name;
+ private final boolean persistent;
+
+ public MockStateStoreSupplier(String name, boolean persistent) {
+ this.name = name;
+ this.persistent = persistent;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public StateStore get() {
+ return new MockStateStore(name, persistent);
+ }
+
+ public static class MockStateStore implements StateStore {
+ private final String name;
+ private final boolean persistent;
+
+ public boolean initialized = false;
+ public boolean flushed = false;
+ public boolean closed = false;
+ public final ArrayList<Integer> keys = new ArrayList<>();
+
+ public MockStateStore(String name, boolean persistent) {
+ this.name = name;
+ this.persistent = persistent;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ context.register(this, stateRestoreCallback);
+ initialized = true;
+ }
+
+ @Override
+ public void flush() {
+ flushed = true;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ @Override
+ public boolean persistent() {
+ return persistent;
+ }
+
+ public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
+ private final Deserializer<Integer> deserializer = new IntegerDeserializer();
+
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ keys.add(deserializer.deserialize("", key));
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 0c4b1a2..fc83762 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -266,7 +267,7 @@ public class ProcessorTopologyTestDriver {
* @see #getKeyValueStore(String)
*/
public StateStore getStateStore(String name) {
- return task.context().getStateStore(name);
+ return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
}
/**
[2/2] kafka git commit: KAFKA-2706: make state stores first class citizens in the processor topology
Posted by gu...@apache.org.
KAFKA-2706: make state stores first class citizens in the processor topology
* Added StateStoreSupplier
* StateStore
  * Added init(ProcessorContext context) method
* TopologyBuilder
  * Added addStateStore(StateStoreSupplier supplier, String... processorNames)
  * Added connectProcessorAndStateStores(String processorName, String... stateStoreNames)
    * This is for the case where processors have not yet been created when a store is added to the topology (used by KStream)
* KStream
  * Added stateStoreNames to process(), transform(), and transformValues()
* Refactored existing state stores to implement StateStoreSupplier
guozhangwang
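A minimal end-to-end sketch of the new pattern, based on the ProcessorJob example changes below (names are illustrative):

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
    builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
    // Declare the store on the topology and wire it to the processor ...
    builder.addStateStore(
        Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build(),
        "PROCESS");

    // ... and inside the processor, look the store up by name instead of
    // building it in init():
    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
    }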
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #387 from ymatsuda/state_store_supplier
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75827226
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75827226
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75827226
Branch: refs/heads/trunk
Commit: 758272267c811bf559336ea45571bc420a62a478
Parents: 6383593
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Nov 2 13:24:48 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 2 13:24:48 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/StreamingConfig.java | 18 ++
.../kafka/streams/examples/ProcessorJob.java | 5 +-
.../apache/kafka/streams/kstream/KStream.java | 9 +-
.../streams/kstream/internals/KStreamImpl.java | 9 +-
.../kafka/streams/processor/StateStore.java | 5 +
.../streams/processor/StateStoreSupplier.java | 25 ++
.../streams/processor/TopologyBuilder.java | 109 +++++--
.../internals/ProcessorContextImpl.java | 30 +-
.../processor/internals/ProcessorNode.java | 8 +-
.../processor/internals/ProcessorTopology.java | 11 +-
.../streams/processor/internals/StreamTask.java | 8 +
.../streams/state/InMemoryKeyValueStore.java | 135 ---------
.../state/InMemoryKeyValueStoreSupplier.java | 155 ++++++++++
.../streams/state/InMemoryLRUCacheStore.java | 180 -----------
.../state/InMemoryLRUCacheStoreSupplier.java | 195 ++++++++++++
.../streams/state/MeteredKeyValueStore.java | 68 +++--
.../streams/state/RocksDBKeyValueStore.java | 284 ------------------
.../state/RocksDBKeyValueStoreSupplier.java | 298 +++++++++++++++++++
.../org/apache/kafka/streams/state/Serdes.java | 45 ++-
.../org/apache/kafka/streams/state/Stores.java | 66 ++--
.../streams/processor/TopologyBuilderTest.java | 79 +++++
.../internals/ProcessorStateManagerTest.java | 56 +---
.../internals/ProcessorTopologyTest.java | 13 +-
.../internals/PunctuationQueueTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 18 +-
.../state/AbstractKeyValueStoreTest.java | 16 +-
.../state/InMemoryKeyValueStoreTest.java | 22 +-
.../state/InMemoryLRUCacheStoreTest.java | 15 +-
.../streams/state/KeyValueStoreTestDriver.java | 22 ++
.../streams/state/RocksDBKeyValueStoreTest.java | 24 +-
.../kafka/test/MockStateStoreSupplier.java | 97 ++++++
.../kafka/test/ProcessorTopologyTestDriver.java | 3 +-
32 files changed, 1224 insertions(+), 806 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index a0aef48..88bd844 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
@@ -245,6 +247,22 @@ public class StreamingConfig extends AbstractConfig {
return props;
}
+ public Serializer keySerializer() {
+ return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Serializer valueSerializer() {
+ return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Deserializer keyDeserializer() {
+ return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
+ public Deserializer valueDeserializer() {
+ return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
public static void main(String[] args) {
System.out.println(CONFIG.toHtmlTable());
}
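The four accessors added above return configured instances of whatever (de)serializer classes are set in the config. A sketch, assuming the standard *_CLASS_CONFIG keys are populated the same way the test driver below populates them (MockTimestampExtractor is the test helper used there):

    Properties props = new Properties();
    props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
    props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    StreamingConfig config = new StreamingConfig(props);

    Serializer keySer = config.keySerializer();         // a configured StringSerializer
    Deserializer valDeser = config.valueDeserializer(); // a configured IntegerDeserializer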
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 0317b9d..3274aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -45,10 +45,11 @@ public class ProcessorJob {
private KeyValueStore<String, Integer> kvStore;
@Override
+ @SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
- this.kvStore = Stores.create("local-state", context).withStringKeys().withIntegerValues().inMemory().build();
+ this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
}
@Override
@@ -103,6 +104,8 @@ public class ProcessorJob {
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
+ builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
+ builder.connectProcessorAndStateStores("PROCESS", "local-state");
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 915cf1c..8f0794c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -154,24 +154,27 @@ public interface KStream<K, V> {
* Applies a stateful transformation to all elements in this stream.
*
* @param transformerSupplier the class of TransformerDef
+ * @param stateStoreNames the names of the state stores used by the processor
* @return KStream
*/
- <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier);
+ <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
/**
* Applies a stateful transformation to all values in this stream.
*
* @param valueTransformerSupplier the class of TransformerDef
+ * @param stateStoreNames the names of the state stores used by the processor
* @return KStream
*/
- <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier);
+ <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
/**
* Processes all elements in this stream by applying a processor.
*
* @param processorSupplier the supplier of the Processor to use
+ * @param stateStoreNames the names of the state stores used by the processor
* @return the new stream containing the processed output
*/
- void process(ProcessorSupplier<K, V> processorSupplier);
+ void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
}
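With the extra varargs parameter, a stateful processor attached via process() (or transform()/transformValues()) can name the stores it needs, and the builder wires them up through connectProcessorAndStateStores. A hypothetical usage (the store name is illustrative):

    // "my-store" must already have been added to the topology with addStateStore();
    // the supplied processor can then call context.getStateStore("my-store") in init().
    stream.process(new MyProcessorSupplier(), "my-store");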
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1a2297c..1ea9b1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -201,27 +201,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
}
@Override
- public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+ public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
String name = TRANSFORM_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+ topology.connectProcessorAndStateStores(name, stateStoreNames);
return new KStreamImpl<>(topology, name, null);
}
@Override
- public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) {
+ public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+ topology.connectProcessorAndStateStores(name, stateStoreNames);
return new KStreamImpl<>(topology, name, sourceNodes);
}
@Override
- public void process(final ProcessorSupplier<K, V> processorSupplier) {
+ public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
String name = PROCESSOR_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, processorSupplier, this.name);
+ topology.connectProcessorAndStateStores(name, stateStoreNames);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 38afe9b..9c085a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -36,6 +36,11 @@ public interface StateStore {
String name();
/**
+ * Initialize this state store
+ */
+ void init(ProcessorContext context);
+
+ /**
* Flush any cached data
*/
void flush();
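A store's init() is where it registers itself with the context. A minimal sketch, mirroring the MockStateStore added in this commit:

    @Override
    public void init(ProcessorContext context) {
        // Registration hands the runtime a restore callback, so the store's
        // changelog can be replayed into it when the task (re)initializes.
        context.register(this, stateRestoreCallback);
    }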
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
new file mode 100644
index 0000000..11545c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public interface StateStoreSupplier {
+
+ String name();
+
+ StateStore get();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 077489c..5b6d4ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -33,6 +33,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,10 +50,9 @@ import java.util.Set;
*/
public class TopologyBuilder {
- // list of node factories in a topological order
- private final ArrayList<NodeFactory> nodeFactories = new ArrayList<>();
+ // node factories in a topological order
+ private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
- private final Set<String> nodeNames = new HashSet<>();
private final Set<String> sourceTopicNames = new HashSet<>();
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@@ -59,6 +60,9 @@ public class TopologyBuilder {
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
private Map<Integer, Set<String>> nodeGroups = null;
+ private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
+ private Map<String, Set<String>> stateStoreUsers = new HashMap<>();
+
private interface NodeFactory {
ProcessorNode build();
}
@@ -67,6 +71,7 @@ public class TopologyBuilder {
public final String[] parents;
private final String name;
private final ProcessorSupplier supplier;
+ private final Set<String> stateStoreNames = new HashSet<>();
public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
this.name = name;
@@ -74,9 +79,13 @@ public class TopologyBuilder {
this.supplier = supplier;
}
+ public void addStateStore(String stateStoreName) {
+ stateStoreNames.add(stateStoreName);
+ }
+
@Override
public ProcessorNode build() {
- return new ProcessorNode(name, supplier.get());
+ return new ProcessorNode(name, supplier.get(), stateStoreNames);
}
}
@@ -155,7 +164,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
- if (nodeNames.contains(name))
+ if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
for (String topic : topics) {
@@ -165,8 +174,7 @@ public class TopologyBuilder {
sourceTopicNames.add(topic);
}
- nodeNames.add(name);
- nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+ nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
nodeToTopics.put(name, topics.clone());
nodeGrouper.add(name);
@@ -204,7 +212,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
- if (nodeNames.contains(name))
+ if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
if (parentNames != null) {
@@ -212,14 +220,13 @@ public class TopologyBuilder {
if (parent.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
}
- if (!nodeNames.contains(parent)) {
+ if (!nodeFactories.containsKey(parent)) {
throw new TopologyException("Parent processor " + parent + " is not added yet.");
}
}
}
- nodeNames.add(name);
- nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+ nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
return this;
}
@@ -233,7 +240,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
- if (nodeNames.contains(name))
+ if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
if (parentNames != null) {
@@ -241,20 +248,80 @@ public class TopologyBuilder {
if (parent.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
}
- if (!nodeNames.contains(parent)) {
+ if (!nodeFactories.containsKey(parent)) {
throw new TopologyException("Parent processor " + parent + " is not added yet.");
}
}
}
- nodeNames.add(name);
- nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
+ nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
nodeGrouper.add(name);
nodeGrouper.unite(name, parentNames);
return this;
}
/**
+ * Adds a state store to the topology.
+ *
+ * @param supplier the supplier used to obtain {@link StateStore} instances for this store
+ * @param processorNames the names of the processors that should be able to access this store
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+ if (stateStores.containsKey(supplier.name())) {
+ throw new TopologyException("StateStore " + supplier.name() + " is already added.");
+ }
+ stateStores.put(supplier.name(), supplier);
+ stateStoreUsers.put(supplier.name(), new HashSet<String>());
+
+ if (processorNames != null) {
+ for (String processorName : processorNames) {
+ connectProcessorAndStateStore(processorName, supplier.name());
+ }
+ }
+
+ return this;
+ }
+
+ /**
+ * Connects the processor and the state stores
+ *
+ * @param processorName the name of the processor
+ * @param stateStoreNames the names of state stores that the processor uses
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+ if (stateStoreNames != null) {
+ for (String stateStoreName : stateStoreNames) {
+ connectProcessorAndStateStore(processorName, stateStoreName);
+ }
+ }
+
+ return this;
+ }
+
+ private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
+ if (!stateStores.containsKey(stateStoreName))
+ throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
+ if (!nodeFactories.containsKey(processorName))
+ throw new TopologyException("Processor " + processorName + " is not added yet.");
+
+ Set<String> users = stateStoreUsers.get(stateStoreName);
+ Iterator<String> iter = users.iterator();
+ if (iter.hasNext()) {
+ String user = iter.next();
+ nodeGrouper.unite(user, processorName);
+ }
+ users.add(processorName);
+
+ NodeFactory factory = nodeFactories.get(processorName);
+ if (factory instanceof ProcessorNodeFactory) {
+ ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
+ } else {
+ throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+ }
+ }
+
+ /**
* Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
*
@@ -301,7 +368,7 @@ public class TopologyBuilder {
}
// Go through non-source nodes
- for (String nodeName : Utils.sorted(nodeNames)) {
+ for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
if (!nodeToTopics.containsKey(nodeName)) {
String root = nodeGrouper.root(nodeName);
Set<String> nodeGroup = rootToNodeGroup.get(root);
@@ -357,10 +424,11 @@ public class TopologyBuilder {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
+ Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
try {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
- for (NodeFactory factory : nodeFactories) {
+ for (NodeFactory factory : nodeFactories.values()) {
ProcessorNode node = factory.build();
processorNodes.add(node);
processorMap.put(node.name(), node);
@@ -369,6 +437,11 @@ public class TopologyBuilder {
for (String parent : ((ProcessorNodeFactory) factory).parents) {
processorMap.get(parent).addChild(node);
}
+ for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+ if (!stateStoreMap.containsKey(stateStoreName)) {
+ stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+ }
+ }
} else if (factory instanceof SourceNodeFactory) {
for (String topic : ((SourceNodeFactory) factory).topics) {
topicSourceMap.put(topic, (SourceNode) node);
@@ -385,7 +458,7 @@ public class TopologyBuilder {
throw new KafkaException("ProcessorNode construction failed: this should not happen.");
}
- return new ProcessorTopology(processorNodes, topicSourceMap);
+ return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
}
/**
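Putting the new builder methods together: a store is registered once and can be shared by several processors, and connecting a store to a second processor also unites the two processors into one node group, so they run in the same task. A sketch using the hypothetical suppliers and names from above:

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
    builder.addProcessor("counter", new CounterSupplier(), "source");     // hypothetical ProcessorSuppliers
    builder.addProcessor("reporter", new ReporterSupplier(), "counter");

    // register the store and connect "counter" to it in one call
    builder.addStateStore(new MyStoreSupplier("counts"), "counter");

    // connect another processor later; this also places "reporter"
    // in the same node group (and hence the same task) as "counter"
    builder.connectProcessorAndStateStores("reporter", "counts");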
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3c1e059..1321cc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -62,19 +62,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
this.collector = collector;
this.stateMgr = stateMgr;
- this.keySerializer = config.getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.valSerializer = config.getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.keyDeserializer = config.getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.valDeserializer = config.getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ this.keySerializer = config.keySerializer();
+ this.valSerializer = config.valueSerializer();
+ this.keyDeserializer = config.keyDeserializer();
+ this.valDeserializer = config.valueDeserializer();
this.initialized = false;
}
- @Override
- public RecordCollector recordCollector() {
- return this.collector;
- }
-
public void initialized() {
this.initialized = true;
}
@@ -83,6 +78,15 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
return id;
}
+ public ProcessorStateManager getStateMgr() {
+ return stateMgr;
+ }
+
+ @Override
+ public RecordCollector recordCollector() {
+ return this.collector;
+ }
+
@Override
public Serializer<?> keySerializer() {
return this.keySerializer;
@@ -123,6 +127,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public StateStore getStateStore(String name) {
+ ProcessorNode node = task.node();
+
+ if (node == null)
+ throw new KafkaException("accessing from an unknown node");
+
+ if (!node.stateStores.contains(name))
+ throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
+
return stateMgr.getStore(name);
}
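The new check means a processor can only read stores it was connected to via the builder; any other lookup fails fast instead of silently sharing state. Inside a processor the store is looked up by name during init, as in this sketch (assuming the "counts" store from the builder example):

    public class MyCountingProcessor implements Processor<String, String> {

        private KeyValueStore<String, Long> counts;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            // throws KafkaException unless this node was connected to "counts"
            this.counts = (KeyValueStore<String, Long>) context.getStateStore("counts");
        }

        @Override
        public void process(String key, String value) {
            Long count = counts.get(key);
            counts.put(key, count == null ? 1L : count + 1L);
        }

        @Override
        public void punctuate(long streamTime) {
            // nothing scheduled
        }

        @Override
        public void close() {
            // nothing to release
        }
    }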
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 9127c3f..6db83a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
public class ProcessorNode<K, V> {
@@ -30,14 +31,17 @@ public class ProcessorNode<K, V> {
private final String name;
private final Processor<K, V> processor;
+ public final Set<String> stateStores;
+
public ProcessorNode(String name) {
- this(name, null);
+ this(name, null, null);
}
- public ProcessorNode(String name, Processor<K, V> processor) {
+ public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
this.name = name;
this.processor = processor;
this.children = new ArrayList<>();
+ this.stateStores = stateStores;
}
public final String name() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 3efae65..a70aa70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -27,11 +29,14 @@ public class ProcessorTopology {
private final List<ProcessorNode> processorNodes;
private final Map<String, SourceNode> sourceByTopics;
+ private final List<StateStoreSupplier> stateStoreSuppliers;
public ProcessorTopology(List<ProcessorNode> processorNodes,
- Map<String, SourceNode> sourceByTopics) {
+ Map<String, SourceNode> sourceByTopics,
+ List<StateStoreSupplier> stateStoreSuppliers) {
this.processorNodes = Collections.unmodifiableList(processorNodes);
this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
+ this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
}
public Set<String> sourceTopics() {
@@ -50,4 +55,8 @@ public class ProcessorTopology {
return processorNodes;
}
+ public List<StateStoreSupplier> stateStoreSuppliers() {
+ return stateStoreSuppliers;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f01e00b..a9c14e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
@@ -126,6 +128,12 @@ public class StreamTask implements Punctuator {
// initialize the topology with its own context
this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
+ // initialize the state stores
+ for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
+ StateStore store = stateStoreSupplier.get();
+ store.init(this.processorContext);
+ }
+
// initialize the task by initializing all its processor nodes in the topology
for (ProcessorNode node : this.topology.processors()) {
this.currNode = node;
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
deleted file mode 100644
index 1eb526f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, ProcessorContext)
- */
-public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
- protected InMemoryKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
- super(name, new MemoryStore<K, V>(name), context, serdes, "in-memory-state", time != null ? time : new SystemTime());
- }
-
- private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final NavigableMap<K, V> map;
-
- public MemoryStore(String name) {
- super();
- this.name = name;
- this.map = new TreeMap<>();
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- return this.map.remove(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
-
- public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- Map.Entry<K, V> entry = iter.next();
- return new Entry<>(entry.getKey(), entry.getValue());
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- }
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..d1f845c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes<K, V> serdes;
+ private final Time time;
+
+ protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+ }
+
+ private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, V> map;
+
+ public MemoryStore(String name) {
+ super();
+ this.name = name;
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ return this.map.remove(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<Map.Entry<K, V>> iter;
+
+ public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+ this.iter = iter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ Map.Entry<K, V> entry = iter.next();
+ return new Entry<>(entry.getKey(), entry.getValue());
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+ }
+}
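Applications are not expected to construct this supplier directly (its constructor is protected); it comes out of the Stores factory. A sketch of the fluent path that ends in this class, assuming config is the application's StreamingConfig:

    StateStoreSupplier countsStore =
        Stores.create("counts", config)
              .withKeys(new StringSerializer(), new StringDeserializer())
              .withValues(new LongSerializer(), new LongDeserializer())
              .inMemory()
              .build();   // an InMemoryKeyValueStoreSupplier when no capacity bound is set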
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
deleted file mode 100644
index 1b96c59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStore<K, V> extends MeteredKeyValueStore<K, V> {
-
- protected static <K, V> InMemoryLRUCacheStore<K, V> create(String name, int capacity, ProcessorContext context,
- Serdes<K, V> serdes, Time time) {
- if (time == null) time = new SystemTime();
- MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
- final InMemoryLRUCacheStore<K, V> store = new InMemoryLRUCacheStore<>(name, context, cache, serdes, time);
- cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(K key, V value) {
- store.removed(key);
- }
- });
- return store;
-
- }
-
- private InMemoryLRUCacheStore(String name, ProcessorContext context, MemoryLRUCache<K, V> cache, Serdes<K, V> serdes, Time time) {
- super(name, cache, context, serdes, "kafka-streams", time);
- }
-
- private static interface EldestEntryRemovalListener<K, V> {
- public void apply(K key, V value);
- }
-
- protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final Map<K, V> map;
- private final NavigableSet<K> keys;
- private EldestEntryRemovalListener<K, V> listener;
-
- public MemoryLRUCache(String name, final int maxCacheSize) {
- this.name = name;
- this.keys = new TreeSet<>();
- // leave room for one extra entry to handle adding an entry before the oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
- K key = eldest.getKey();
- keys.remove(key);
- if (listener != null) listener.apply(key, eldest.getValue());
- return true;
- }
- return false;
- }
- };
- }
-
- protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- this.keys.add(key);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = this.map.remove(key);
- this.keys.remove(key);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
-
- public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
- this.keys = keys;
- this.entries = entries;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- lastKey = keys.next();
- return new Entry<>(lastKey, entries.get(lastKey));
- }
-
- @Override
- public void remove() {
- keys.remove();
- entries.remove(lastKey);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..a346534
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final int capacity;
+ private final Serdes<K, V> serdes;
+ private final Time time;
+
+ protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.capacity = capacity;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+ final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+ cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+ @Override
+ public void apply(K key, V value) {
+ store.removed(key);
+ }
+ });
+ return store;
+ }
+
+ private interface EldestEntryRemovalListener<K, V> {
+ void apply(K key, V value);
+ }
+
+ protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final Map<K, V> map;
+ private final NavigableSet<K> keys;
+ private EldestEntryRemovalListener<K, V> listener;
+
+ public MemoryLRUCache(String name, final int maxCacheSize) {
+ this.name = name;
+ this.keys = new TreeSet<>();
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ this.keys.add(key);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = this.map.remove(key);
+ this.keys.remove(key);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<K> keys;
+ private final Map<K, V> entries;
+ private K lastKey;
+
+ public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+ this.keys = keys;
+ this.entries = entries;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ lastKey = keys.next();
+ return new Entry<>(lastKey, entries.get(lastKey));
+ }
+
+ @Override
+ public void remove() {
+ keys.remove();
+ entries.remove(lastKey);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+ }
+}
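The same factory path yields this supplier when the in-memory store is given a capacity bound. The capacity-setting method on the factory is not shown in this patch, so maxEntries below is an assumed name:

    StateStoreSupplier recentStore =
        Stores.create("recent", config)
              .withKeys(new StringSerializer(), new StringDeserializer())
              .withValues(new StringSerializer(), new StringDeserializer())
              .inMemory()
              .maxEntries(1000)   // assumed method name; any capacity below Integer.MAX_VALUE selects the LRU supplier
              .build();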
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index a7f4c12..c1ccbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;
+import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -35,33 +36,51 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
protected final Serdes<K, V> serialization;
-
- private final Time time;
- private final Sensor putTime;
- private final Sensor getTime;
- private final Sensor deleteTime;
- private final Sensor putAllTime;
- private final Sensor allTime;
- private final Sensor rangeTime;
- private final Sensor flushTime;
- private final Sensor restoreTime;
- private final StreamingMetrics metrics;
+ protected final String metricGrp;
+ protected final Time time;
private final String topic;
- private final int partition;
+
+ private Sensor putTime;
+ private Sensor getTime;
+ private Sensor deleteTime;
+ private Sensor putAllTime;
+ private Sensor allTime;
+ private Sensor rangeTime;
+ private Sensor flushTime;
+ private Sensor restoreTime;
+ private StreamingMetrics metrics;
+
private final Set<K> dirty;
private final Set<K> removed;
private final int maxDirty;
private final int maxRemoved;
- private final ProcessorContext context;
+
+ private int partition;
+ private ProcessorContext context;
// always wrap the logged store with the metered store
- public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context,
- Serdes<K, V> serialization, String metricGrp, Time time) {
+ public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
this.inner = inner;
this.serialization = serialization;
+ this.metricGrp = metricGrp;
+ this.time = time != null ? time : new SystemTime();
+ this.topic = inner.name();
+
+ this.dirty = new HashSet<K>();
+ this.removed = new HashSet<K>();
+ this.maxDirty = 100; // TODO: this needs to be configurable
+ this.maxRemoved = 100; // TODO: this needs to be configurable
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
- this.time = time;
+ @Override
+ public void init(ProcessorContext context) {
+ String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -72,18 +91,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
- this.topic = name;
- this.partition = context.id().partition;
-
this.context = context;
-
- this.dirty = new HashSet<K>();
- this.removed = new HashSet<K>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
+ this.partition = context.id().partition;
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
+ inner.init(context);
try {
final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
final Deserializer<V> valDeserializer = serialization.valueDeserializer();
@@ -92,7 +105,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
@Override
public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(topic, key),
- valDeserializer.deserialize(topic, value));
+ valDeserializer.deserialize(topic, value));
}
});
} finally {
@@ -101,11 +114,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public String name() {
- return inner.name();
- }
-
- @Override
public boolean persistent() {
return inner.persistent();
}
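The net effect of moving metrics registration and restoration out of the constructor and into init is a two-phase lifecycle: suppliers build stores cheaply with no context, and each task initializes its own copies with its own context, roughly as StreamTask does above:

    // phase 1: construct; cheap and context-free (done via the StateStoreSupplier)
    StateStore store = countsStore.get();

    // phase 2: per-task initialization (done by StreamTask); for MeteredKeyValueStore this
    // registers the latency sensors, initializes the inner store, and restores
    // state from the changelog before the store is used
    store.init(processorContext);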
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
deleted file mode 100644
index 1de345e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String, ProcessorContext)
- */
-public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
- protected RocksDBKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
- super(name, new RocksDBStore<K, V>(name, context, serdes), context, serdes, "rocksdb-state", time != null ? time : new SystemTime());
- }
-
- private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
- private static final int TTL_NOT_USED = -1;
-
- // TODO: these values should be configurable
- private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
- private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
- private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
- private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
- private static final long BLOCK_SIZE = 4096L;
- private static final int TTL_SECONDS = TTL_NOT_USED;
- private static final int MAX_WRITE_BUFFERS = 3;
- private static final String DB_FILE_DIR = "rocksdb";
-
- private final Serdes<K, V> serdes;
-
- private final String topic;
- private final int partition;
- private final ProcessorContext context;
-
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
- private final String dbName;
- private final String dirName;
-
- private RocksDB db;
-
- public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
- this.topic = name;
- this.partition = context.id().partition;
- this.context = context;
- this.serdes = serdes;
-
- // initialize the rocksdb options
- BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
- tableConfig.setBlockSize(BLOCK_SIZE);
-
- options = new Options();
- options.setTableFormatConfig(tableConfig);
- options.setWriteBufferSize(WRITE_BUFFER_SIZE);
- options.setCompressionType(COMPRESSION_TYPE);
- options.setCompactionStyle(COMPACTION_STYLE);
- options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
- options.setCreateIfMissing(true);
- options.setErrorIfExists(false);
-
- wOptions = new WriteOptions();
- wOptions.setDisableWAL(true);
-
- fOptions = new FlushOptions();
- fOptions.setWaitForFlush(true);
-
- dbName = this.topic + "." + this.partition;
- dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-
- db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
- }
-
- private RocksDB openDB(File dir, Options options, int ttl) {
- try {
- if (ttl == TTL_NOT_USED) {
- dir.getParentFile().mkdirs();
- return RocksDB.open(options, dir.toString());
- } else {
- throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
- // TODO: support TTL with change log?
- // return TtlDB.open(options, dir.toString(), ttl, false);
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
- }
- }
-
- @Override
- public String name() {
- return this.topic;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- try {
- return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
- }
- }
-
- @Override
- public void put(K key, V value) {
- try {
- if (value == null) {
- db.remove(wOptions, serdes.rawKey(key));
- } else {
- db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = get(key);
- put(key, null);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- RocksIterator innerIter = db.newIterator();
- innerIter.seekToFirst();
- return new RocksDbIterator<K, V>(innerIter, serdes);
- }
-
- @Override
- public void flush() {
- try {
- db.flush(fOptions);
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.topic, e);
- }
- }
-
- @Override
- public void close() {
- flush();
- db.close();
- }
-
- private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
- private final RocksIterator iter;
- private final Serdes<K, V> serdes;
-
- public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
- this.iter = iter;
- this.serdes = serdes;
- }
-
- protected byte[] peekRawKey() {
- return iter.key();
- }
-
- protected Entry<K, V> getEntry() {
- return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
- }
-
- @Override
- public boolean hasNext() {
- return iter.isValid();
- }
-
- @Override
- public Entry<K, V> next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Entry<K, V> entry = this.getEntry();
- iter.next();
- return entry;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("RocksDB iterator does not support remove");
- }
-
- @Override
- public void close() {
- }
-
- }
-
- private static class LexicographicComparator implements Comparator<byte[]> {
-
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int leftByte = left[i] & 0xff;
- int rightByte = right[j] & 0xff;
- if (leftByte != rightByte) {
- return leftByte - rightByte;
- }
- }
- return left.length - right.length;
- }
- }
-
- private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = new LexicographicComparator();
- byte[] rawToKey;
-
- public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
- K from, K to) {
- super(iter, serdes);
- iter.seek(serdes.rawKey(from));
- this.rawToKey = serdes.rawKey(to);
- }
-
- @Override
- public boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..fe8f00a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes<K, V> serdes;
+ private final Time time;
+
+ protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+ }
+
+ private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+ private static final int TTL_NOT_USED = -1;
+
+ // TODO: these values should be configurable
+ private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+ private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+ private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+ private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+ private static final long BLOCK_SIZE = 4096L;
+ private static final int TTL_SECONDS = TTL_NOT_USED;
+ private static final int MAX_WRITE_BUFFERS = 3;
+ private static final String DB_FILE_DIR = "rocksdb";
+
+ private final Serdes<K, V> serdes;
+ private final String topic;
+
+ private final Options options;
+ private final WriteOptions wOptions;
+ private final FlushOptions fOptions;
+
+ private ProcessorContext context;
+ private int partition;
+ private String dbName;
+ private String dirName;
+ private RocksDB db;
+
+ public RocksDBStore(String name, Serdes<K, V> serdes) {
+ this.topic = name;
+ this.serdes = serdes;
+
+ // initialize the rocksdb options
+ BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+ tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+ tableConfig.setBlockSize(BLOCK_SIZE);
+
+ options = new Options();
+ options.setTableFormatConfig(tableConfig);
+ options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+ options.setCompressionType(COMPRESSION_TYPE);
+ options.setCompactionStyle(COMPACTION_STYLE);
+ options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+ options.setCreateIfMissing(true);
+ options.setErrorIfExists(false);
+
+ wOptions = new WriteOptions();
+ wOptions.setDisableWAL(true);
+
+ fOptions = new FlushOptions();
+ fOptions.setWaitForFlush(true);
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ this.context = context;
+ this.partition = context.id().partition;
+ this.dbName = this.topic + "." + this.partition;
+ this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+ this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
+ }
+
+ private RocksDB openDB(File dir, Options options, int ttl) {
+ try {
+ if (ttl == TTL_NOT_USED) {
+ dir.getParentFile().mkdirs();
+ return RocksDB.open(options, dir.toString());
+ } else {
+ throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+ // TODO: support TTL with change log?
+ // return TtlDB.open(options, dir.toString(), ttl, false);
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return this.topic;
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ try {
+ return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ try {
+ if (value == null) {
+ db.remove(wOptions, serdes.rawKey(key));
+ } else {
+ db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = get(key);
+ put(key, null);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ RocksIterator innerIter = db.newIterator();
+ innerIter.seekToFirst();
+ return new RocksDbIterator<K, V>(innerIter, serdes);
+ }
+
+ @Override
+ public void flush() {
+ try {
+ db.flush(fOptions);
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing flush from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ flush();
+ db.close();
+ }
+
+ private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+ private final RocksIterator iter;
+ private final Serdes<K, V> serdes;
+
+ public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+ this.iter = iter;
+ this.serdes = serdes;
+ }
+
+ protected byte[] peekRawKey() {
+ return iter.key();
+ }
+
+ protected Entry<K, V> getEntry() {
+ return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.isValid();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Entry<K, V> entry = this.getEntry();
+ iter.next();
+ return entry;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+ }
+
+ @Override
+ public void close() {
+ // release the native iterator handle so it does not leak
+ iter.dispose();
+ }
+
+ }
+
+ private static class LexicographicComparator implements Comparator<byte[]> {
+
+ @Override
+ public int compare(byte[] left, byte[] right) {
+ for (int i = 0; i < left.length && i < right.length; i++) {
+ int leftByte = left[i] & 0xff;
+ int rightByte = right[i] & 0xff;
+ if (leftByte != rightByte) {
+ return leftByte - rightByte;
+ }
+ }
+ return left.length - right.length;
+ }
+ }
+
+ private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+ // RocksDB's JNI interface does not expose getters/setters that allow the
+ // comparator to be pluggable, and the default is lexicographic, so it's
+ // safe to just force the lexicographic comparator here for now.
+ private final Comparator<byte[]> comparator = new LexicographicComparator();
+ byte[] rawToKey;
+
+ public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+ K from, K to) {
+ super(iter, serdes);
+ iter.seek(serdes.rawKey(from));
+ this.rawToKey = serdes.rawKey(to);
+ }
+
+ @Override
+ public boolean hasNext() {
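+ // stop strictly before rawToKey, so the upper bound of range() is exclusive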
+ return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+ }
+ }
+
+ }
+}
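A note for reviewers on the ordering the range iterator relies on: RocksDB's default comparator orders keys by unsigned lexicographic byte comparison, which is what LexicographicComparator above reimplements on the Java side. A minimal, self-contained sketch of that behavior (the class and field names below are illustrative only, not part of the patch):

    import java.util.Comparator;

    public class LexCompareDemo {
        // Same rules as LexicographicComparator: compare bytes as unsigned
        // values; a strict prefix sorts before any extension of it.
        static final Comparator<byte[]> LEX = new Comparator<byte[]>() {
            @Override
            public int compare(byte[] left, byte[] right) {
                for (int i = 0; i < left.length && i < right.length; i++) {
                    int leftByte = left[i] & 0xff;
                    int rightByte = right[i] & 0xff;
                    if (leftByte != rightByte)
                        return leftByte - rightByte;
                }
                return left.length - right.length;
            }
        };

        public static void main(String[] args) {
            // (byte) 0x80 is negative as a signed Java byte but sorts after 0x7f
            System.out.println(LEX.compare(new byte[]{(byte) 0x80}, new byte[]{0x7f}) > 0); // true
            // {1} is a strict prefix of {1, 0}, so it sorts first
            System.out.println(LEX.compare(new byte[]{1}, new byte[]{1, 0}) < 0); // true
        }
    }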
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 540d763..31bd439 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
final class Serdes<K, V> {
@@ -64,7 +64,7 @@ final class Serdes<K, V> {
/**
* Create a context for serialization using the specified serializers and deserializers.
- *
+ *
* @param topic the name of the topic
* @param keySerializer the serializer for keys; may not be null
* @param keyDeserializer the deserializer for keys; may not be null
@@ -83,47 +83,44 @@ final class Serdes<K, V> {
/**
* Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
- * corresponding {@link ProcessorContext}'s default serializer or deserializer, which
+ * corresponding {@link StreamingConfig}'s serializer or deserializer, which
* <em>must</em> match the key and value types used as parameters for this object.
- *
+ *
* @param topic the name of the topic
- * @param keySerializer the serializer for keys; may be null if the {@link ProcessorContext#keySerializer() default
+ * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default
* key serializer} should be used
- * @param keyDeserializer the deserializer for keys; may be null if the {@link ProcessorContext#keyDeserializer() default
+ * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default
* key deserializer} should be used
- * @param valueSerializer the serializer for values; may be null if the {@link ProcessorContext#valueSerializer() default
+ * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default
* value serializer} should be used
- * @param valueDeserializer the deserializer for values; may be null if the {@link ProcessorContext#valueDeserializer()
+ * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
* default value deserializer} should be used
- * @param context the processing context
+ * @param config the streaming config
*/
@SuppressWarnings("unchecked")
public Serdes(String topic,
Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
- ProcessorContext context) {
+ StreamingConfig config) {
this.topic = topic;
- this.keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
- this.keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
- this.valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
- this.valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
+
+ this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
+ this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
+ this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
+ this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
}
/**
- * Create a context for serialization using the {@link ProcessorContext}'s default serializers and deserializers, which
+ * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which
* <em>must</em> match the key and value types used as parameters for this object.
- *
+ *
* @param topic the name of the topic
- * @param context the processing context
+ * @param config the streaming config
*/
@SuppressWarnings("unchecked")
public Serdes(String topic,
- ProcessorContext context) {
- this.topic = topic;
- this.keySerializer = (Serializer<K>) context.keySerializer();
- this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
- this.valueSerializer = (Serializer<V>) context.valueSerializer();
- this.valueDeserializer = (Deserializer<V>) context.valueDeserializer();
+ StreamingConfig config) {
+ this(topic, null, null, null, null, config);
}
public Deserializer<K> keyDeserializer() {
@@ -161,4 +158,4 @@ final class Serdes<K, V> {
public byte[] rawValue(V value) {
return valueSerializer.serialize(topic, value);
}
-}
\ No newline at end of file
+}
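To make the new fallback behavior concrete: passing null for any serializer or deserializer now defers to the StreamingConfig defaults, and the two-argument constructor simply chains to the six-argument one, removing the duplicated cast logic that previously lived in both. A minimal sketch, assuming StreamingConfig accepts a Properties object and that the property names below are the right keys (they are assumptions, not confirmed constants); since Serdes is package-private, this would have to live in org.apache.kafka.streams.state:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamingConfig;

    public class SerdesFallbackExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // assumed keys: whatever StreamingConfig defines for its default serdes;
            // a real config would also need the other required streaming properties
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.serializer", LongSerializer.class.getName());
            props.put("value.deserializer", LongDeserializer.class.getName());
            StreamingConfig config = new StreamingConfig(props);

            // all-null serdes delegate entirely to the config's defaults
            Serdes<String, Long> fromConfig = new Serdes<>("my-topic", config);

            // explicit serdes win; only the null slots fall back to the config
            Serdes<String, Long> mixed = new Serdes<>("my-topic",
                    new StringSerializer(), new StringDeserializer(),
                    null, null, config);
        }
    }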