You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/18 22:43:56 UTC
kafka git commit: MINOR: complete built-in stream aggregate functions
Repository: kafka
Updated Branches:
refs/heads/trunk a62eb5993 -> f75e33502
MINOR: complete built-in stream aggregate functions
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #787 from guozhangwang/KBuiltInAgg
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f75e3350
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f75e3350
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f75e3350
Branch: refs/heads/trunk
Commit: f75e3350259c6df11719ee0a55f5780c136f3d26
Parents: a62eb59
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Jan 18 13:43:52 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 18 13:43:52 2016 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 37 ------------
.../streams/kstream/internals/KStreamImpl.java | 63 +++++++++-----------
2 files changed, 27 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f75e3350/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 85d51e9..36741a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import java.util.Collection;
/**
* KStream is an abstraction of a stream of key-value pairs.
@@ -292,28 +291,6 @@ public interface KStream<K, V> {
Deserializer<K> keyDeserializer);
/**
- * Sum extracted integer values of this stream by key on a window basis.
- *
- * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
- * @param windows the specification of the aggregation window
- */
- <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer);
-
- /**
- * Sum extracted double decimal values of this stream by key on a window basis.
- *
- * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
- * @param windows the specification of the aggregation window
- */
- <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer);
-
- /**
* Count number of records of this stream by key on a window basis.
*
* @param windows the specification of the aggregation window
@@ -322,18 +299,4 @@ public interface KStream<K, V> {
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer);
- /**
- * Get the top-k values of this stream by key on a window basis.
- *
- * @param k parameter of the top-k computation
- * @param valueSelector the class of KeyValueMapper to extract the comparable value
- * @param windows the specification of the aggregation window
- */
- <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
- KeyValueMapper<K, V, V1> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Serializer<V1> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<V1> aggValueDeserializer);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f75e3350/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7b634dc..691910b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,14 +18,14 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
-import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -43,7 +43,6 @@ import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
import java.lang.reflect.Array;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -71,6 +70,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ private static final String SELECT_NAME = "KSTREAM-SELECT-";
+
private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
public static final String SINK_NAME = "KSTREAM-SINK-";
@@ -403,7 +404,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
// TODO: this agg window operator is only used for casting K to Windowed<K> for
// KTableProcessorSupplier, which is a bit awkward and better be removed in the future
String aggregateName = topology.newName(AGGREGATE_NAME);
- String aggWindowName = topology.newName(WINDOWED_NAME);
+ String selectName = topology.newName(SELECT_NAME);
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
@@ -418,8 +419,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
null);
// aggregate the values with the aggregator and local store
- topology.addProcessor(aggWindowName, aggWindowSupplier, this.name);
- topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName);
+ topology.addProcessor(selectName, aggWindowSupplier, this.name);
+ topology.addProcessor(aggregateName, aggregateSupplier, selectName);
topology.addStateStore(aggregateStore, aggregateName);
// return the KTable representation with the intermediate topic as the sources
@@ -427,47 +428,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
- public <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
+ public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer) {
- // TODO
- return null;
- }
- public <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer) {
- // TODO
- return null;
+ KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() {
+ @Override
+ public KeyValue<K, Long> apply(K key, V value) {
+ return new KeyValue<>(key, valueSelector.apply(key, value));
+ }
+ });
+
+ return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(),
+ windows,
+ keySerializer,
+ new LongSerializer(),
+ keyDeserializer,
+ new LongDeserializer());
}
- public <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer) {
- // TODO
- return null;
- }
@Override
public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer) {
- // TODO
- return null;
- }
- @Override
- public <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
- KeyValueMapper<K, V, V1> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Serializer<V1> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<V1> aggValueDeserializer) {
- // TODO
- return null;
+ return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(),
+ windows,
+ keySerializer,
+ new LongSerializer(),
+ keyDeserializer,
+ new LongDeserializer());
}
}