You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/21 01:10:51 UTC
[5/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add
Reduce functions
KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #795 from guozhangwang/K3121s1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/959cf09e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/959cf09e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/959cf09e
Branch: refs/heads/trunk
Commit: 959cf09e8653f4b8255f49c6f4c258ed1a5ec38e
Parents: e4ef8e6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jan 20 16:10:43 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jan 20 16:10:43 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/examples/KTableJob.java | 111 ---
.../kafka/streams/kstream/Aggregator.java | 7 +-
.../streams/kstream/AggregatorSupplier.java | 23 -
.../org/apache/kafka/streams/kstream/Count.java | 36 +
.../apache/kafka/streams/kstream/KStream.java | 38 +-
.../apache/kafka/streams/kstream/KTable.java | 59 +-
.../apache/kafka/streams/kstream/Reducer.java | 23 +
.../apache/kafka/streams/kstream/SumAsLong.java | 36 +
.../kstream/internals/CountSupplier.java | 52 --
.../kstream/internals/KStreamAggregate.java | 4 +-
.../streams/kstream/internals/KStreamImpl.java | 122 ++--
.../kstream/internals/KStreamReduce.java | 167 +++++
.../kstream/internals/KTableAggregate.java | 4 +-
.../streams/kstream/internals/KTableImpl.java | 128 ++--
.../streams/kstream/internals/KTableReduce.java | 120 ++++
.../kstream/internals/KTableStoreSupplier.java | 4 +-
.../kstream/internals/LongSumSupplier.java | 52 --
.../streams/kstream/internals/TopKSupplier.java | 106 ---
.../internals/ProcessorStateManager.java | 2 +-
.../state/InMemoryKeyValueStoreSupplier.java | 155 -----
.../state/InMemoryLRUCacheStoreSupplier.java | 195 ------
.../streams/state/MeteredKeyValueStore.java | 250 -------
.../kafka/streams/state/MeteredWindowStore.java | 206 ------
.../kafka/streams/state/OffsetCheckpoint.java | 162 -----
.../state/RocksDBKeyValueStoreSupplier.java | 52 --
.../kafka/streams/state/RocksDBStore.java | 265 --------
.../kafka/streams/state/RocksDBWindowStore.java | 289 --------
.../state/RocksDBWindowStoreSupplier.java | 58 --
.../kafka/streams/state/StoreChangeLogger.java | 91 ---
.../org/apache/kafka/streams/state/Stores.java | 3 +
.../InMemoryKeyValueStoreSupplier.java | 159 +++++
.../InMemoryLRUCacheStoreSupplier.java | 199 ++++++
.../state/internals/MeteredKeyValueStore.java | 254 +++++++
.../state/internals/MeteredWindowStore.java | 209 ++++++
.../state/internals/OffsetCheckpoint.java | 162 +++++
.../internals/RocksDBKeyValueStoreSupplier.java | 53 ++
.../streams/state/internals/RocksDBStore.java | 269 ++++++++
.../state/internals/RocksDBWindowStore.java | 295 ++++++++
.../internals/RocksDBWindowStoreSupplier.java | 59 ++
.../state/internals/StoreChangeLogger.java | 92 +++
.../kstream/internals/KStreamAggregateTest.java | 36 +-
.../kstream/internals/KTableAggregateTest.java | 36 +-
.../internals/ProcessorStateManagerTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 2 +-
.../state/AbstractKeyValueStoreTest.java | 191 ------
.../state/InMemoryKeyValueStoreTest.java | 48 --
.../state/InMemoryLRUCacheStoreTest.java | 156 -----
.../streams/state/KeyValueStoreTestDriver.java | 4 +-
.../streams/state/RocksDBKeyValueStoreTest.java | 50 --
.../streams/state/RocksDBWindowStoreTest.java | 671 ------------------
.../internals/AbstractKeyValueStoreTest.java | 195 ++++++
.../internals/InMemoryKeyValueStoreTest.java | 50 ++
.../internals/InMemoryLRUCacheStoreTest.java | 159 +++++
.../internals/RocksDBKeyValueStoreTest.java | 52 ++
.../state/internals/RocksDBWindowStoreTest.java | 676 +++++++++++++++++++
55 files changed, 3468 insertions(+), 3431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
deleted file mode 100644
index 45ff58e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-public class KTableJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
-
- Serializer<String> stringSerializer = new StringSerializer();
- Deserializer<String> stringDeserializer = new StringDeserializer();
-
- KStreamBuilder builder = new KStreamBuilder();
-
- // stream aggregate
- KStream<String, Long> stream1 = builder.stream("topic1");
-
- @SuppressWarnings("unchecked")
- KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() {
- @Override
- public long apply(String key, Long value) {
- return value;
- }
- }, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer);
-
- // table aggregation
- KTable<String, String> table1 = builder.table("topic2");
-
- KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- }, new KeyValueToLongMapper<String, String>() {
- @Override
- public long apply(String key, String value) {
- return Long.parseLong(value);
- }
- }, stringSerializer, stringDeserializer, "table2");
-
- // stream-table join
- KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() {
- @Override
- public Long apply(Long value1, Long value2) {
- if (value2 == null)
- return 0L;
- else
- return value1 * value2;
- }
- });
-
- // table-table join
- KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() {
- @Override
- public String apply(String value1, Long value2) {
- if (value2 == null)
- return value1 + "-null";
- else if (value1 == null)
- return "null-" + value2;
- else
- return value1 + "-" + value2;
- }
- });
-
- wtable1.to("topic3");
-
- KafkaStreaming kstream = new KafkaStreaming(builder, config);
- kstream.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index d715fbd..c601024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -21,7 +21,7 @@ public interface Aggregator<K, V, T> {
/**
* Set the initial aggregate value
*/
- T initialValue();
+ T initialValue(K aggKey);
/**
* When a new record with the aggregate key is added,
@@ -34,9 +34,4 @@ public interface Aggregator<K, V, T> {
* updating the aggregate value for this key
*/
T remove(K aggKey, V value, T aggregate);
-
- /**
- * Merge two aggregate values
- */
- T merge(T aggr1, T aggr2);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
deleted file mode 100644
index 6ed9125..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface AggregatorSupplier<K, V, T> {
-
- Aggregator<K, V, T> get();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
new file mode 100644
index 0000000..3c1ed46
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class Count<K> implements Aggregator<K, Long, Long> {
+
+ @Override
+ public Long initialValue(K aggKey) {
+ return 0L;
+ }
+
+ @Override
+ public Long add(K aggKey, Long value, Long aggregate) {
+ return aggregate + 1L;
+ }
+
+ @Override
+ public Long remove(K aggKey, Long value, Long aggregate) {
+ return aggregate - 1L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 36741a8..dfed661 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+
/**
* KStream is an abstraction of a stream of key-value pairs.
*
@@ -268,35 +269,28 @@ public interface KStream<K, V> {
/**
* Aggregate values of this stream by key on a window basis.
*
- * @param aggregatorSupplier the class of aggregatorSupplier
+ * @param reducer the instance of Reducer
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer);
+
+ /**
+ * Aggregate values of this stream by key on a window basis.
+ *
+ * @param aggregator the instance of Aggregator
* @param windows the specification of the aggregation window
* @param <T> the value type of the aggregated table
*/
- <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer);
- /**
- * Sum extracted long integer values of this stream by key on a window basis.
- *
- * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
- * @param windows the specification of the aggregation window
- */
- <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer);
-
- /**
- * Count number of records of this stream by key on a window basis.
- *
- * @param windows the specification of the aggregation window
- */
- <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer);
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 93eceec..87298d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -139,49 +139,42 @@ public interface KTable<K, V> {
<V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
/**
- * Aggregate values of this table by the selected key.
+ * Reduce values of this table by the selected key.
*
- * @param aggregatorSupplier the class of AggregatorSupplier
+ * @param addReducer the instance of Reducer used for adding values
+ * @param removeReducer the instance of Reducer used for removing values
* @param selector the KeyValue mapper that select the aggregate key
* @param name the name of the resulted table
* @param <K1> the key type of the aggregated table
* @param <V1> the value type of the aggregated table
* @return the instance of KTable
*/
- <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- Serializer<T> aggValueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valueDeserializer,
- Deserializer<T> aggValueDeserializer,
- String name);
-
- /**
- * Sum extracted long integer values of this table by the selected aggregation key
- *
- * @param keySelector the class of KeyValueMapper to select the aggregation key
- * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
- * @param name the name of the resulted table
- */
- <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToLongMapper<K, V> valueSelector,
- Serializer<K1> keySerializer,
- Deserializer<K1> keyDeserializer,
- String name);
+ <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
+ Reducer<V1> removeReducer,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ String name);
/**
- * Count number of records of this table by the selected aggregation key
+ * Aggregate values of this table by the selected key.
*
- * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param aggregator the instance of Aggregator
+ * @param selector the KeyValue mapper that selects the aggregate key
* @param name the name of the resulted table
+ * @param <K1> the key type of the aggregated table
+ * @param <V1> the value type of the aggregated table
+ * @return the instance of KTable
*/
- <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
- Serializer<K1> keySerializer,
- Serializer<V> valueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V> valueDeserializer,
- String name);
-
+ <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ Deserializer<T> aggValueDeserializer,
+ String name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
new file mode 100644
index 0000000..418f442
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Reducer<V> {
+
+ V apply(V value1, V value2);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
new file mode 100644
index 0000000..1f8df04
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class SumAsLong<K> implements Aggregator<K, Long, Long> {
+
+ @Override
+ public Long initialValue(K aggKey) {
+ return 0L;
+ }
+
+ @Override
+ public Long add(K aggKey, Long value, Long aggregate) {
+ return aggregate + value;
+ }
+
+ @Override
+ public Long remove(K aggKey, Long value, Long aggregate) {
+ return aggregate - value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
deleted file mode 100644
index b7dc5aa..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-
-public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> {
-
- private class Count implements Aggregator<K, V, Long> {
-
- @Override
- public Long initialValue() {
- return 0L;
- }
-
- @Override
- public Long add(K aggKey, V value, Long aggregate) {
- return aggregate + 1;
- }
-
- @Override
- public Long remove(K aggKey, V value, Long aggregate) {
- return aggregate - 1;
- }
-
- @Override
- public Long merge(Long aggr1, Long aggr2) {
- return aggr1 + aggr2;
- }
- }
-
- @Override
- public Aggregator<K, V, Long> get() {
- return new Count();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 5745a03..91bfa9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -97,7 +97,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
T oldAgg = entry.value;
if (oldAgg == null)
- oldAgg = aggregator.initialValue();
+ oldAgg = aggregator.initialValue(key);
// try to add the new new value (there will never be old value)
T newAgg = aggregator.add(key, value, oldAgg);
@@ -119,7 +119,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
- T oldAgg = aggregator.initialValue();
+ T oldAgg = aggregator.initialValue(key);
T newAgg = aggregator.add(key, value, oldAgg);
windowStore.put(key, newAgg, windowStartMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 691910b..ce89220 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,15 +18,13 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
+import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -39,7 +37,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
import java.lang.reflect.Array;
@@ -48,47 +46,50 @@ import java.util.Set;
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
- private static final String FILTER_NAME = "KSTREAM-FILTER-";
+ private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
- private static final String MAP_NAME = "KSTREAM-MAP-";
+ private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
- private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
+ private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
+
+ private static final String FILTER_NAME = "KSTREAM-FILTER-";
private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
- private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
+ public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
- private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
+ public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
- private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
+ public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
- private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+ private static final String MAP_NAME = "KSTREAM-MAP-";
- private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
+ private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
- private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ public static final String MERGE_NAME = "KSTREAM-MERGE-";
- private static final String SELECT_NAME = "KSTREAM-SELECT-";
+ public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
- private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+ public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
- public static final String SINK_NAME = "KSTREAM-SINK-";
+ private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
- public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
+ private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
- public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+ private static final String SELECT_NAME = "KSTREAM-SELECT-";
- public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
+ public static final String SINK_NAME = "KSTREAM-SINK-";
- public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
+ public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
- public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
+ private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
- public static final String MERGE_NAME = "KSTREAM-MERGE-";
+ private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
+
+ private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
- public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
super(topology, name, sourceNodes);
@@ -394,7 +395,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
- public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+ public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer) {
+
+ // TODO: this agg window operator is only used for casting K to Windowed<K> for
+ // KTableProcessorSupplier, which is a bit awkward and should be removed in the future
+ String reduceName = topology.newName(REDUCE_NAME);
+ String selectName = topology.newName(SELECT_NAME);
+
+ ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
+ ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
+
+ RocksDBWindowStoreSupplier<K, V> aggregateStore =
+ new RocksDBWindowStoreSupplier<>(
+ windows.name(),
+ windows.maintainMs(),
+ windows.segments,
+ false,
+ new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
+ null);
+
+ // reduce the values with the reducer and local store
+ topology.addProcessor(selectName, aggWindowSupplier, this.name);
+ topology.addProcessor(reduceName, aggregateSupplier, selectName);
+ topology.addStateStore(aggregateStore, reduceName);
+
+ // return the KTable representation with the intermediate topic as the sources
+ return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes);
+ }
+
+ @Override
+ public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
@@ -407,7 +442,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String selectName = topology.newName(SELECT_NAME);
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
- ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
+ ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
RocksDBWindowStoreSupplier<K, T> aggregateStore =
new RocksDBWindowStoreSupplier<>(
@@ -426,39 +461,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
}
-
- @Override
- public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer) {
-
- KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() {
- @Override
- public KeyValue<K, Long> apply(K key, V value) {
- return new KeyValue<>(key, valueSelector.apply(key, value));
- }
- });
-
- return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(),
- windows,
- keySerializer,
- new LongSerializer(),
- keyDeserializer,
- new LongDeserializer());
- }
-
-
- @Override
- public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer) {
-
- return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(),
- windows,
- keySerializer,
- new LongSerializer(),
- keyDeserializer,
- new LongDeserializer());
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
new file mode 100644
index 0000000..7d6eb27
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, V> {
+
+ private final String storeName;
+ private final Windows<W> windows;
+ private final Reducer<V> reducer;
+
+ private boolean sendOldValues = false;
+
+ public KStreamReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.reducer = reducer;
+ }
+
+ @Override
+ public Processor<Windowed<K>, Change<V>> get() {
+ return new KStreamAggregateProcessor();
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+
+ private WindowStore<K, V> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(Windowed<K> windowedKey, Change<V> change) {
+ // first get the matching windows
+ long timestamp = windowedKey.window().start();
+ K key = windowedKey.value();
+ V value = change.newValue;
+
+ Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+ long timeFrom = Long.MAX_VALUE;
+ long timeTo = Long.MIN_VALUE;
+
+ // use range query on window store for efficient reads
+ for (long windowStartMs : matchedWindows.keySet()) {
+ timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+ timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+ }
+
+ WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+ // for each matching window, try to update the corresponding key and send to the downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, V> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
+
+ if (window != null) {
+
+ V oldAgg = entry.value;
+ V newAgg = oldAgg;
+
+ // try to add the new value (there will never be an old value)
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
+ }
+
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
+
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+ matchedWindows.remove(entry.key);
+ }
+ }
+
+ iter.close();
+
+ // create new windows for the remaining matched windows that do not exist in the store yet
+ for (long windowStartMs : matchedWindows.keySet()) {
+ windowStore.put(key, value, windowStartMs);
+
+ // send the new aggregate pair (there will be no old value)
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
+ }
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<Windowed<K>, V> view() {
+
+ return new KTableValueGetterSupplier<Windowed<K>, V>() {
+
+ public KTableValueGetter<Windowed<K>, V> get() {
+ return new KStreamAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+
+ private WindowStore<K, V> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public V get(Windowed<K> windowedKey) {
+ K key = windowedKey.value();
+ W window = (W) windowedKey.window();
+
+ // this iterator should only contain one element
+ Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
+
+ return iter.next().value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index a5948f8..1730a8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -30,7 +30,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
private boolean sendOldValues = false;
- KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+ public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
this.storeName = storeName;
this.aggregator = aggregator;
}
@@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
T oldAgg = store.get(key);
if (oldAgg == null)
- oldAgg = aggregator.initialValue();
+ oldAgg = aggregator.initialValue(key);
T newAgg = oldAgg;
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9888dff..8ee557c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,17 +18,15 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -48,31 +46,34 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
- private static final String FILTER_NAME = "KTABLE-FILTER-";
+ private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
- private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+ private static final String FILTER_NAME = "KTABLE-FILTER-";
- private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
- private static final String SELECT_NAME = "KTABLE-SELECT-";
+ public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
- private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+ public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
- public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+ public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
- public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
+ private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
- public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
+ public static final String MERGE_NAME = "KTABLE-MERGE-";
public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
- public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
+ private static final String REDUCE_NAME = "KTABLE-REDUCE-";
- public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
+ private static final String SELECT_NAME = "KTABLE-SELECT-";
+
+ public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+ private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
- public static final String MERGE_NAME = "KTABLE-MERGE-";
public final ProcessorSupplier<K, ?> processorSupplier;
@@ -245,15 +246,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- Serializer<V2> aggValueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valueDeserializer,
- Deserializer<V2> aggValueDeserializer,
- String name) {
+ public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ Deserializer<T> aggValueDeserializer,
+ String name) {
String selectName = topology.newName(SELECT_NAME);
String sinkName = topology.newName(KStreamImpl.SINK_NAME);
@@ -267,7 +268,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
- ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get());
+ ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator);
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
@@ -295,55 +296,52 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector,
- final KeyValueToLongMapper<K, V> valueSelector,
- Serializer<K1> keySerializer,
- Deserializer<K1> keyDeserializer,
- String name) {
+ public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
+ Reducer<V1> removeReducer,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ String name) {
- Serializer<Long> longSerializer = new LongSerializer();
- Deserializer<Long> longDeserializer = new LongDeserializer();
+ String selectName = topology.newName(SELECT_NAME);
+ String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+ String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+ String reduceName = topology.newName(REDUCE_NAME);
- KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() {
- @Override
- public KeyValue<K1, Long> apply(K key, V value) {
- K1 aggKey = keySelector.apply(key, value);
- Long aggValue = valueSelector.apply(key, value);
+ String topic = name + REPARTITION_TOPIC_SUFFIX;
- return new KeyValue<>(aggKey, aggValue);
- }
- };
+ ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+ ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
- return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper,
- keySerializer, longSerializer, longSerializer,
- keyDeserializer, longDeserializer, longDeserializer,
- name);
- }
+ KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
- @Override
- public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector,
- Serializer<K1> keySerializer,
- Serializer<V> valueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V> valueDeserializer,
- String name) {
+ ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer);
- Serializer<Long> longSerializer = new LongSerializer();
- Deserializer<Long> longDeserializer = new LongDeserializer();
+ StateStoreSupplier aggregateStore = Stores.create(name)
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(valueSerializer, valueDeserializer)
+ .localDatabase()
+ .build();
- KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
- @Override
- public KeyValue<K1, V> apply(K key, V value) {
- K1 aggKey = keySelector.apply(key, value);
+ // select the aggregate key and values (old and new); this requires the parent to send old values
+ topology.addProcessor(selectName, selectSupplier, this.name);
+ this.enableSendingOldValues();
- return new KeyValue<>(aggKey, value);
- }
- };
+ // send the aggregate key-value pairs to the intermediate topic for partitioning
+ topology.addInternalTopic(topic);
+ topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
- return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper,
- keySerializer, valueSerializer, longSerializer,
- keyDeserializer, valueDeserializer, longDeserializer,
- name);
+ // read the intermediate topic
+ topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+
+ // reduce the values with the add/remove reducers and local store
+ topology.addProcessor(reduceName, aggregateSupplier, sourceName);
+ topology.addStateStore(aggregateStore, reduceName);
+
+ // return the KTable representation with the intermediate topic as the sources
+ return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
new file mode 100644
index 0000000..0d1b55a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
+
+ private final String storeName;
+ private final Reducer<V> addReducer;
+ private final Reducer<V> removeReducer;
+
+ private boolean sendOldValues = false;
+
+ public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> removeReducer) {
+ this.storeName = storeName;
+ this.addReducer = addReducer;
+ this.removeReducer = removeReducer;
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ @Override
+ public Processor<K, Change<V>> get() {
+ return new KTableAggregateProcessor();
+ }
+
+ private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
+
+ private KeyValueStore<K, V> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(K key, Change<V> value) {
+ V oldAgg = store.get(key);
+ V newAgg = oldAgg;
+
+ // first try to add the new value
+ if (value.newValue != null) {
+ if (newAgg == null) {
+ newAgg = value.newValue;
+ } else {
+ newAgg = addReducer.apply(newAgg, value.newValue);
+ }
+ }
+
+ // then try to remove the old value
+ if (value.oldValue != null) {
+ newAgg = removeReducer.apply(newAgg, value.oldValue);
+ }
+
+ // update the store with the new value
+ store.put(key, newAgg);
+
+ // send the old / new pair
+ if (sendOldValues)
+ context().forward(key, new Change<>(newAgg, oldAgg));
+ else
+ context().forward(key, new Change<>(newAgg, null));
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view() {
+
+ return new KTableValueGetterSupplier<K, V>() {
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KTableAggregateValueGetter implements KTableValueGetter<K, V> {
+
+ private KeyValueStore<K, V> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public V get(K key) {
+ return store.get(key);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index d07fc5d..c993512 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.MeteredKeyValueStore;
-import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.Serdes;
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
deleted file mode 100644
index b66590e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-
-public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> {
-
- private class LongSum implements Aggregator<K, Long, Long> {
-
- @Override
- public Long initialValue() {
- return 0L;
- }
-
- @Override
- public Long add(K aggKey, Long value, Long aggregate) {
- return aggregate + value;
- }
-
- @Override
- public Long remove(K aggKey, Long value, Long aggregate) {
- return aggregate - value;
- }
-
- @Override
- public Long merge(Long aggr1, Long aggr2) {
- return aggr1 + aggr2;
- }
- }
-
- @Override
- public Aggregator<K, Long, Long> get() {
- return new LongSum();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
deleted file mode 100644
index 00f4b55..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-import org.apache.kafka.streams.kstream.Aggregator;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-/**
- * NOTE: This is just a demo aggregate supplier that can be implemented by users to add their own built-in aggregates.
- * It is highly in-efficient and is not supposed to be merged in.
- */
-public class TopKSupplier<K, V extends Comparable<V>> implements AggregatorSupplier<K, V, Collection<V>> {
-
- private final int k;
-
- public TopKSupplier(int k) {
- this.k = k;
- }
-
- private class TopK implements Aggregator<K, V, Collection<V>> {
-
- private final Map<K, PriorityQueue<V>> sorted = new HashMap<>();
-
- @Override
- public Collection<V> initialValue() {
- return Collections.<V>emptySet();
- }
-
- @Override
- public Collection<V> add(K aggKey, V value, Collection<V> aggregate) {
- PriorityQueue<V> queue = sorted.get(aggKey);
- if (queue == null) {
- queue = new PriorityQueue<>();
- sorted.put(aggKey, queue);
- }
-
- queue.add(value);
-
- PriorityQueue<V> copy = new PriorityQueue<>(queue);
-
- Set<V> ret = new HashSet<>();
- for (int i = 1; i <= k; i++)
- ret.add(copy.poll());
-
- return ret;
- }
-
- @Override
- public Collection<V> remove(K aggKey, V value, Collection<V> aggregate) {
- PriorityQueue<V> queue = sorted.get(aggKey);
-
- if (queue == null)
- throw new IllegalStateException("This should not happen.");
-
- queue.remove(value);
-
- PriorityQueue<V> copy = new PriorityQueue<>(queue);
-
- Set<V> ret = new HashSet<>();
- for (int i = 1; i <= k; i++)
- ret.add(copy.poll());
-
- return ret;
- }
-
- @Override
- public Collection<V> merge(Collection<V> aggr1, Collection<V> aggr2) {
- PriorityQueue<V> copy = new PriorityQueue<>(aggr1);
- copy.addAll(aggr2);
-
- Set<V> ret = new HashSet<>();
- for (int i = 1; i <= k; i++)
- ret.add(copy.poll());
-
- return ret;
- }
- }
-
- @Override
- public Aggregator<K, V, Collection<V>> get() {
- return new TopK();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3cac3f1..547bb15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index d1f845c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
- */
-public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final Serdes serdes;
- private final Time time;
-
- protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
- }
-
- private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final NavigableMap<K, V> map;
-
- public MemoryStore(String name) {
- super();
- this.name = name;
- this.map = new TreeMap<>();
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- return this.map.remove(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
-
- public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- Map.Entry<K, V> entry = iter.next();
- return new Entry<>(entry.getKey(), entry.getValue());
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index a346534..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final int capacity;
- private final Serdes serdes;
- private final Time time;
-
- protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.capacity = capacity;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
- final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
- cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(K key, V value) {
- store.removed(key);
- }
- });
- return store;
- }
-
- private static interface EldestEntryRemovalListener<K, V> {
- public void apply(K key, V value);
- }
-
- protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final Map<K, V> map;
- private final NavigableSet<K> keys;
- private EldestEntryRemovalListener<K, V> listener;
-
- public MemoryLRUCache(String name, final int maxCacheSize) {
- this.name = name;
- this.keys = new TreeSet<>();
- // leave room for one extra entry to handle adding an entry before the oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
- K key = eldest.getKey();
- keys.remove(key);
- if (listener != null) listener.apply(key, eldest.getValue());
- return true;
- }
- return false;
- }
- };
- }
-
- protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- this.keys.add(key);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = this.map.remove(key);
- this.keys.remove(key);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
-
- public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
- this.keys = keys;
- this.entries = entries;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- lastKey = keys.next();
- return new Entry<>(lastKey, entries.get(lastKey));
- }
-
- @Override
- public void remove() {
- keys.remove();
- entries.remove(lastKey);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
- }
-}