You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/04 23:59:31 UTC
[2/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs
along with standby task support
KAFKA-2856: Add KTable non-stateful APIs along with standby task support
guozhangwang
* added KTable API and impl
* added standby support for KTable
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #604 from ymatsuda/add_ktable
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39c3512e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39c3512e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39c3512e
Branch: refs/heads/trunk
Commit: 39c3512eceedebcb6e50f8c6c4ef66601ff7dbc4
Parents: cd54fc8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Dec 4 14:59:24 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 4 14:59:24 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 22 +-
.../kafka/streams/kstream/KStreamBuilder.java | 41 +++
.../apache/kafka/streams/kstream/KTable.java | 108 ++++++++
.../streams/kstream/internals/KStreamImpl.java | 2 +-
.../KTableDerivedValueGetterSupplier.java | 28 ++
.../streams/kstream/internals/KTableFilter.java | 87 ++++++
.../streams/kstream/internals/KTableImpl.java | 188 +++++++++++++
.../kstream/internals/KTableMapValues.java | 85 ++++++
.../internals/KTableProcessorSupplier.java | 26 ++
.../streams/kstream/internals/KTableSource.java | 78 ++++++
.../KTableSourceValueGetterSupplier.java | 50 ++++
.../kstream/internals/KTableStoreSupplier.java | 58 ++++
.../kstream/internals/KTableValueGetter.java | 28 ++
.../internals/KTableValueGetterSupplier.java | 24 ++
.../streams/processor/PartitionGrouper.java | 2 +-
.../apache/kafka/streams/processor/TaskId.java | 12 +
.../processor/internals/AbstractTask.java | 14 +-
.../KafkaStreamingPartitionAssignor.java | 20 +-
.../internals/ProcessorStateManager.java | 49 +++-
.../processor/internals/StandbyTask.java | 25 +-
.../streams/processor/internals/StreamTask.java | 18 +-
.../processor/internals/StreamThread.java | 78 ++++--
.../internals/assignment/AssignmentInfo.java | 129 +++++----
.../state/KeyValueStoreChangeLogger.java | 87 ++++++
.../streams/state/MeteredKeyValueStore.java | 95 +++----
.../state/RocksDBKeyValueStoreSupplier.java | 252 +-----------------
.../kafka/streams/state/RocksDBStore.java | 265 +++++++++++++++++++
.../org/apache/kafka/streams/state/Serdes.java | 3 +-
.../kstream/internals/KTableFilterTest.java | 137 ++++++++++
.../kstream/internals/KTableImplTest.java | 220 +++++++++++++++
.../internals/KTableMapValuesImplTest.java | 198 ++++++++++++++
.../kstream/internals/KTableSourceTest.java | 117 ++++++++
.../KafkaStreamingPartitionAssignorTest.java | 151 +++++------
.../processor/internals/StandbyTaskTest.java | 11 +-
.../processor/internals/StreamTaskTest.java | 4 +-
.../processor/internals/StreamThreadTest.java | 12 +-
.../assignment/AssginmentInfoTest.java | 11 +-
.../apache/kafka/test/KStreamTestDriver.java | 43 ++-
.../apache/kafka/test/MockProcessorContext.java | 14 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 4 +-
40 files changed, 2268 insertions(+), 528 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 992bd75..93303eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -33,7 +33,7 @@ public interface KStream<K, V> {
* Creates a new instance of KStream consists of all elements of this stream which satisfy a predicate
*
* @param predicate the instance of Predicate
- * @return the stream with only those elements that satisfy the predicate
+ * @return the instance of KStream with only those elements that satisfy the predicate
*/
KStream<K, V> filter(Predicate<K, V> predicate);
@@ -41,22 +41,22 @@ public interface KStream<K, V> {
* Creates a new instance of KStream consists all elements of this stream which do not satisfy a predicate
*
* @param predicate the instance of Predicate
- * @return the stream with only those elements that do not satisfy the predicate
+ * @return the instance of KStream with only those elements that do not satisfy the predicate
*/
KStream<K, V> filterOut(Predicate<K, V> predicate);
/**
- * Creates a new stream by applying transforming each element in this stream into a different element in the new stream.
+ * Creates a new instance of KStream by transforming each element in this stream into a different element in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
- * @return the mapped stream
+ * @return the instance of KStream
*/
<K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
/**
- * Creates a new instance of KStream by applying transforming each value in this stream into a different value in the new stream.
+ * Creates a new instance of KStream by transforming each value in this stream into a different value in the new stream.
*
* @param mapper the instance of ValueMapper
* @param <V1> the value type of the new stream
@@ -65,7 +65,7 @@ public interface KStream<K, V> {
<V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
/**
- * Creates a new instance of KStream by applying transforming each element in this stream into zero or more elements in the new stream.
+ * Creates a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
@@ -75,7 +75,7 @@ public interface KStream<K, V> {
<K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
/**
- * Creates a new instance of KStream by applying transforming each value in this stream into zero or more values in the new stream.
+ * Creates a new instance of KStream by transforming each value in this stream into zero or more values in the new stream.
*
* @param processor the instance of Processor
* @param <V1> the value type of the new stream
@@ -103,11 +103,11 @@ public interface KStream<K, V> {
KStream<K, V>[] branch(Predicate<K, V>... predicates);
/**
- * Sends key-value to a topic, also creates a new stream from the topic.
+ * Sends key-value to a topic, also creates a new instance of KStream from the topic.
* This is equivalent to calling to(topic) and from(topic).
*
* @param topic the topic name
- * @return the new stream that consumes the given topic
+ * @return the instance of KStream that consumes the given topic
*/
KStream<K, V> through(String topic);
@@ -124,7 +124,7 @@ public interface KStream<K, V> {
* if not specified the default key deserializer defined in the configuration will be used
* @param valDeserializer value deserializer used to create the new KStream,
* if not specified the default value deserializer defined in the configuration will be used
- * @return the new stream that consumes the given topic
+ * @return the instance of KStream that consumes the given topic
*/
KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
@@ -160,7 +160,7 @@ public interface KStream<K, V> {
*
* @param valueTransformerSupplier the class of TransformerDef
* @param stateStoreNames the names of the state store used by the processor
- * @return the instance of KStream that contains transformed keys and values
+ * @return the instance of KStream that contains the keys and transformed values
*/
<R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ae8f694..ca1a10d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -18,7 +18,11 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Collections;
@@ -65,6 +69,43 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
+ * Creates a KTable instance for the specified topic.
+ * The default deserializers specified in the config are used.
+ *
+ * @param topic the topic name
+ * @return KTable
+ */
+ public <K, V> KTable<K, V> table(String topic) {
+ return table(null, null, null, null, topic);
+ }
+
+ /**
+ * Creates a KTable instance for the specified topic.
+ *
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default key serializer defined in the configuration will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default value serializer defined in the configuration will be used
+ * @param keyDeserializer key deserializer used to read this source KTable,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param valDeserializer value deserializer used to read this source KTable,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param topic the topic name
+ * @return KTable
+ */
+ public <K, V> KTable<K, V> table(Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String topic) {
+ String source = newName(KStreamImpl.SOURCE_NAME);
+ String name = newName(KTableImpl.SOURCE_NAME);
+
+ addSource(source, keyDeserializer, valDeserializer, topic);
+
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
+ addProcessor(name, processorSupplier, source);
+
+ return new KTableImpl<>(this, name, processorSupplier, source, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer);
+ }
+
+ /**
* Creates a new stream by merging the given streams
*
* @param streams the streams to be merged
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
new file mode 100644
index 0000000..75fb87a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * KTable is an abstraction of a change log stream.
+ *
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public interface KTable<K, V> {
+
+ /**
+ * Creates a new instance of KTable consisting of all elements of this table which satisfy a predicate
+ *
+ * @param predicate the instance of Predicate
+ * @return the instance of KTable with only those elements that satisfy the predicate
+ */
+ KTable<K, V> filter(Predicate<K, V> predicate);
+
+ /**
+ * Creates a new instance of KTable consisting of all elements of this table which do not satisfy a predicate
+ *
+ * @param predicate the instance of Predicate
+ * @return the instance of KTable with only those elements that do not satisfy the predicate
+ */
+ KTable<K, V> filterOut(Predicate<K, V> predicate);
+
+ /**
+ * Creates a new instance of KTable by transforming each value in this table into a different value in the new table.
+ *
+ * @param mapper the instance of ValueMapper
+ * @param <V1> the value type of the new table
+ * @return the instance of KTable
+ */
+ <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+ /**
+ * Sends key-value to a topic, also creates a new instance of KTable from the topic.
+ * This is equivalent to calling to(topic) and table(topic).
+ *
+ * @param topic the topic name
+ * @return the instance of KTable that consumes the given topic
+ */
+ KTable<K, V> through(String topic);
+
+ /**
+ * Sends key-value to a topic, also creates a new instance of KTable from the topic.
+ * This is equivalent to calling to(topic) and table(topic).
+ *
+ * @param topic the topic name
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default key serializer defined in the configuration will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default value serializer defined in the configuration will be used
+ * @param keyDeserializer key deserializer used to create the new KTable,
+ * if not specified the default key deserializer defined in the configuration will be used
+ * @param valDeserializer value deserializer used to create the new KTable,
+ * if not specified the default value deserializer defined in the configuration will be used
+ * @return the instance of KTable that consumes the given topic
+ */
+ KTable<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
+
+ /**
+ * Sends key-value to a topic using default serializers specified in the config.
+ *
+ * @param topic the topic name
+ */
+ void to(String topic);
+
+ /**
+ * Sends key-value to a topic.
+ *
+ * @param topic the topic name
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ */
+ void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+
+ /**
+ * Creates a new instance of KStream from this KTable
+ *
+ * @return the instance of KStream
+ */
+ KStream<K, V> toStream();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 04aa8e9..fc8f4c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
- private static final String SINK_NAME = "KSTREAM-SINK-";
+ public static final String SINK_NAME = "KSTREAM-SINK-";
public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
new file mode 100644
index 0000000..731d7f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public abstract class KTableDerivedValueGetterSupplier<K, V1, V2> implements KTableValueGetterSupplier<K, V2> {
+
+ protected final KTableValueGetterSupplier<K, V1> parentValueGetterSupplier;
+
+ public KTableDerivedValueGetterSupplier(KTableValueGetterSupplier<K, V1> parentValueGetterSupplier) {
+ this.parentValueGetterSupplier = parentValueGetterSupplier;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
new file mode 100644
index 0000000..212b1c9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
+
+ private final Predicate<K, V> predicate;
+ private final boolean filterOut;
+
+ public KTableFilter(Predicate<K, V> predicate, boolean filterOut) {
+ this.predicate = predicate;
+ this.filterOut = filterOut;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KTableFilterProcessor();
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+ return new KTableDerivedValueGetterSupplier<K, V, V>(parentValueGetterSupplier) {
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableFilterValueGetter(parentValueGetterSupplier.get());
+ }
+
+ };
+ }
+
+ private V computeNewValue(K key, V value) {
+ V newValue = null;
+
+ if (value != null && (filterOut ^ predicate.test(key, value)))
+ newValue = value;
+
+ return newValue;
+ }
+
+ private class KTableFilterProcessor extends AbstractProcessor<K, V> {
+
+ @Override
+ public void process(K key, V value) {
+ context().forward(key, computeNewValue(key, value));
+ }
+
+ }
+
+ private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
+
+ private final KTableValueGetter<K, V> parentGetter;
+
+ public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) {
+ this.parentGetter = parentGetter;
+ }
+
+ public void init(ProcessorContext context) {
+ parentGetter.init(context);
+ }
+
+ public V get(K key) {
+ return computeNewValue(key, parentGetter.get(key));
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
new file mode 100644
index 0000000..5b2b031
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Collections;
+
+/**
+ * The implementation class of KTable
+ * @param <K> the key type
+ * @param <S> the source's (parent's) value type
+ * @param <V> the value type
+ */
+public class KTableImpl<K, S, V> implements KTable<K, V> {
+
+ private static final String FILTER_NAME = "KTABLE-FILTER-";
+
+ private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+
+ private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+
+ public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+ protected final KStreamBuilder topology;
+ public final String name;
+ public final KTableProcessorSupplier<K, S, V> processorSupplier;
+ private final String sourceNode;
+
+ private final KTableImpl<K, ?, S> parent;
+ private final String topic;
+ private final Serializer<K> keySerializer;
+ private final Serializer<V> valSerializer;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valDeserializer;
+
+ public KTableImpl(KStreamBuilder topology,
+ String name,
+ KTableProcessorSupplier<K, S, V> processorSupplier,
+ String sourceNode,
+ KTableImpl<K, ?, S> parent) {
+ this(topology, name, processorSupplier, sourceNode, null, null, null, null, null, parent);
+ }
+
+ public KTableImpl(KStreamBuilder topology,
+ String name,
+ KTableProcessorSupplier<K, S, V> processorSupplier,
+ String sourceNode,
+ String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer) {
+ this(topology, name, processorSupplier, sourceNode, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer, null);
+ }
+
+ private KTableImpl(KStreamBuilder topology,
+ String name,
+ KTableProcessorSupplier<K, S, V> processorSupplier,
+ String sourceNode,
+ String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer,
+ KTableImpl<K, ?, S> parent) {
+ this.topology = topology;
+ this.name = name;
+ this.processorSupplier = processorSupplier;
+ this.sourceNode = sourceNode;
+ this.topic = topic;
+ this.keySerializer = keySerializer;
+ this.valSerializer = valSerializer;
+ this.keyDeserializer = keyDeserializer;
+ this.valDeserializer = valDeserializer;
+ this.parent = parent;
+ }
+
+ @Override
+ public KTable<K, V> filter(Predicate<K, V> predicate) {
+ String name = topology.newName(FILTER_NAME);
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, false);
+ topology.addProcessor(name, processorSupplier, this.name);
+
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ }
+
+ @Override
+ public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
+ String name = topology.newName(FILTER_NAME);
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, true);
+
+ topology.addProcessor(name, processorSupplier, this.name);
+
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ }
+
+ @Override
+ public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+ String name = topology.newName(MAPVALUES_NAME);
+ KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(mapper);
+
+ topology.addProcessor(name, processorSupplier, this.name);
+
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ }
+
+ @Override
+ public KTable<K, V> through(String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer) {
+ String sendName = topology.newName(KStreamImpl.SINK_NAME);
+
+ topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+
+ return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
+ }
+
+ @Override
+ public KTable<K, V> through(String topic) {
+ return through(topic, null, null, null, null);
+ }
+
+ @Override
+ public void to(String topic) {
+ String name = topology.newName(KStreamImpl.SINK_NAME);
+
+ topology.addSink(name, topic, this.name);
+ }
+
+ @Override
+ public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+ String name = topology.newName(KStreamImpl.SINK_NAME);
+
+ topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+ }
+
+ @Override
+ public KStream<K, V> toStream() {
+ String name = topology.newName(TOSTREAM_NAME);
+
+ topology.addProcessor(name, new KStreamPassThrough(), this.name);
+
+ return new KStreamImpl<>(topology, name, Collections.singleton(sourceNode));
+ }
+
+ KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+ if (parent != null) {
+ return processorSupplier.view(parent.valueGetterSupplier());
+ } else {
+ KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+ synchronized (source) {
+ if (!source.isMaterialized()) {
+ StateStoreSupplier storeSupplier =
+ new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+ topology.addStateStore(storeSupplier, name);
+ source.materialize();
+ }
+ }
+ return new KTableSourceValueGetterSupplier<>(topic);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
new file mode 100644
index 0000000..0d14390
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
+
+ private final ValueMapper<V1, V2> mapper;
+
+ public KTableMapValues(ValueMapper<V1, V2> mapper) {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public Processor<K1, V1> get() {
+ return new KTableMapProcessor();
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K1, V2> view(KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier) {
+ return new KTableDerivedValueGetterSupplier<K1, V1, V2>(parentValueGetterSupplier) {
+
+ public KTableValueGetter<K1, V2> get() {
+ return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
+ }
+
+ };
+ }
+
+ private V2 computeNewValue(V1 value) {
+ V2 newValue = null;
+
+ if (value != null)
+ newValue = mapper.apply(value);
+
+ return newValue;
+ }
+
+ private class KTableMapProcessor extends AbstractProcessor<K1, V1> {
+
+ @Override
+ public void process(K1 key, V1 value) {
+ context().forward(key, computeNewValue(value));
+ }
+
+ }
+
+ private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+
+ private final KTableValueGetter<K1, V1> parentGetter;
+
+ public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+ this.parentGetter = parentGetter;
+ }
+
+ public void init(ProcessorContext context) {
+ parentGetter.init(context);
+ }
+
+ public V2 get(K1 key) {
+ return computeNewValue(parentGetter.get(key));
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
new file mode 100644
index 0000000..cc6467f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public abstract class KTableProcessorSupplier<K, V, T> implements ProcessorSupplier<K, V> { // processor supplier that can also expose its output as a value-getter view; T is the view's value type
+
+ public abstract KTableValueGetterSupplier<K, T> view(KTableValueGetterSupplier<K, V> parentValueGetterFactory); // returns a getter supplier for values of type T, given a getter supplier for the parent's values of type V
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
new file mode 100644
index 0000000..93790ed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> { // source node of a KTable; can optionally write each record into a state store
+
+ private final String topic; // also used as the state store name in MaterializedKTableSourceProcessor.init
+
+ private boolean materialized = false; // when true, get() returns processors that also write to the store
+
+ public KTableSource(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor(); // choice is made at call time from the current flag
+ }
+
+ public void materialize() { // one-way switch: this class offers no method to clear the flag
+ materialized = true;
+ }
+
+ public boolean isMaterialized() {
+ return materialized;
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) { // always throws; the argument is ignored
+ throw new IllegalStateException("a view cannot be defined on the ktable source");
+ }
+
+ private class KTableSourceProcessor extends AbstractProcessor<K, V> { // pass-through: forwards records without storing them
+ @Override
+ public void process(K key, V value) {
+ context().forward(key, value);
+ }
+ }
+
+ private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> { // stores each record, then forwards it
+
+ private KeyValueStore<K, V> store; // assigned in init()
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ store = (KeyValueStore<K, V>) context.getStateStore(topic); // unchecked cast: the store is looked up by the topic name
+ }
+
+ @Override
+ public void process(K key, V value) {
+ store.put(key, value); // the store is updated before the record is forwarded downstream
+ context().forward(key, value);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
new file mode 100644
index 0000000..dab92d5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> { // supplies getters that read values from the state store named by topic
+
+ private final String topic; // used as the state store name in KTableSourceValueGetter.init
+
+ public KTableSourceValueGetterSupplier(String topic) {
+ this.topic = topic;
+ }
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableSourceValueGetter(); // a new, uninitialized getter on every call
+ }
+
+ private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
+
+ KeyValueStore<K, V> store = null; // assigned in init(); calling get() before init() dereferences null
+
+ @SuppressWarnings("unchecked")
+ public void init(ProcessorContext context) {
+ store = (KeyValueStore<K, V>) context.getStateStore(topic); // unchecked cast: the store is looked up by the topic name
+ }
+
+ public V get(K key) {
+ return store.get(key); // result is whatever the underlying store returns for the key
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
new file mode 100644
index 0000000..d07fc5d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A KTable storage. It stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class KTableStoreSupplier<K, V> implements StateStoreSupplier { // creates the metered RocksDB store used to hold a KTable's entries
+
+ private final String name; // store name; also passed to Serdes and RocksDBStore
+ private final Serdes<K, V> serdes; // built once in the constructor and reused by every get() call
+ private final Time time; // handed to MeteredKeyValueStore
+
+ protected KTableStoreSupplier(String name,
+ Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
+ Serializer<V> valSerializer, Deserializer<V> valDeserializer,
+ Time time) {
+ this.name = name;
+ this.serdes = new Serdes<>(name, keySerializer, keyDeserializer, valSerializer, valDeserializer); // bundles the key and value (de)serializers under the store name
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() { // builds a new store instance on every call
+ return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging(); // NOTE(review): disableLogging() presumably turns off change logging for this store - confirm in MeteredKeyValueStore
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
new file mode 100644
index 0000000..53ec6ba
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface KTableValueGetter<K, V> { // read-only lookup of a table's value by key
+
+ void init(ProcessorContext context); // gives the getter access to the processor context; presumably must be called before get() - confirm with callers
+
+ V get(K key); // returns the value for key; presumably null when there is none - confirm against implementations
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
new file mode 100644
index 0000000..1ab6ba6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public interface KTableValueGetterSupplier<K, V> { // factory for KTableValueGetter instances
+
+ KTableValueGetter<K, V> get(); // returns a getter; KTableSourceValueGetterSupplier creates a new one per call
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 00b56b3..187c4ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -50,7 +50,7 @@ public abstract class PartitionGrouper {
return partitionAssignor.taskIds(partition);
}
- public Set<TaskId> standbyTasks() {
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return partitionAssignor.standbyTasks();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 5344f6c..023bbbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.processor;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class TaskId implements Comparable<TaskId> {
@@ -47,6 +50,15 @@ public class TaskId implements Comparable<TaskId> {
}
}
+ public void writeTo(DataOutputStream out) throws IOException { // stream counterpart of writeTo(ByteBuffer)
+ out.writeInt(topicGroupId); // written first; readFrom(DataInputStream) reads in the same order
+ out.writeInt(partition);
+ }
+
+ public static TaskId readFrom(DataInputStream in) throws IOException { // reads two ints and builds a TaskId from them
+ return new TaskId(in.readInt(), in.readInt()); // arguments are evaluated left to right; assumes the constructor takes (topicGroupId, partition) to mirror writeTo - TODO confirm
+ }
+
public void writeTo(ByteBuffer buf) {
buf.putInt(topicGroupId);
buf.putInt(partition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 14037ab..e1b4d62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -28,31 +28,37 @@ import org.apache.kafka.streams.processor.TaskId;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public abstract class AbstractTask {
protected final TaskId id;
protected final ProcessorTopology topology;
+ protected final Consumer consumer;
protected final ProcessorStateManager stateMgr;
protected final Set<TopicPartition> partitions;
protected ProcessorContext processorContext;
protected AbstractTask(TaskId id,
- Consumer<byte[], byte[]> restoreConsumer,
+ Collection<TopicPartition> partitions,
ProcessorTopology topology,
+ Consumer<byte[], byte[]> consumer,
+ Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
- Set<TopicPartition> partitions) {
+ boolean isStandby) {
this.id = id;
+ this.partitions = new HashSet<>(partitions);
this.topology = topology;
- this.partitions = partitions;
+ this.consumer = consumer;
// create the processor state manager
try {
File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
// if partitions is null, this is a standby task
- this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
+ this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 451b214..54d5567 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -48,7 +48,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private StreamThread streamThread;
private int numStandbyReplicas;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Set<TaskId> standbyTasks;
+ private Map<TaskId, Set<TopicPartition>> standbyTasks;
@Override
public void configure(Map<String, ?> configs) {
@@ -154,28 +154,32 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
final int numConsumers = consumers.size();
List<TaskId> active = new ArrayList<>();
- Set<TaskId> standby = new HashSet<>();
+ Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
int i = 0;
for (String consumer : consumers) {
- List<TopicPartition> partitions = new ArrayList<>();
+ List<TopicPartition> activePartitions = new ArrayList<>();
final int numTaskIds = taskIds.size();
for (int j = i; j < numTaskIds; j += numConsumers) {
TaskId taskId = taskIds.get(j);
if (j < numActiveTasks) {
for (TopicPartition partition : partitionGroups.get(taskId)) {
- partitions.add(partition);
+ activePartitions.add(partition);
active.add(taskId);
}
} else {
- // no partition to a standby task
- standby.add(taskId);
+ Set<TopicPartition> standbyPartitions = standby.get(taskId);
+ if (standbyPartitions == null) {
+ standbyPartitions = new HashSet<>();
+ standby.put(taskId, standbyPartitions);
+ }
+ standbyPartitions.addAll(partitionGroups.get(taskId));
}
}
AssignmentInfo data = new AssignmentInfo(active, standby);
- assignment.put(consumer, new Assignment(partitions, data.encode()));
+ assignment.put(consumer, new Assignment(activePartitions, data.encode()));
i++;
active.clear();
@@ -220,7 +224,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
return partitionToTaskIds.get(partition);
}
- public Set<TaskId> standbyTasks() {
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2a8df9e..4cff02d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -33,6 +33,7 @@ import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -52,6 +53,7 @@ public class ProcessorStateManager {
private final Consumer<byte[], byte[]> restoreConsumer;
private final Map<TopicPartition, Long> restoredOffsets;
private final Map<TopicPartition, Long> checkpointedOffsets;
+ private final Map<TopicPartition, Long> offsetLimits;
private final boolean isStandby;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
@@ -63,6 +65,7 @@ public class ProcessorStateManager {
this.restoredOffsets = new HashMap<>();
this.isStandby = isStandby;
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
+ this.offsetLimits = new HashMap<>();
// create the state directory for this task if missing (we won't create the parent directory)
createStateDirectory(baseDir);
@@ -165,8 +168,10 @@ public class ProcessorStateManager {
// restore its state from changelog records; while restoring the log end offset
// should not change since it is only written by this thread.
+ long limit = offsetLimit(storePartition);
while (true) {
for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+ if (record.offset() >= limit) break;
stateRestoreCallback.restore(record.key(), record.value());
}
@@ -178,7 +183,7 @@ public class ProcessorStateManager {
}
// record the restored offset for its change log partition
- long newOffset = restoreConsumer.position(storePartition);
+ long newOffset = Math.min(limit, restoreConsumer.position(storePartition));
restoredOffsets.put(storePartition, newOffset);
} finally {
// un-assign the change log partition
@@ -202,16 +207,40 @@ public class ProcessorStateManager {
return partitionsAndOffsets;
}
- public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+ public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+ long limit = offsetLimit(storePartition);
+ List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
+
// restore states from changelog records
StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
+ long lastOffset = -1L;
+ int count = 0;
for (ConsumerRecord<byte[], byte[]> record : records) {
- restoreCallback.restore(record.key(), record.value());
+ if (record.offset() < limit) {
+ restoreCallback.restore(record.key(), record.value());
+ lastOffset = record.offset();
+ } else {
+ if (remainingRecords == null)
+ remainingRecords = new ArrayList<>(records.size() - count);
+
+ remainingRecords.add(record);
+ }
+ count++;
}
// record the restored offset for its change log partition
- long newOffset = restoreConsumer.position(storePartition);
- restoredOffsets.put(storePartition, newOffset);
+ restoredOffsets.put(storePartition, lastOffset + 1);
+
+ return remainingRecords;
+ }
+
+ public void putOffsetLimit(TopicPartition partition, long limit) { // registers or replaces the offset limit for a partition
+ offsetLimits.put(partition, limit); // read back through offsetLimit(partition)
+ }
+
+ private long offsetLimit(TopicPartition partition) { // returns the registered limit for the partition, or Long.MAX_VALUE if none
+ Long limit = offsetLimits.get(partition); // boxed so a missing entry shows up as null
+ return limit != null ? limit : Long.MAX_VALUE; // with no registered limit, every "offset < limit" check passes
+ }
public StateStore getStore(String name) {
@@ -253,14 +282,14 @@ public class ProcessorStateManager {
if (stores.get(storeName).persistent()) {
Long offset = ackedOffsets.get(part);
- if (offset == null) {
- // if no record was produced. we need to check the restored offset.
- offset = restoredOffsets.get(part);
- }
-
if (offset != null) {
// store the last offset + 1 (the log position after restoration)
checkpointOffsets.put(part, offset + 1);
+ } else {
+ // if no record was produced. we need to check the restored offset.
+ offset = restoredOffsets.get(part);
+ if (offset != null)
+ checkpointOffsets.put(part, offset);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index c6442d9..d0d8493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
@@ -50,11 +51,13 @@ public class StandbyTask extends AbstractTask {
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StandbyTask(TaskId id,
- Consumer<byte[], byte[]> restoreConsumer,
+ Collection<TopicPartition> partitions,
ProcessorTopology topology,
+ Consumer<byte[], byte[]> consumer,
+ Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
StreamingMetrics metrics) {
- super(id, restoreConsumer, topology, config, null);
+ super(id, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
@@ -64,6 +67,9 @@ public class StandbyTask extends AbstractTask {
((StandbyContextImpl) this.processorContext).initialized();
this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+
+ // set initial offset limits
+ initializeOffsetLimits();
}
public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -76,13 +82,24 @@ public class StandbyTask extends AbstractTask {
/**
* Updates a state store using records from one change log partition
+ * @return a list of records not consumed
*/
- public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
- stateMgr.updateStandbyStates(partition, records);
+ public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+ return stateMgr.updateStandbyStates(partition, records);
}
public void commit() {
stateMgr.flush();
+
+ // reinitialize offset limits
+ initializeOffsetLimits();
+ }
+
+ protected void initializeOffsetLimits() { // sets each partition's offset limit in the state manager from the consumer's committed offset
+ for (TopicPartition partition : partitions) { // one committed() lookup per partition
+ OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+ stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L); // limit is 0 when committed() returns null
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 16f0667..24c450e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -30,9 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -45,7 +43,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
private final int maxBufferedSize;
- private final Consumer consumer;
private final PartitionGroup partitionGroup;
private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
private final PunctuationQueue punctuationQueue;
@@ -73,15 +70,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StreamTask(TaskId id,
+ Collection<TopicPartition> partitions,
+ ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
- Collection<TopicPartition> partitions,
- ProcessorTopology topology,
StreamingConfig config,
StreamingMetrics metrics) {
- super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions)));
- this.consumer = consumer;
+ super(id, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
@@ -98,7 +94,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
- // initialize the consumed and produced offset cache
+ // initialize the consumed offset cache
this.consumedOffsets = new HashMap<>();
// create the record recordCollector that maintains the produced offsets
@@ -245,7 +241,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
if (commitOffsetNeeded) {
Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
- consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
+ TopicPartition partition = entry.getKey();
+ long offset = entry.getValue() + 1;
+ consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
+ stateMgr.putOffsetLimit(partition, offset);
}
consumer.commitSync(consumedOffsetsAndMetadata);
commitOffsetNeeded = false;
@@ -280,6 +279,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
}
+ @Override
public void close() {
this.partitionGroup.close();
this.consumedOffsets.clear();
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31dca39..c77a027 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -73,6 +74,7 @@ public class StreamThread extends Thread {
protected final StreamingConfig config;
protected final TopologyBuilder builder;
+ protected final Set<String> sourceTopics;
protected final Producer<byte[], byte[]> producer;
protected final Consumer<byte[], byte[]> consumer;
protected final Consumer<byte[], byte[]> restoreConsumer;
@@ -94,6 +96,9 @@ public class StreamThread extends Thread {
private long lastCommit;
private long recordsProcessed;
+ private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+ private boolean processStandbyRecords = false;
+
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -133,6 +138,7 @@ public class StreamThread extends Thread {
this.config = config;
this.builder = builder;
+ this.sourceTopics = builder.sourceTopics();
this.clientId = clientId;
this.clientUUID = clientUUID;
this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -148,6 +154,9 @@ public class StreamThread extends Thread {
this.standbyTasks = new HashMap<>();
this.prevTasks = new HashSet<>();
+ // standby ktables
+ this.standbyRecords = new HashMap<>();
+
// read in task specific config values
this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
this.stateDir.mkdir();
@@ -256,7 +265,7 @@ public class StreamThread extends Thread {
ensureCopartitioning(builder.copartitionGroups());
- consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
+ consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
while (stillRunning()) {
// try to fetch some records if necessary
@@ -293,15 +302,12 @@ public class StreamThread extends Thread {
}
maybePunctuate();
- maybeCommit();
} else {
// even when no task is assigned, we must poll to get a task.
requiresPoll = true;
}
-
- if (!standbyTasks.isEmpty()) {
- updateStandbyTasks();
- }
+ maybeCommit();
+ maybeUpdateStandbyTasks();
maybeClean();
}
@@ -310,13 +316,38 @@ public class StreamThread extends Thread {
}
}
- private void updateStandbyTasks() {
- ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+ private void maybeUpdateStandbyTasks() {
+ if (!standbyTasks.isEmpty()) {
+ if (processStandbyRecords) {
+ if (!standbyRecords.isEmpty()) {
+ for (StandbyTask task : standbyTasks.values()) {
+ for (TopicPartition partition : task.changeLogPartitions()) {
+ List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+ if (remaining != null) {
+ remaining = task.update(partition, remaining);
+ if (remaining != null) {
+ standbyRecords.put(partition, remaining);
+ } else {
+ restoreConsumer.resume(partition);
+ }
+ }
+ }
+ }
+ }
+ processStandbyRecords = false;
+ }
- if (!records.isEmpty()) {
- for (StandbyTask task : standbyTasks.values()) {
- for (TopicPartition partition : task.changeLogPartitions()) {
- task.update(partition, records.records(partition));
+ ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+
+ if (!records.isEmpty()) {
+ for (StandbyTask task : standbyTasks.values()) {
+ for (TopicPartition partition : task.changeLogPartitions()) {
+ List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
+ if (remaining != null) {
+ restoreConsumer.pause(partition);
+ standbyRecords.put(partition, remaining);
+ }
+ }
}
}
}
@@ -359,6 +390,8 @@ public class StreamThread extends Thread {
commitAll();
lastCommit = now;
+
+ processStandbyRecords = true;
} else {
for (StreamTask task : activeTasks.values()) {
try {
@@ -478,12 +511,12 @@ public class StreamThread extends Thread {
return tasks;
}
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
ProcessorTopology topology = builder.build(id.topicGroupId);
- return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
+ return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -501,7 +534,7 @@ public class StreamThread extends Thread {
}
}
- // create the tasks
+ // create the active tasks
for (TaskId taskId : partitionsForTask.keySet()) {
try {
activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
@@ -510,8 +543,6 @@ public class StreamThread extends Thread {
throw e;
}
}
-
- lastClean = time.milliseconds();
}
private void removeStreamTasks() {
@@ -537,13 +568,13 @@ public class StreamThread extends Thread {
sensors.taskDestructionSensor.record();
}
- protected StandbyTask createStandbyTask(TaskId id) {
+ protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStoreSuppliers().isEmpty()) {
- return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+ return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
} else {
return null;
}
@@ -552,10 +583,15 @@ public class StreamThread extends Thread {
private void addStandbyTasks() {
Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
- for (TaskId taskId : partitionGrouper.standbyTasks()) {
- StandbyTask task = createStandbyTask(taskId);
+ // create the standby tasks
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
+ TaskId taskId = entry.getKey();
+ Set<TopicPartition> partitions = entry.getValue();
+ StandbyTask task = createStandbyTask(taskId, partitions);
if (task != null) {
standbyTasks.put(taskId, task);
+ // collect checkpointed offsets to position the restore consumer
+ // this includes all partitions from which we restore states
checkpointedOffsets.putAll(task.checkpointedOffsets());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index d82dd7d..2bd4457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -17,14 +17,23 @@
package org.apache.kafka.streams.processor.internals.assignment;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class AssignmentInfo {
@@ -33,70 +42,98 @@ public class AssignmentInfo {
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
- public final Set<TaskId> standbyTasks;
+ public final Map<TaskId, Set<TopicPartition>> standbyTasks;
- public AssignmentInfo(List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+ public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
this(1, activeTasks, standbyTasks);
}
- protected AssignmentInfo(int version, List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+ protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
this.version = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
}
public ByteBuffer encode() {
- if (version == 1) {
- ByteBuffer buf = ByteBuffer.allocate(4 + 4 + activeTasks.size() * 8 + 4 + standbyTasks.size() * 8);
- // Encode version
- buf.putInt(1);
- // Encode active tasks
- buf.putInt(activeTasks.size());
- for (TaskId id : activeTasks) {
- id.writeTo(buf);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+
+ try {
+ if (version == 1) {
+ // Encode version
+ out.writeInt(1);
+ // Encode active tasks
+ out.writeInt(activeTasks.size());
+ for (TaskId id : activeTasks) {
+ id.writeTo(out);
+ }
+ // Encode standby tasks
+ out.writeInt(standbyTasks.size());
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ id.writeTo(out);
+
+ Set<TopicPartition> partitions = entry.getValue();
+ out.writeInt(partitions.size());
+ for (TopicPartition partition : partitions) {
+ out.writeUTF(partition.topic());
+ out.writeInt(partition.partition());
+ }
+ }
+
+ out.flush();
+ out.close();
+
+ return ByteBuffer.wrap(baos.toByteArray());
+
+ } else {
+ TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
+ log.error(ex.getMessage(), ex);
+ throw ex;
}
- // Encode standby tasks
- buf.putInt(standbyTasks.size());
- for (TaskId id : standbyTasks) {
- id.writeTo(buf);
- }
- buf.rewind();
-
- return buf;
-
- } else {
- TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
- log.error(ex.getMessage(), ex);
- throw ex;
+ } catch (IOException ex) {
+ throw new KafkaException("failed to encode AssignmentInfo", ex);
}
}
public static AssignmentInfo decode(ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
-
- // Decode version
- int version = data.getInt();
- if (version == 1) {
- // Decode active tasks
- int count = data.getInt();
- List<TaskId> activeTasks = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- activeTasks.add(TaskId.readFrom(data));
+ DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
+
+ try {
+ // Decode version
+ int version = in.readInt();
+ if (version == 1) {
+ // Decode active tasks
+ int count = in.readInt();
+ List<TaskId> activeTasks = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ activeTasks.add(TaskId.readFrom(in));
+ }
+ // Decode standby tasks
+ count = in.readInt();
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+ for (int i = 0; i < count; i++) {
+ TaskId id = TaskId.readFrom(in);
+
+ int numPartitions = in.readInt();
+ Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+ for (int j = 0; j < numPartitions; j++) {
+ partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
+ }
+ standbyTasks.put(id, partitions);
+ }
+
+ return new AssignmentInfo(activeTasks, standbyTasks);
+
+ } else {
+ TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
+ log.error(ex.getMessage(), ex);
+ throw ex;
}
- // Decode standby tasks
- count = data.getInt();
- Set<TaskId> standbyTasks = new HashSet<>(count);
- for (int i = 0; i < count; i++) {
- standbyTasks.add(TaskId.readFrom(data));
- }
-
- return new AssignmentInfo(activeTasks, standbyTasks);
-
- } else {
- TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
- log.error(ex.getMessage(), ex);
- throw ex;
+ } catch (IOException ex) {
+ throw new KafkaException("failed to decode AssignmentInfo", ex);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
new file mode 100644
index 0000000..2ad1f47
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class KeyValueStoreChangeLogger<K, V> {
+
+ protected final Serdes<K, V> serialization;
+
+ private final Set<K> dirty;
+ private final Set<K> removed;
+ private final int maxDirty;
+ private final int maxRemoved;
+
+ private final String topic;
+ private int partition;
+ private ProcessorContext context;
+
+ // always wrap the logged store with the metered store
+ public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+ this.topic = topic;
+ this.serialization = serialization;
+ this.context = context;
+ this.partition = context.id().partition;
+
+ this.dirty = new HashSet<>();
+ this.removed = new HashSet<>();
+ this.maxDirty = 100; // TODO: this needs to be configurable
+ this.maxRemoved = 100; // TODO: this needs to be configurable
+ }
+
+ public void add(K key) {
+ this.dirty.add(key);
+ this.removed.remove(key);
+ }
+
+ public void delete(K key) {
+ this.dirty.remove(key);
+ this.removed.add(key);
+ }
+
+ public void maybeLogChange(KeyValueStore<K, V> kv) {
+ if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+ logChange(kv);
+ }
+
+ public void logChange(KeyValueStore<K, V> kv) {
+ RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+ if (collector != null) {
+ Serializer<K> keySerializer = serialization.keySerializer();
+ Serializer<V> valueSerializer = serialization.valueSerializer();
+
+ for (K k : this.removed) {
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+ }
+ for (K k : this.dirty) {
+ V v = kv.get(k);
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+ }
+ this.removed.clear();
+ this.dirty.clear();
+ }
+ }
+
+}