You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/02 22:19:15 UTC
[2/2] kafka git commit: KAFKA-2706: make state stores first class
citizens in the processor topology
KAFKA-2706: make state stores first class citizens in the processor topology
* Added StateStoreSupplier
* StateStore
* Added init(ProcessorContext context) method
* TopologyBuilder
* Added addStateStore(StateStoreSupplier supplier, String... processNames)
* Added connectProessorAndStateStores(String processorName, String... stateStoreNames)
* This is for the case processors are not created when a store is added to the topology. (used by KStream)
* KStream
* add stateStoreNames to process(), transform(), transformValues().
* Refactored existing state stores to implement StateStoreSupplier
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #387 from ymatsuda/state_store_supplier
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75827226
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75827226
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75827226
Branch: refs/heads/trunk
Commit: 758272267c811bf559336ea45571bc420a62a478
Parents: 6383593
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Nov 2 13:24:48 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 2 13:24:48 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/StreamingConfig.java | 18 ++
.../kafka/streams/examples/ProcessorJob.java | 5 +-
.../apache/kafka/streams/kstream/KStream.java | 9 +-
.../streams/kstream/internals/KStreamImpl.java | 9 +-
.../kafka/streams/processor/StateStore.java | 5 +
.../streams/processor/StateStoreSupplier.java | 25 ++
.../streams/processor/TopologyBuilder.java | 109 +++++--
.../internals/ProcessorContextImpl.java | 30 +-
.../processor/internals/ProcessorNode.java | 8 +-
.../processor/internals/ProcessorTopology.java | 11 +-
.../streams/processor/internals/StreamTask.java | 8 +
.../streams/state/InMemoryKeyValueStore.java | 135 ---------
.../state/InMemoryKeyValueStoreSupplier.java | 155 ++++++++++
.../streams/state/InMemoryLRUCacheStore.java | 180 -----------
.../state/InMemoryLRUCacheStoreSupplier.java | 195 ++++++++++++
.../streams/state/MeteredKeyValueStore.java | 68 +++--
.../streams/state/RocksDBKeyValueStore.java | 284 ------------------
.../state/RocksDBKeyValueStoreSupplier.java | 298 +++++++++++++++++++
.../org/apache/kafka/streams/state/Serdes.java | 45 ++-
.../org/apache/kafka/streams/state/Stores.java | 66 ++--
.../streams/processor/TopologyBuilderTest.java | 79 +++++
.../internals/ProcessorStateManagerTest.java | 56 +---
.../internals/ProcessorTopologyTest.java | 13 +-
.../internals/PunctuationQueueTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 18 +-
.../state/AbstractKeyValueStoreTest.java | 16 +-
.../state/InMemoryKeyValueStoreTest.java | 22 +-
.../state/InMemoryLRUCacheStoreTest.java | 15 +-
.../streams/state/KeyValueStoreTestDriver.java | 22 ++
.../streams/state/RocksDBKeyValueStoreTest.java | 24 +-
.../kafka/test/MockStateStoreSupplier.java | 97 ++++++
.../kafka/test/ProcessorTopologyTestDriver.java | 3 +-
32 files changed, 1224 insertions(+), 806 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index a0aef48..88bd844 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
@@ -245,6 +247,22 @@ public class StreamingConfig extends AbstractConfig {
return props;
}
+ public Serializer keySerializer() {
+ return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Serializer valueSerializer() {
+ return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Deserializer keyDeserializer() {
+ return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
+ public Deserializer valueDeserializer() {
+ return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
public static void main(String[] args) {
System.out.println(CONFIG.toHtmlTable());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 0317b9d..3274aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -45,10 +45,11 @@ public class ProcessorJob {
private KeyValueStore<String, Integer> kvStore;
@Override
+ @SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
- this.kvStore = Stores.create("local-state", context).withStringKeys().withIntegerValues().inMemory().build();
+ this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
}
@Override
@@ -103,6 +104,8 @@ public class ProcessorJob {
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
+ builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
+ builder.connectProcessorAndStateStores("local-state", "PROCESS");
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 915cf1c..8f0794c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -154,24 +154,27 @@ public interface KStream<K, V> {
* Applies a stateful transformation to all elements in this stream.
*
* @param transformerSupplier the class of TransformerDef
+ * @param stateStoreNames the names of the state store used by the processor
* @return KStream
*/
- <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier);
+ <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
/**
* Applies a stateful transformation to all values in this stream.
*
* @param valueTransformerSupplier the class of TransformerDef
+ * @param stateStoreNames the names of the state store used by the processor
* @return KStream
*/
- <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier);
+ <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
/**
* Processes all elements in this stream by applying a processor.
*
* @param processorSupplier the supplier of the Processor to use
+ * @param stateStoreNames the names of the state store used by the processor
* @return the new stream containing the processed output
*/
- void process(ProcessorSupplier<K, V> processorSupplier);
+ void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1a2297c..1ea9b1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -201,27 +201,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
}
@Override
- public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+ public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
String name = TRANSFORM_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+ topology.connectProcessorAndStateStores(name, stateStoreNames);
return new KStreamImpl<>(topology, name, null);
}
@Override
- public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) {
+ public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+ topology.connectProcessorAndStateStores(name, stateStoreNames);
return new KStreamImpl<>(topology, name, sourceNodes);
}
@Override
- public void process(final ProcessorSupplier<K, V> processorSupplier) {
+ public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
String name = PROCESSOR_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, processorSupplier, this.name);
+ topology.connectProcessorAndStateStores(name, stateStoreNames);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 38afe9b..9c085a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -36,6 +36,11 @@ public interface StateStore {
String name();
/**
+ * Initializes this state store
+ */
+ void init(ProcessorContext context);
+
+ /**
* Flush any cached data
*/
void flush();
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
new file mode 100644
index 0000000..11545c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public interface StateStoreSupplier {
+
+ String name();
+
+ StateStore get();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 077489c..5b6d4ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -33,6 +33,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,10 +50,9 @@ import java.util.Set;
*/
public class TopologyBuilder {
- // list of node factories in a topological order
- private final ArrayList<NodeFactory> nodeFactories = new ArrayList<>();
+ // node factories in a topological order
+ private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
- private final Set<String> nodeNames = new HashSet<>();
private final Set<String> sourceTopicNames = new HashSet<>();
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@@ -59,6 +60,9 @@ public class TopologyBuilder {
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
private Map<Integer, Set<String>> nodeGroups = null;
+ private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
+ private Map<String, Set<String>> stateStoreUsers = new HashMap();
+
private interface NodeFactory {
ProcessorNode build();
}
@@ -67,6 +71,7 @@ public class TopologyBuilder {
public final String[] parents;
private final String name;
private final ProcessorSupplier supplier;
+ private final Set<String> stateStoreNames = new HashSet<>();
public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
this.name = name;
@@ -74,9 +79,13 @@ public class TopologyBuilder {
this.supplier = supplier;
}
+ public void addStateStore(String stateStoreName) {
+ stateStoreNames.add(stateStoreName);
+ }
+
@Override
public ProcessorNode build() {
- return new ProcessorNode(name, supplier.get());
+ return new ProcessorNode(name, supplier.get(), stateStoreNames);
}
}
@@ -155,7 +164,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
- if (nodeNames.contains(name))
+ if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
for (String topic : topics) {
@@ -165,8 +174,7 @@ public class TopologyBuilder {
sourceTopicNames.add(topic);
}
- nodeNames.add(name);
- nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+ nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
nodeToTopics.put(name, topics.clone());
nodeGrouper.add(name);
@@ -204,7 +212,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
- if (nodeNames.contains(name))
+ if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
if (parentNames != null) {
@@ -212,14 +220,13 @@ public class TopologyBuilder {
if (parent.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
}
- if (!nodeNames.contains(parent)) {
+ if (!nodeFactories.containsKey(parent)) {
throw new TopologyException("Parent processor " + parent + " is not added yet.");
}
}
}
- nodeNames.add(name);
- nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+ nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
return this;
}
@@ -233,7 +240,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
- if (nodeNames.contains(name))
+ if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
if (parentNames != null) {
@@ -241,20 +248,80 @@ public class TopologyBuilder {
if (parent.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
}
- if (!nodeNames.contains(parent)) {
+ if (!nodeFactories.containsKey(parent)) {
throw new TopologyException("Parent processor " + parent + " is not added yet.");
}
}
}
- nodeNames.add(name);
- nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
+ nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
nodeGrouper.add(name);
nodeGrouper.unite(name, parentNames);
return this;
}
/**
+ * Adds a state store
+ *
+ * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+ if (stateStores.containsKey(supplier.name())) {
+ throw new TopologyException("StateStore " + supplier.name() + " is already added.");
+ }
+ stateStores.put(supplier.name(), supplier);
+ stateStoreUsers.put(supplier.name(), new HashSet<String>());
+
+ if (processorNames != null) {
+ for (String processorName : processorNames) {
+ connectProcessorAndStateStore(processorName, supplier.name());
+ }
+ }
+
+ return this;
+ }
+
+ /**
+ * Connects the processor and the state stores
+ *
+ * @param processorName the name of the processor
+ * @param stateStoreNames the names of state stores that the processor uses
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+ if (stateStoreNames != null) {
+ for (String stateStoreName : stateStoreNames) {
+ connectProcessorAndStateStore(processorName, stateStoreName);
+ }
+ }
+
+ return this;
+ }
+
+ private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
+ if (!stateStores.containsKey(stateStoreName))
+ throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
+ if (!nodeFactories.containsKey(processorName))
+ throw new TopologyException("Processor " + processorName + " is not added yet.");
+
+ Set<String> users = stateStoreUsers.get(stateStoreName);
+ Iterator<String> iter = users.iterator();
+ if (iter.hasNext()) {
+ String user = iter.next();
+ nodeGrouper.unite(user, processorName);
+ }
+ users.add(processorName);
+
+ NodeFactory factory = nodeFactories.get(processorName);
+ if (factory instanceof ProcessorNodeFactory) {
+ ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
+ } else {
+ throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+ }
+ }
+
+ /**
* Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
*
@@ -301,7 +368,7 @@ public class TopologyBuilder {
}
// Go through non-source nodes
- for (String nodeName : Utils.sorted(nodeNames)) {
+ for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
if (!nodeToTopics.containsKey(nodeName)) {
String root = nodeGrouper.root(nodeName);
Set<String> nodeGroup = rootToNodeGroup.get(root);
@@ -357,10 +424,11 @@ public class TopologyBuilder {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
+ Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
try {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
- for (NodeFactory factory : nodeFactories) {
+ for (NodeFactory factory : nodeFactories.values()) {
ProcessorNode node = factory.build();
processorNodes.add(node);
processorMap.put(node.name(), node);
@@ -369,6 +437,11 @@ public class TopologyBuilder {
for (String parent : ((ProcessorNodeFactory) factory).parents) {
processorMap.get(parent).addChild(node);
}
+ for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+ if (!stateStoreMap.containsKey(stateStoreName)) {
+ stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+ }
+ }
} else if (factory instanceof SourceNodeFactory) {
for (String topic : ((SourceNodeFactory) factory).topics) {
topicSourceMap.put(topic, (SourceNode) node);
@@ -385,7 +458,7 @@ public class TopologyBuilder {
throw new KafkaException("ProcessorNode construction failed: this should not happen.");
}
- return new ProcessorTopology(processorNodes, topicSourceMap);
+ return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3c1e059..1321cc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -62,19 +62,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
this.collector = collector;
this.stateMgr = stateMgr;
- this.keySerializer = config.getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.valSerializer = config.getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.keyDeserializer = config.getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.valDeserializer = config.getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ this.keySerializer = config.keySerializer();
+ this.valSerializer = config.valueSerializer();
+ this.keyDeserializer = config.keyDeserializer();
+ this.valDeserializer = config.valueDeserializer();
this.initialized = false;
}
- @Override
- public RecordCollector recordCollector() {
- return this.collector;
- }
-
public void initialized() {
this.initialized = true;
}
@@ -83,6 +78,15 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
return id;
}
+ public ProcessorStateManager getStateMgr() {
+ return stateMgr;
+ }
+
+ @Override
+ public RecordCollector recordCollector() {
+ return this.collector;
+ }
+
@Override
public Serializer<?> keySerializer() {
return this.keySerializer;
@@ -123,6 +127,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public StateStore getStateStore(String name) {
+ ProcessorNode node = task.node();
+
+ if (node == null)
+ throw new KafkaException("accessing from an unknown node");
+
+ if (!node.stateStores.contains(name))
+ throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
+
return stateMgr.getStore(name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 9127c3f..6db83a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
public class ProcessorNode<K, V> {
@@ -30,14 +31,17 @@ public class ProcessorNode<K, V> {
private final String name;
private final Processor<K, V> processor;
+ public final Set<String> stateStores;
+
public ProcessorNode(String name) {
- this(name, null);
+ this(name, null, null);
}
- public ProcessorNode(String name, Processor<K, V> processor) {
+ public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
this.name = name;
this.processor = processor;
this.children = new ArrayList<>();
+ this.stateStores = stateStores;
}
public final String name() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 3efae65..a70aa70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -27,11 +29,14 @@ public class ProcessorTopology {
private final List<ProcessorNode> processorNodes;
private final Map<String, SourceNode> sourceByTopics;
+ private final List<StateStoreSupplier> stateStoreSuppliers;
public ProcessorTopology(List<ProcessorNode> processorNodes,
- Map<String, SourceNode> sourceByTopics) {
+ Map<String, SourceNode> sourceByTopics,
+ List<StateStoreSupplier> stateStoreSuppliers) {
this.processorNodes = Collections.unmodifiableList(processorNodes);
this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
+ this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
}
public Set<String> sourceTopics() {
@@ -50,4 +55,8 @@ public class ProcessorTopology {
return processorNodes;
}
+ public List<StateStoreSupplier> stateStoreSuppliers() {
+ return stateStoreSuppliers;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f01e00b..a9c14e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
@@ -126,6 +128,12 @@ public class StreamTask implements Punctuator {
// initialize the topology with its own context
this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
+ // initialize the state stores
+ for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
+ StateStore store = stateStoreSupplier.get();
+ store.init(this.processorContext);
+ }
+
// initialize the task by initializing all its processor nodes in the topology
for (ProcessorNode node : this.topology.processors()) {
this.currNode = node;
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
deleted file mode 100644
index 1eb526f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, ProcessorContext)
- */
-public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
- protected InMemoryKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
- super(name, new MemoryStore<K, V>(name), context, serdes, "in-memory-state", time != null ? time : new SystemTime());
- }
-
- private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final NavigableMap<K, V> map;
-
- public MemoryStore(String name) {
- super();
- this.name = name;
- this.map = new TreeMap<>();
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- return this.map.remove(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
-
- public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- Map.Entry<K, V> entry = iter.next();
- return new Entry<>(entry.getKey(), entry.getValue());
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- }
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..d1f845c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes serdes;
+ private final Time time;
+
+ protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+ }
+
+ private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, V> map;
+
+ public MemoryStore(String name) {
+ super();
+ this.name = name;
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ return this.map.remove(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<Map.Entry<K, V>> iter;
+
+ public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+ this.iter = iter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ Map.Entry<K, V> entry = iter.next();
+ return new Entry<>(entry.getKey(), entry.getValue());
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
deleted file mode 100644
index 1b96c59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStore<K, V> extends MeteredKeyValueStore<K, V> {
-
- protected static <K, V> InMemoryLRUCacheStore<K, V> create(String name, int capacity, ProcessorContext context,
- Serdes<K, V> serdes, Time time) {
- if (time == null) time = new SystemTime();
- MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
- final InMemoryLRUCacheStore<K, V> store = new InMemoryLRUCacheStore<>(name, context, cache, serdes, time);
- cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(K key, V value) {
- store.removed(key);
- }
- });
- return store;
-
- }
-
- private InMemoryLRUCacheStore(String name, ProcessorContext context, MemoryLRUCache<K, V> cache, Serdes<K, V> serdes, Time time) {
- super(name, cache, context, serdes, "kafka-streams", time);
- }
-
- private static interface EldestEntryRemovalListener<K, V> {
- public void apply(K key, V value);
- }
-
- protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final Map<K, V> map;
- private final NavigableSet<K> keys;
- private EldestEntryRemovalListener<K, V> listener;
-
- public MemoryLRUCache(String name, final int maxCacheSize) {
- this.name = name;
- this.keys = new TreeSet<>();
- // leave room for one extra entry to handle adding an entry before the oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
- K key = eldest.getKey();
- keys.remove(key);
- if (listener != null) listener.apply(key, eldest.getValue());
- return true;
- }
- return false;
- }
- };
- }
-
- protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- this.keys.add(key);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = this.map.remove(key);
- this.keys.remove(key);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
-
- public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
- this.keys = keys;
- this.entries = entries;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- lastKey = keys.next();
- return new Entry<>(lastKey, entries.get(lastKey));
- }
-
- @Override
- public void remove() {
- keys.remove();
- entries.remove(lastKey);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..a346534
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final int capacity;
+ private final Serdes serdes;
+ private final Time time;
+
+ protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.capacity = capacity;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+ final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+ cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+ @Override
+ public void apply(K key, V value) {
+ store.removed(key);
+ }
+ });
+ return store;
+ }
+
+ private static interface EldestEntryRemovalListener<K, V> {
+ public void apply(K key, V value);
+ }
+
+ protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final Map<K, V> map;
+ private final NavigableSet<K> keys;
+ private EldestEntryRemovalListener<K, V> listener;
+
+ public MemoryLRUCache(String name, final int maxCacheSize) {
+ this.name = name;
+ this.keys = new TreeSet<>();
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ this.keys.add(key);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = this.map.remove(key);
+ this.keys.remove(key);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<K> keys;
+ private final Map<K, V> entries;
+ private K lastKey;
+
+ public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+ this.keys = keys;
+ this.entries = entries;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ lastKey = keys.next();
+ return new Entry<>(lastKey, entries.get(lastKey));
+ }
+
+ @Override
+ public void remove() {
+ keys.remove();
+ entries.remove(lastKey);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index a7f4c12..c1ccbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;
+import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -35,33 +36,51 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
protected final Serdes<K, V> serialization;
-
- private final Time time;
- private final Sensor putTime;
- private final Sensor getTime;
- private final Sensor deleteTime;
- private final Sensor putAllTime;
- private final Sensor allTime;
- private final Sensor rangeTime;
- private final Sensor flushTime;
- private final Sensor restoreTime;
- private final StreamingMetrics metrics;
+ protected final String metricGrp;
+ protected final Time time;
private final String topic;
- private final int partition;
+
+ private Sensor putTime;
+ private Sensor getTime;
+ private Sensor deleteTime;
+ private Sensor putAllTime;
+ private Sensor allTime;
+ private Sensor rangeTime;
+ private Sensor flushTime;
+ private Sensor restoreTime;
+ private StreamingMetrics metrics;
+
private final Set<K> dirty;
private final Set<K> removed;
private final int maxDirty;
private final int maxRemoved;
- private final ProcessorContext context;
+
+ private int partition;
+ private ProcessorContext context;
// always wrap the logged store with the metered store
- public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context,
- Serdes<K, V> serialization, String metricGrp, Time time) {
+ public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
this.inner = inner;
this.serialization = serialization;
+ this.metricGrp = metricGrp;
+ this.time = time != null ? time : new SystemTime();
+ this.topic = inner.name();
+
+ this.dirty = new HashSet<K>();
+ this.removed = new HashSet<K>();
+ this.maxDirty = 100; // TODO: this needs to be configurable
+ this.maxRemoved = 100; // TODO: this needs to be configurable
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
- this.time = time;
+ @Override
+ public void init(ProcessorContext context) {
+ String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -72,18 +91,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
- this.topic = name;
- this.partition = context.id().partition;
-
this.context = context;
-
- this.dirty = new HashSet<K>();
- this.removed = new HashSet<K>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
+ this.partition = context.id().partition;
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
+ inner.init(context);
try {
final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
final Deserializer<V> valDeserializer = serialization.valueDeserializer();
@@ -92,7 +105,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
@Override
public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(topic, key),
- valDeserializer.deserialize(topic, value));
+ valDeserializer.deserialize(topic, value));
}
});
} finally {
@@ -101,11 +114,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public String name() {
- return inner.name();
- }
-
- @Override
public boolean persistent() {
return inner.persistent();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
deleted file mode 100644
index 1de345e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String, ProcessorContext)
- */
-public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
- protected RocksDBKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
- super(name, new RocksDBStore<K, V>(name, context, serdes), context, serdes, "rocksdb-state", time != null ? time : new SystemTime());
- }
-
- private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
- private static final int TTL_NOT_USED = -1;
-
- // TODO: these values should be configurable
- private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
- private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
- private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
- private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
- private static final long BLOCK_SIZE = 4096L;
- private static final int TTL_SECONDS = TTL_NOT_USED;
- private static final int MAX_WRITE_BUFFERS = 3;
- private static final String DB_FILE_DIR = "rocksdb";
-
- private final Serdes<K, V> serdes;
-
- private final String topic;
- private final int partition;
- private final ProcessorContext context;
-
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
- private final String dbName;
- private final String dirName;
-
- private RocksDB db;
-
- public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
- this.topic = name;
- this.partition = context.id().partition;
- this.context = context;
- this.serdes = serdes;
-
- // initialize the rocksdb options
- BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
- tableConfig.setBlockSize(BLOCK_SIZE);
-
- options = new Options();
- options.setTableFormatConfig(tableConfig);
- options.setWriteBufferSize(WRITE_BUFFER_SIZE);
- options.setCompressionType(COMPRESSION_TYPE);
- options.setCompactionStyle(COMPACTION_STYLE);
- options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
- options.setCreateIfMissing(true);
- options.setErrorIfExists(false);
-
- wOptions = new WriteOptions();
- wOptions.setDisableWAL(true);
-
- fOptions = new FlushOptions();
- fOptions.setWaitForFlush(true);
-
- dbName = this.topic + "." + this.partition;
- dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-
- db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
- }
-
- private RocksDB openDB(File dir, Options options, int ttl) {
- try {
- if (ttl == TTL_NOT_USED) {
- dir.getParentFile().mkdirs();
- return RocksDB.open(options, dir.toString());
- } else {
- throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
- // TODO: support TTL with change log?
- // return TtlDB.open(options, dir.toString(), ttl, false);
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
- }
- }
-
- @Override
- public String name() {
- return this.topic;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- try {
- return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
- }
- }
-
- @Override
- public void put(K key, V value) {
- try {
- if (value == null) {
- db.remove(wOptions, serdes.rawKey(key));
- } else {
- db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = get(key);
- put(key, null);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- RocksIterator innerIter = db.newIterator();
- innerIter.seekToFirst();
- return new RocksDbIterator<K, V>(innerIter, serdes);
- }
-
- @Override
- public void flush() {
- try {
- db.flush(fOptions);
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.topic, e);
- }
- }
-
- @Override
- public void close() {
- flush();
- db.close();
- }
-
- private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
- private final RocksIterator iter;
- private final Serdes<K, V> serdes;
-
- public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
- this.iter = iter;
- this.serdes = serdes;
- }
-
- protected byte[] peekRawKey() {
- return iter.key();
- }
-
- protected Entry<K, V> getEntry() {
- return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
- }
-
- @Override
- public boolean hasNext() {
- return iter.isValid();
- }
-
- @Override
- public Entry<K, V> next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Entry<K, V> entry = this.getEntry();
- iter.next();
- return entry;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("RocksDB iterator does not support remove");
- }
-
- @Override
- public void close() {
- }
-
- }
-
- private static class LexicographicComparator implements Comparator<byte[]> {
-
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int leftByte = left[i] & 0xff;
- int rightByte = right[j] & 0xff;
- if (leftByte != rightByte) {
- return leftByte - rightByte;
- }
- }
- return left.length - right.length;
- }
- }
-
- private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = new LexicographicComparator();
- byte[] rawToKey;
-
- public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
- K from, K to) {
- super(iter, serdes);
- iter.seek(serdes.rawKey(from));
- this.rawToKey = serdes.rawKey(to);
- }
-
- @Override
- public boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..fe8f00a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes serdes;
+ private final Time time;
+
+ protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+ }
+
+ private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+ private static final int TTL_NOT_USED = -1;
+
+ // TODO: these values should be configurable
+ private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+ private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+ private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+ private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+ private static final long BLOCK_SIZE = 4096L;
+ private static final int TTL_SECONDS = TTL_NOT_USED;
+ private static final int MAX_WRITE_BUFFERS = 3;
+ private static final String DB_FILE_DIR = "rocksdb";
+
+ private final Serdes<K, V> serdes;
+ private final String topic;
+
+ private final Options options;
+ private final WriteOptions wOptions;
+ private final FlushOptions fOptions;
+
+ private ProcessorContext context;
+ private int partition;
+ private String dbName;
+ private String dirName;
+ private RocksDB db;
+
+ public RocksDBStore(String name, Serdes<K, V> serdes) {
+ this.topic = name;
+ this.serdes = serdes;
+
+ // initialize the rocksdb options
+ BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+ tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+ tableConfig.setBlockSize(BLOCK_SIZE);
+
+ options = new Options();
+ options.setTableFormatConfig(tableConfig);
+ options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+ options.setCompressionType(COMPRESSION_TYPE);
+ options.setCompactionStyle(COMPACTION_STYLE);
+ options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+ options.setCreateIfMissing(true);
+ options.setErrorIfExists(false);
+
+ wOptions = new WriteOptions();
+ wOptions.setDisableWAL(true);
+
+ fOptions = new FlushOptions();
+ fOptions.setWaitForFlush(true);
+ }
+
+ public void init(ProcessorContext context) {
+ this.context = context;
+ this.partition = context.id().partition;
+ this.dbName = this.topic + "." + this.partition;
+ this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+ this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
+ }
+
+ private RocksDB openDB(File dir, Options options, int ttl) {
+ try {
+ if (ttl == TTL_NOT_USED) {
+ dir.getParentFile().mkdirs();
+ return RocksDB.open(options, dir.toString());
+ } else {
+ throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+ // TODO: support TTL with change log?
+ // return TtlDB.open(options, dir.toString(), ttl, false);
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return this.topic;
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ try {
+ return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ try {
+ if (value == null) {
+ db.remove(wOptions, serdes.rawKey(key));
+ } else {
+ db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = get(key);
+ put(key, null);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ RocksIterator innerIter = db.newIterator();
+ innerIter.seekToFirst();
+ return new RocksDbIterator<K, V>(innerIter, serdes);
+ }
+
+ @Override
+ public void flush() {
+ try {
+ db.flush(fOptions);
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing flush from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ flush();
+ db.close();
+ }
+
+ private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+ private final RocksIterator iter;
+ private final Serdes<K, V> serdes;
+
+ public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+ this.iter = iter;
+ this.serdes = serdes;
+ }
+
+ protected byte[] peekRawKey() {
+ return iter.key();
+ }
+
+ protected Entry<K, V> getEntry() {
+ return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.isValid();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Entry<K, V> entry = this.getEntry();
+ iter.next();
+ return entry;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+
+ private static class LexicographicComparator implements Comparator<byte[]> {
+
+ @Override
+ public int compare(byte[] left, byte[] right) {
+ for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+ int leftByte = left[i] & 0xff;
+ int rightByte = right[j] & 0xff;
+ if (leftByte != rightByte) {
+ return leftByte - rightByte;
+ }
+ }
+ return left.length - right.length;
+ }
+ }
+
+ private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+ // RocksDB's JNI interface does not expose getters/setters that allow the
+ // comparator to be pluggable, and the default is lexicographic, so it's
+ // safe to just force lexicographic comparator here for now.
+ private final Comparator<byte[]> comparator = new LexicographicComparator();
+ byte[] rawToKey;
+
+ public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+ K from, K to) {
+ super(iter, serdes);
+ iter.seek(serdes.rawKey(from));
+ this.rawToKey = serdes.rawKey(to);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 540d763..31bd439 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
final class Serdes<K, V> {
@@ -64,7 +64,7 @@ final class Serdes<K, V> {
/**
* Create a context for serialization using the specified serializers and deserializers.
- *
+ *
* @param topic the name of the topic
* @param keySerializer the serializer for keys; may not be null
* @param keyDeserializer the deserializer for keys; may not be null
@@ -83,47 +83,44 @@ final class Serdes<K, V> {
/**
* Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
- * corresponding {@link ProcessorContext}'s default serializer or deserializer, which
+ * corresponding {@link StreamingConfig}'s serializer or deserializer, which
* <em>must</em> match the key and value types used as parameters for this object.
- *
+ *
* @param topic the name of the topic
- * @param keySerializer the serializer for keys; may be null if the {@link ProcessorContext#keySerializer() default
+ * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default
* key serializer} should be used
- * @param keyDeserializer the deserializer for keys; may be null if the {@link ProcessorContext#keyDeserializer() default
+ * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default
* key deserializer} should be used
- * @param valueSerializer the serializer for values; may be null if the {@link ProcessorContext#valueSerializer() default
+ * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default
* value serializer} should be used
- * @param valueDeserializer the deserializer for values; may be null if the {@link ProcessorContext#valueDeserializer()
+ * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
* default value deserializer} should be used
- * @param context the processing context
+ * @param config the streaming config
*/
@SuppressWarnings("unchecked")
public Serdes(String topic,
Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
- ProcessorContext context) {
+ StreamingConfig config) {
this.topic = topic;
- this.keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
- this.keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
- this.valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
- this.valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
+
+ this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
+ this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
+ this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
+ this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
}
/**
- * Create a context for serialization using the {@link ProcessorContext}'s default serializers and deserializers, which
+ * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which
* <em>must</em> match the key and value types used as parameters for this object.
- *
+ *
* @param topic the name of the topic
- * @param context the processing context
+ * @param config the streaming config
*/
@SuppressWarnings("unchecked")
public Serdes(String topic,
- ProcessorContext context) {
- this.topic = topic;
- this.keySerializer = (Serializer<K>) context.keySerializer();
- this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
- this.valueSerializer = (Serializer<V>) context.valueSerializer();
- this.valueDeserializer = (Deserializer<V>) context.valueDeserializer();
+ StreamingConfig config) {
+ this(topic, null, null, null, null, config);
}
public Deserializer<K> keyDeserializer() {
@@ -161,4 +158,4 @@ final class Serdes<K, V> {
public byte[] rawValue(V value) {
return valueSerializer.serialize(topic, value);
}
-}
\ No newline at end of file
+}