You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/14 02:16:02 UTC
[2/2] kafka git commit: KAFKA-3081: KTable Aggregation
KAFKA-3081: KTable Aggregation
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #761 from guozhangwang/K3081
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f22705c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f22705c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f22705c
Branch: refs/heads/trunk
Commit: 4f22705c7d0c8e8cab68883e76f554439341e34a
Parents: 7001174
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jan 13 17:15:57 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jan 13 17:15:57 2016 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KTable.java | 58 +---
.../kstream/internals/ChangedDeserializer.java | 59 ++++
.../kstream/internals/ChangedSerializer.java | 57 ++++
.../kstream/internals/CountSupplier.java | 52 ++++
.../internals/DefaultWindowedDeserializer.java | 59 ----
.../internals/DefaultWindowedSerializer.java | 57 ----
.../streams/kstream/internals/KStreamImpl.java | 58 ++--
.../kstream/internals/KTableAggregate.java | 118 +++++++
.../streams/kstream/internals/KTableImpl.java | 215 +++++++------
.../kstream/internals/KTableMapValues.java | 8 +-
.../kstream/internals/KTableRepartitionMap.java | 110 +++++++
.../kstream/internals/LongSumSupplier.java | 52 ++++
.../kstream/internals/WindowedDeserializer.java | 59 ++++
.../kstream/internals/WindowedSerializer.java | 57 ++++
.../kstream/internals/KTableAggregateTest.java | 122 ++++++++
.../internals/KTableMapValuesImplTest.java | 308 -------------------
.../kstream/internals/KTableMapValuesTest.java | 308 +++++++++++++++++++
.../apache/kafka/test/NoOpKeyValueMapper.java | 29 ++
18 files changed, 1195 insertions(+), 591 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 997edcd..9837dae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,8 +20,6 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import java.util.Collection;
-
/**
* KTable is an abstraction of a change log stream.
*
@@ -152,9 +150,11 @@ public interface KTable<K, V> {
*/
<K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K> keySerializer,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
Serializer<V2> aggValueSerializer,
- Deserializer<K> keyDeserializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
Deserializer<V2> aggValueDeserializer,
String name);
@@ -167,59 +167,21 @@ public interface KTable<K, V> {
*/
<K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToLongMapper<K, V> valueSelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
+ Serializer<K1> keySerializer,
+ Deserializer<K1> keyDeserializer,
String name);
/**
- * Sum extracted integer values of this table by the selected aggregation key
- *
- * @param keySelector the class of KeyValueMapper to select the aggregation key
- * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
- * @param name the name of the resulted table
- */
- <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToIntMapper<K, V> valueSelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
- String name);
-
- /**
- * Sum extracted double decimal values of this table by the selected aggregation key
- *
- * @param keySelector the class of KeyValueMapper to select the aggregation key
- * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
- * @param name the name of the resulted table
- */
- <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToDoubleMapper<K, V> valueSelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
- String name);
-
- /**
* Count number of records of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param name the name of the resulted table
*/
<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
+ Serializer<K1> keySerializer,
+ Serializer<V> valueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V> valueDeserializer,
String name);
- /**
- * Get the top-k values of this table by the selected aggregation key
- *
- * @param k parameter of the top-k computation
- * @param keySelector the class of KeyValueMapper to select the aggregation key
- * @param name the name of the resulted table
- */
- <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
- KeyValueMapper<K, V, K1> keySelector,
- Serializer<K> keySerializer,
- Serializer<V1> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<V1> aggValueDeserializer,
- String name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
new file mode 100644
index 0000000..d4c4e2d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
+
+ private static final int NEWFLAG_SIZE = 1;
+
+ private final Deserializer<T> inner;
+
+ public ChangedDeserializer(Deserializer<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // do nothing
+ }
+
+ @Override
+ public Change<T> deserialize(String topic, byte[] data) {
+
+ byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
+
+ System.arraycopy(data, 0, bytes, 0, bytes.length);
+
+ if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
+ return new Change<>(inner.deserialize(topic, bytes), null);
+ } else {
+ return new Change<>(null, inner.deserialize(topic, bytes));
+ }
+ }
+
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
new file mode 100644
index 0000000..e9b7cad
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ChangedSerializer<T> implements Serializer<Change<T>> {
+
+ private static final int NEWFLAG_SIZE = 1;
+
+ private final Serializer<T> inner;
+
+ public ChangedSerializer(Serializer<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // do nothing
+ }
+
+ @Override
+ public byte[] serialize(String topic, Change<T> data) {
+ // only one of the old / new values would be not null
+ byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
+
+ ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
+ buf.put(serializedKey);
+ buf.put((byte) (data.newValue != null ? 1 : 0));
+
+ return buf.array();
+ }
+
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
new file mode 100644
index 0000000..b7dc5aa
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+
+public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> {
+
+ private class Count implements Aggregator<K, V, Long> {
+
+ @Override
+ public Long initialValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long add(K aggKey, V value, Long aggregate) {
+ return aggregate + 1;
+ }
+
+ @Override
+ public Long remove(K aggKey, V value, Long aggregate) {
+ return aggregate - 1;
+ }
+
+ @Override
+ public Long merge(Long aggr1, Long aggr2) {
+ return aggr1 + aggr2;
+ }
+ }
+
+ @Override
+ public Aggregator<K, V, Long> get() {
+ return new Count();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
deleted file mode 100644
index 9a14c53..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
-
- private static final int TIMESTAMP_SIZE = 8;
-
- private Deserializer<T> inner;
-
- public DefaultWindowedDeserializer(Deserializer<T> inner) {
- this.inner = inner;
- }
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // do nothing
- }
-
- @Override
- public Windowed<T> deserialize(String topic, byte[] data) {
-
- byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
-
- System.arraycopy(data, 0, bytes, 0, bytes.length);
-
- long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
-
- // always read as unlimited window
- return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
- }
-
-
- @Override
- public void close() {
- inner.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
deleted file mode 100644
index 4bf2b28..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> {
-
- private static final int TIMESTAMP_SIZE = 8;
-
- private Serializer<T> inner;
-
- public DefaultWindowedSerializer(Serializer<T> inner) {
- this.inner = inner;
- }
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // do nothing
- }
-
- @Override
- public byte[] serialize(String topic, Windowed<T> data) {
- byte[] serializedKey = inner.serialize(topic, data.value());
-
- ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
- buf.put(serializedKey);
- buf.putLong(data.window().start());
-
- return buf.array();
- }
-
-
- @Override
- public void close() {
- inner.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 4505e74..f53c0d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -250,16 +250,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
- Serializer<K> keySerialzier,
- Serializer<V> thisValueSerialzier,
- Serializer<V1> otherValueSerialzier,
- Deserializer<K> keyDeserialier,
- Deserializer<V> thisValueDeserialzier,
- Deserializer<V1> otherValueDeserialzier) {
+ Serializer<K> keySerializer,
+ Serializer<V> thisValueSerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> thisValueDeserializer,
+ Deserializer<V1> otherValueDeserializer) {
return join(other, joiner, windows,
- keySerialzier, thisValueSerialzier, otherValueSerialzier,
- keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
+ keySerializer, thisValueSerializer, otherValueSerializer,
+ keyDeserializer, thisValueDeserializer, otherValueDeserializer, false);
}
@Override
@@ -267,16 +267,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
- Serializer<K> keySerialzier,
- Serializer<V> thisValueSerialzier,
- Serializer<V1> otherValueSerialzier,
- Deserializer<K> keyDeserialier,
- Deserializer<V> thisValueDeserialzier,
- Deserializer<V1> otherValueDeserialzier) {
+ Serializer<K> keySerializer,
+ Serializer<V> thisValueSerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> thisValueDeserializer,
+ Deserializer<V1> otherValueDeserializer) {
return join(other, joiner, windows,
- keySerialzier, thisValueSerialzier, otherValueSerialzier,
- keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
+ keySerializer, thisValueSerializer, otherValueSerializer,
+ keyDeserializer, thisValueDeserializer, otherValueDeserializer, true);
}
@SuppressWarnings("unchecked")
@@ -284,12 +284,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
- Serializer<K> keySerialzier,
- Serializer<V> thisValueSerialzier,
- Serializer<V1> otherValueSerialzier,
- Deserializer<K> keyDeserialier,
- Deserializer<V> thisValueDeserialzier,
- Deserializer<V1> otherValueDeserialzier,
+ Serializer<K> keySerializer,
+ Serializer<V> thisValueSerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> thisValueDeserializer,
+ Deserializer<V1> otherValueDeserializer,
boolean outer) {
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@@ -301,7 +301,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.after,
windows.maintainMs(),
windows.segments,
- new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
+ new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
null);
RocksDBWindowStoreSupplier<K, V1> otherWindow =
@@ -311,7 +311,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.after,
windows.maintainMs(),
windows.segments,
- new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+ new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
@@ -344,10 +344,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
- Serializer<K> keySerialzier,
- Serializer<V1> otherValueSerialzier,
- Deserializer<K> keyDeserialier,
- Deserializer<V1> otherValueDeserialzier) {
+ Serializer<K> keySerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V1> otherValueDeserializer) {
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@@ -358,7 +358,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.after,
windows.maintainMs(),
windows.segments,
- new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+ new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
new file mode 100644
index 0000000..a5948f8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
+
+ private final String storeName;
+ private final Aggregator<K, V, T> aggregator;
+
+ private boolean sendOldValues = false;
+
+ KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+ this.storeName = storeName;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ @Override
+ public Processor<K, Change<V>> get() {
+ return new KTableAggregateProcessor();
+ }
+
+ private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
+
+ private KeyValueStore<K, T> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(K key, Change<V> value) {
+ T oldAgg = store.get(key);
+
+ if (oldAgg == null)
+ oldAgg = aggregator.initialValue();
+
+ T newAgg = oldAgg;
+
+ // first try to remove the old value
+ if (value.oldValue != null) {
+ newAgg = aggregator.remove(key, value.oldValue, newAgg);
+ }
+
+ // then try to add the new new value
+ if (value.newValue != null) {
+ newAgg = aggregator.add(key, value.newValue, newAgg);
+ }
+
+ // update the store with the new value
+ store.put(key, newAgg);
+
+ // send the old / new pair
+ if (sendOldValues)
+ context().forward(key, new Change<>(newAgg, oldAgg));
+ else
+ context().forward(key, new Change<>(newAgg, null));
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, T> view() {
+
+ return new KTableValueGetterSupplier<K, T>() {
+
+ public KTableValueGetter<K, T> get() {
+ return new KTableAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KTableAggregateValueGetter implements KTableValueGetter<K, T> {
+
+ private KeyValueStore<K, T> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public T get(K key) {
+ return store.get(key);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 32d3cc5..7f30f59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.KStream;
@@ -25,16 +27,15 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
-import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
-import java.util.Collection;
+import java.util.Collections;
import java.util.Set;
/**
@@ -51,6 +52,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ private static final String SELECT_NAME = "KTABLE-SELECT-";
+
+ private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+
public static final String SOURCE_NAME = "KTABLE-SOURCE-";
public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
@@ -169,47 +174,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@SuppressWarnings("unchecked")
- KTableValueGetterSupplier<K, V> valueGetterSupplier() {
- if (processorSupplier instanceof KTableSource) {
- KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
- materialize(source);
- return new KTableSourceValueGetterSupplier<>(source.topic);
- } else {
- return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
- }
- }
-
- @SuppressWarnings("unchecked")
- void enableSendingOldValues() {
- if (!sendOldValues) {
- if (processorSupplier instanceof KTableSource) {
- KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
- materialize(source);
- source.enableSendingOldValues();
- } else {
- ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
- }
- sendOldValues = true;
- }
- }
-
- boolean sendingOldValueEnabled() {
- return sendOldValues;
- }
-
- private void materialize(KTableSource<K, ?> source) {
- synchronized (source) {
- if (!source.isMaterialized()) {
- StateStoreSupplier storeSupplier =
- new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
- // mark this state as non internal hence it is read directly from a user topic
- topology.addStateStore(storeSupplier, false, name);
- source.materialize();
- }
- }
- }
-
- @SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@@ -281,63 +245,142 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K> keySerializer,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
Serializer<V2> aggValueSerializer,
- Deserializer<K> keyDeserializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
Deserializer<V2> aggValueDeserializer,
String name) {
- // TODO
- return null;
+
+ String selectName = topology.newName(SELECT_NAME);
+ String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+ String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+ String aggregateName = topology.newName(AGGREGATE_NAME);
+
+ String topic = name + "-repartition";
+
+ ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+ ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
+
+ KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
+
+ ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get());
+
+ StateStoreSupplier aggregateStore = Stores.create(name)
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(aggValueSerializer, aggValueDeserializer)
+ .localDatabase()
+ .build();
+
+ // select the aggregate key and values (old and new), it would require parent to send old values
+ topology.addProcessor(selectName, selectSupplier, this.name);
+ this.enableSendingOldValues();
+
+ // send the aggregate key-value pairs to the intermediate topic for partitioning
+ topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
+
+ // read the intermediate topic
+ topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+
+ // aggregate the values with the aggregator and local store
+ topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
+ topology.addStateStore(aggregateStore, aggregateName);
+
+ // return the KTable representation with the intermediate topic as the sources
+ return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
}
@Override
- public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToLongMapper<K, V> valueSelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
+ public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector,
+ final KeyValueToLongMapper<K, V> valueSelector,
+ Serializer<K1> keySerializer,
+ Deserializer<K1> keyDeserializer,
String name) {
- // TODO
- return null;
- }
- @Override
- public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToIntMapper<K, V> valueSelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
- String name) {
- // TODO
- return null;
+ Serializer<Long> longSerializer = new LongSerializer();
+ Deserializer<Long> longDeserializer = new LongDeserializer();
+
+ KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() {
+ @Override
+ public KeyValue<K1, Long> apply(K key, V value) {
+ K1 aggKey = keySelector.apply(key, value);
+ Long aggValue = valueSelector.apply(key, value);
+
+ return new KeyValue<>(aggKey, aggValue);
+ }
+ };
+
+ return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper,
+ keySerializer, longSerializer, longSerializer,
+ keyDeserializer, longDeserializer, longDeserializer,
+ name);
}
@Override
- public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToDoubleMapper<K, V> valueSelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
+ public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector,
+ Serializer<K1> keySerializer,
+ Serializer<V> valueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V> valueDeserializer,
String name) {
- // TODO
- return null;
+
+ Serializer<Long> longSerializer = new LongSerializer();
+ Deserializer<Long> longDeserializer = new LongDeserializer();
+
+ KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+ @Override
+ public KeyValue<K1, V> apply(K key, V value) {
+ K1 aggKey = keySelector.apply(key, value);
+
+ return new KeyValue<>(aggKey, value);
+ }
+ };
+
+ return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper,
+ keySerializer, valueSerializer, longSerializer,
+ keyDeserializer, valueDeserializer, longDeserializer,
+ name);
}
- @Override
- public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer,
- String name) {
- // TODO
- return null;
+ @SuppressWarnings("unchecked")
+ KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+ if (processorSupplier instanceof KTableSource) {
+ KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+ materialize(source);
+ return new KTableSourceValueGetterSupplier<>(source.topic);
+ } else {
+ return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
+ }
}
- @Override
- public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
- KeyValueMapper<K, V, K1> keySelector,
- Serializer<K> keySerializer,
- Serializer<V1> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<V1> aggValueDeserializer,
- String name) {
- // TODO
- return null;
+ @SuppressWarnings("unchecked")
+ void enableSendingOldValues() {
+ if (!sendOldValues) {
+ if (processorSupplier instanceof KTableSource) {
+ KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+ materialize(source);
+ source.enableSendingOldValues();
+ } else {
+ ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
+ }
+ sendOldValues = true;
+ }
+ }
+
+ boolean sendingOldValueEnabled() {
+ return sendOldValues;
+ }
+
+ private void materialize(KTableSource<K, ?> source) {
+ synchronized (source) {
+ if (!source.isMaterialized()) {
+ StateStoreSupplier storeSupplier =
+ new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+ // mark this state as non internal hence it is read directly from a user topic
+ topology.addStateStore(storeSupplier, false, name);
+ source.materialize();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index be80855..c664906 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+
class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
private final KTableImpl<K1, ?, V1> parent;
@@ -36,7 +37,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
@Override
public Processor<K1, Change<V1>> get() {
- return new KTableMapProcessor();
+ return new KTableMapValuesProcessor();
}
@Override
@@ -67,16 +68,15 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
return newValue;
}
- private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+ private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> {
@Override
public void process(K1 key, Change<V1> change) {
V2 newValue = computeValue(change.newValue);
V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
- context().forward(key, new Change(newValue, oldValue));
+ context().forward(key, new Change<>(newValue, oldValue));
}
-
}
private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
new file mode 100644
index 0000000..bbef7fb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * KTable repartition map functions are not exposed to public APIs, but only used for keyed aggregations.
+ *
+ * Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
+ */
+public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> {
+
+ private final KTableImpl<K1, ?, V1> parent;
+ private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+
+ public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+ this.parent = parent;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public Processor<K1, Change<V1>> get() {
+ return new KTableMapProcessor();
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() {
+ final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+ return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() {
+
+ public KTableValueGetter<K1, KeyValue<K2, V2>> get() {
+ return new KTableMapValueGetter(parentValueGetterSupplier.get());
+ }
+
+ };
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ // this should never be called
+ throw new KafkaException("KTableRepartitionMap should always require sending old values.");
+ }
+
+ private KeyValue<K2, V2> computeValue(K1 key, V1 value) {
+ KeyValue<K2, V2> newValue = null;
+
+ if (key != null || value != null)
+ newValue = mapper.apply(key, value);
+
+ return newValue;
+ }
+
+ private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+
+ @Override
+ public void process(K1 key, Change<V1> change) {
+ KeyValue<K2, V2> newPair = computeValue(key, change.newValue);
+
+ context().forward(newPair.key, new Change<>(newPair.value, null));
+
+ if (change.oldValue != null) {
+ KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue);
+ context().forward(oldPair.key, new Change<>(null, oldPair.value));
+ }
+ }
+ }
+
+ private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> {
+
+ private final KTableValueGetter<K1, V1> parentGetter;
+
+ public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+ this.parentGetter = parentGetter;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ parentGetter.init(context);
+ }
+
+ @Override
+ public KeyValue<K2, V2> get(K1 key) {
+ return computeValue(key, parentGetter.get(key));
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
new file mode 100644
index 0000000..b66590e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+
+public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> {
+
+ private class LongSum implements Aggregator<K, Long, Long> {
+
+ @Override
+ public Long initialValue() {
+ return 0L;
+ }
+
+ @Override
+ public Long add(K aggKey, Long value, Long aggregate) {
+ return aggregate + value;
+ }
+
+ @Override
+ public Long remove(K aggKey, Long value, Long aggregate) {
+ return aggregate - value;
+ }
+
+ @Override
+ public Long merge(Long aggr1, Long aggr2) {
+ return aggr1 + aggr2;
+ }
+ }
+
+ @Override
+ public Aggregator<K, Long, Long> get() {
+ return new LongSum();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
new file mode 100644
index 0000000..96c3668
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+
+ private static final int TIMESTAMP_SIZE = 8;
+
+ private Deserializer<T> inner;
+
+ public WindowedDeserializer(Deserializer<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // do nothing
+ }
+
+ @Override
+ public Windowed<T> deserialize(String topic, byte[] data) {
+
+ byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
+
+ System.arraycopy(data, 0, bytes, 0, bytes.length);
+
+ long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
+
+ // always read as unlimited window
+ return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
+ }
+
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
new file mode 100644
index 0000000..4407a5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
+
+ private static final int TIMESTAMP_SIZE = 8;
+
+ private Serializer<T> inner;
+
+ public WindowedSerializer(Serializer<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // do nothing
+ }
+
+ @Override
+ public byte[] serialize(String topic, Windowed<T> data) {
+ byte[] serializedKey = inner.serialize(topic, data.value());
+
+ ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
+ buf.put(serializedKey);
+ buf.putLong(data.window().start());
+
+ return buf.array();
+ }
+
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
new file mode 100644
index 0000000..189cf9d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.NoOpKeyValueMapper;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableAggregateTest {
+
+ private final Serializer<String> strSerializer = new StringSerializer();
+ private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+ private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+
+ private class StringCanonizer implements Aggregator<String, String, String> {
+
+ @Override
+ public String initialValue() {
+ return "";
+ }
+
+ @Override
+ public String add(String aggKey, String value, String aggregate) {
+ return aggregate + "+" + value;
+ }
+
+ @Override
+ public String remove(String aggKey, String value, String aggregate) {
+ return aggregate + "-" + value;
+ }
+
+ @Override
+ public String merge(String aggr1, String aggr2) {
+ return "(" + aggr1 + ") + (" + aggr2 + ")";
+ }
+ }
+
+ @Override
+ public Aggregator<String, String, String> get() {
+ return new StringCanonizer();
+ }
+ }
+
+ @Test
+ public void testAggBasic() throws Exception {
+ final File baseDir = Files.createTempDirectory("test").toFile();
+
+ try {
+ final KStreamBuilder builder = new KStreamBuilder();
+ String topic1 = "topic1";
+
+ KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(),
+ new NoOpKeyValueMapper<String, String>(),
+ strSerializer,
+ strSerializer,
+ strSerializer,
+ strDeserializer,
+ strDeserializer,
+ strDeserializer,
+ "topic1-Canonized");
+
+ MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+ driver.process(topic1, "A", "1");
+ driver.process(topic1, "B", "2");
+ driver.process(topic1, "A", "3");
+ driver.process(topic1, "B", "4");
+ driver.process(topic1, "C", "5");
+ driver.process(topic1, "D", "6");
+ driver.process(topic1, "B", "7");
+ driver.process(topic1, "C", "8");
+
+ assertEquals(Utils.mkList(
+ "A:+1",
+ "B:+2",
+ "A:+1+3", "A:+1+3-1",
+ "B:+2+4", "B:+2+4-2",
+ "C:+5",
+ "D:+6",
+ "B:+2+4-2+7", "B:+2+4-2+7-4",
+ "C:+5+8", "C:+5+8-5"), proc2.processed);
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
deleted file mode 100644
index 037b30a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class KTableMapValuesImplTest {
-
- private final Serializer<String> strSerializer = new StringSerializer();
- private final Deserializer<String> strDeserializer = new StringDeserializer();
-
- @Test
- public void testKTable() {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
-
- KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
- KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
-
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
- KStreamTestDriver driver = new KStreamTestDriver(builder);
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "02");
- driver.process(topic1, "C", "03");
- driver.process(topic1, "D", "04");
-
- assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
- }
-
- @Test
- public void testValueGetter() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
- KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(topic2, serializer, serializer, deserializer, deserializer);
-
- KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
-
- KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- getter2.init(driver.context());
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter3.init(driver.context());
- KTableValueGetter<String, String> getter4 = getterSupplier4.get();
- getter4.init(driver.context());
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
-
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(1), getter2.get("A"));
- assertEquals(new Integer(1), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("01", getter4.get("A"));
- assertEquals("01", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
-
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(2), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertEquals(new Integer(2), getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("02", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "03");
-
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(3), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("03", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", null);
-
- assertNull(getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertNull(getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertNull(getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- } finally {
- Utils.delete(stateDir);
- }
- }
-
- @Test
- public void testNotSendingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
-
- MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
-
- builder.addProcessor("proc", proc, table2.name);
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
-
- assertFalse(table1.sendingOldValueEnabled());
- assertFalse(table2.sendingOldValueEnabled());
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
-
- proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
-
- proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
-
- driver.process(topic1, "A", "03");
-
- proc.checkAndClearResult("A:(3<-null)");
-
- driver.process(topic1, "A", null);
-
- proc.checkAndClearResult("A:(null<-null)");
-
- } finally {
- Utils.delete(stateDir);
- }
- }
-
- @Test
- public void testSendingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
-
- table2.enableSendingOldValues();
-
- MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
-
- builder.addProcessor("proc", proc, table2.name);
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
-
- assertTrue(table1.sendingOldValueEnabled());
- assertTrue(table2.sendingOldValueEnabled());
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
-
- proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
-
- proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
-
- driver.process(topic1, "A", "03");
-
- proc.checkAndClearResult("A:(3<-2)");
-
- driver.process(topic1, "A", null);
-
- proc.checkAndClearResult("A:(null<-3)");
-
- } finally {
- Utils.delete(stateDir);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
new file mode 100644
index 0000000..58f1c2a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class KTableMapValuesTest {
+
+ private final Serializer<String> strSerializer = new StringSerializer();
+ private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+ @Test
+ public void testKTable() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "02");
+ driver.process(topic1, "C", "03");
+ driver.process(topic1, "D", "04");
+
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+ }
+
+ @Test
+ public void testValueGetter() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+ table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ getter2.init(driver.context());
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ getter3.init(driver.context());
+ KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ getter4.init(driver.context());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "03");
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", null);
+
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+ @Test
+ public void testNotSendingOldValue() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+
+ builder.addProcessor("proc", proc, table2.name);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ assertFalse(table1.sendingOldValueEnabled());
+ assertFalse(table2.sendingOldValueEnabled());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+
+ driver.process(topic1, "A", "03");
+
+ proc.checkAndClearResult("A:(3<-null)");
+
+ driver.process(topic1, "A", null);
+
+ proc.checkAndClearResult("A:(null<-null)");
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+ @Test
+ public void testSendingOldValue() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+
+ table2.enableSendingOldValues();
+
+ MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+
+ builder.addProcessor("proc", proc, table2.name);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ assertTrue(table1.sendingOldValueEnabled());
+ assertTrue(table2.sendingOldValueEnabled());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+
+ driver.process(topic1, "A", "03");
+
+ proc.checkAndClearResult("A:(3<-2)");
+
+ driver.process(topic1, "A", null);
+
+ proc.checkAndClearResult("A:(null<-3)");
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+}