You are viewing a plain text version of this content. The canonical link to the original page was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/18 22:43:56 UTC

kafka git commit: MINOR: complete built-in stream aggregate functions

Repository: kafka
Updated Branches:
  refs/heads/trunk a62eb5993 -> f75e33502


MINOR: complete built-in stream aggregate functions

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #787 from guozhangwang/KBuiltInAgg


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f75e3350
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f75e3350
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f75e3350

Branch: refs/heads/trunk
Commit: f75e3350259c6df11719ee0a55f5780c136f3d26
Parents: a62eb59
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Jan 18 13:43:52 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 18 13:43:52 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 37 ------------
 .../streams/kstream/internals/KStreamImpl.java  | 63 +++++++++-----------
 2 files changed, 27 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f75e3350/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 85d51e9..36741a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-import java.util.Collection;
 
 /**
  * KStream is an abstraction of a stream of key-value pairs.
@@ -292,28 +291,6 @@ public interface KStream<K, V> {
                                                           Deserializer<K> keyDeserializer);
 
     /**
-     * Sum extracted integer values of this stream by key on a window basis.
-     *
-     * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
-     * @param windows the specification of the aggregation window
-     */
-    <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
-                                                             Windows<W> windows,
-                                                             Serializer<K> keySerializer,
-                                                             Deserializer<K> keyDeserializer);
-
-    /**
-     * Sum extracted double decimal values of this stream by key on a window basis.
-     *
-     * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
-     * @param windows the specification of the aggregation window
-     */
-    <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
-                                                            Windows<W> windows,
-                                                            Serializer<K> keySerializer,
-                                                            Deserializer<K> keyDeserializer);
-
-    /**
      * Count number of records of this stream by key on a window basis.
      *
      * @param windows the specification of the aggregation window
@@ -322,18 +299,4 @@ public interface KStream<K, V> {
                                                             Serializer<K> keySerializer,
                                                             Deserializer<K> keyDeserializer);
 
-    /**
-     * Get the top-k values of this stream by key on a window basis.
-     *
-     * @param k parameter of the top-k computation
-     * @param valueSelector the class of KeyValueMapper to extract the comparable value
-     * @param windows the specification of the aggregation window
-     */
-    <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
-                                                                                                KeyValueMapper<K, V, V1> valueSelector,
-                                                                                                Windows<W> windows,
-                                                                                                Serializer<K> keySerializer,
-                                                                                                Serializer<V1> aggValueSerializer,
-                                                                                                Deserializer<K> keyDeserializer,
-                                                                                                Deserializer<V1> aggValueDeserializer);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f75e3350/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7b634dc..691910b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,14 +18,14 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.AggregatorSupplier;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
-import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
 import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -43,7 +43,6 @@ import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
 import java.lang.reflect.Array;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -71,6 +70,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
+    private static final String SELECT_NAME = "KSTREAM-SELECT-";
+
     private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
 
     public static final String SINK_NAME = "KSTREAM-SINK-";
@@ -403,7 +404,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         // TODO: this agg window operator is only used for casting K to Windowed<K> for
         // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
         String aggregateName = topology.newName(AGGREGATE_NAME);
-        String aggWindowName = topology.newName(WINDOWED_NAME);
+        String selectName = topology.newName(SELECT_NAME);
 
         ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
         ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
@@ -418,8 +419,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                         null);
 
         // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggWindowName, aggWindowSupplier, this.name);
-        topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName);
+        topology.addProcessor(selectName, aggWindowSupplier, this.name);
+        topology.addProcessor(aggregateName, aggregateSupplier, selectName);
         topology.addStateStore(aggregateStore, aggregateName);
 
         // return the KTable representation with the intermediate topic as the sources
@@ -427,47 +428,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
+    public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector,
                                                                  Windows<W> windows,
                                                                  Serializer<K> keySerializer,
                                                                  Deserializer<K> keyDeserializer) {
-        // TODO
-        return null;
-    }
 
-    public <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
-                                                                    Windows<W> windows,
-                                                                    Serializer<K> keySerializer,
-                                                                    Deserializer<K> keyDeserializer) {
-        // TODO
-        return null;
+        KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() {
+            @Override
+            public KeyValue<K, Long> apply(K key, V value) {
+                return new KeyValue<>(key, valueSelector.apply(key, value));
+            }
+        });
+
+        return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(),
+                windows,
+                keySerializer,
+                new LongSerializer(),
+                keyDeserializer,
+                new LongDeserializer());
     }
 
-    public <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
-                                                                   Windows<W> windows,
-                                                                   Serializer<K> keySerializer,
-                                                                   Deserializer<K> keyDeserializer) {
-        // TODO
-        return null;
-    }
 
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
                                                                    Serializer<K> keySerializer,
                                                                    Deserializer<K> keyDeserializer) {
-        // TODO
-        return null;
-    }
 
-    @Override
-    public <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
-                                                                                                       KeyValueMapper<K, V, V1> valueSelector,
-                                                                                                       Windows<W> windows,
-                                                                                                       Serializer<K> keySerializer,
-                                                                                                       Serializer<V1> aggValueSerializer,
-                                                                                                       Deserializer<K> keyDeserializer,
-                                                                                                       Deserializer<V1> aggValueDeserializer) {
-        // TODO
-        return null;
+        return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(),
+                windows,
+                keySerializer,
+                new LongSerializer(),
+                keyDeserializer,
+                new LongDeserializer());
     }
 }