You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/15 20:08:29 UTC

kafka git commit: MINOR: kstream/ktable counting method with default long serdes

Repository: kafka
Updated Branches:
  refs/heads/trunk 951e30adc -> 355076cd2


MINOR: kstream/ktable counting method with default long serdes

guozhangwang miguno

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Michael G. Noll, Guozhang Wang

Closes #1065 from ymatsuda/count_serdes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/355076cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/355076cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/355076cd

Branch: refs/heads/trunk
Commit: 355076cd262dd071287a1c8586ae0f9635e218e3
Parents: 951e30a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Mar 15 12:08:26 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 15 12:08:26 2016 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedJob.java     |  3 +--
 .../examples/pageview/PageViewUntypedJob.java   |  3 +--
 .../examples/wordcount/WordCountJob.java        |  3 +--
 .../apache/kafka/streams/kstream/KStream.java   |  9 ++-------
 .../apache/kafka/streams/kstream/KTable.java    |  2 --
 .../streams/kstream/internals/KStreamImpl.java  | 21 ++++++++++----------
 .../streams/kstream/internals/KTableImpl.java   |  9 +++++----
 .../streams/smoketest/SmokeTestClient.java      |  8 ++------
 8 files changed, 22 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 6a105fd..1fcb403 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -151,8 +151,7 @@ public class PageViewTypedJob {
                     }
                 })
                 .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, longSerializer,
-                        stringDeserializer, longDeserializer)
+                        stringSerializer, stringDeserializer)
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
index e890589..fb1a55d 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
@@ -108,8 +108,7 @@ public class PageViewUntypedJob {
                     }
                 })
                 .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, longSerializer,
-                        stringDeserializer, longDeserializer)
+                        stringSerializer, stringDeserializer)
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index 82d216e..d1f8d86 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -66,7 +66,6 @@ public class WordCountJob {
         final Serializer<String> stringSerializer = new StringSerializer();
         final Deserializer<String> stringDeserializer = new StringDeserializer();
         final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
 
         KStream<String, String> source = builder.stream("streams-file-input");
 
@@ -82,7 +81,7 @@ public class WordCountJob {
                         return new KeyValue<String, String>(value, value);
                     }
                 })
-                .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
+                .countByKey(stringSerializer, stringDeserializer, "Counts");
 
         counts.to("streams-wordcount-output", stringSerializer, longSerializer);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6426af9..1640bde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -329,18 +329,13 @@ public interface KStream<K, V> {
      */
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
                                                             Serializer<K> keySerializer,
-                                                            Serializer<Long> aggValueSerializer,
-                                                            Deserializer<K> keyDeserializer,
-                                                            Deserializer<Long> aggValueDeserializer);
+                                                            Deserializer<K> keyDeserializer);
 
     /**
      * Count number of messages of this stream by key without a window basis, and hence
-     * return a ever updating counting table
-     *
+     * return an ever-updating counting table.
      */
     KTable<K, Long> countByKey(Serializer<K> keySerializer,
-                               Serializer<Long> aggValueSerializer,
                                Deserializer<K> keyDeserializer,
-                               Deserializer<Long> aggValueDeserializer,
                                String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 485bb20..b44ed21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -193,10 +193,8 @@ public interface KTable<K, V> {
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
                                 Serializer<K1> keySerializer,
                                 Serializer<V> valueSerializer,
-                                Serializer<Long> aggValueSerializer,
                                 Deserializer<K1> keyDeserializer,
                                 Deserializer<V> valueDeserializer,
-                                Deserializer<Long> aggValueDeserializer,
                                 String name);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cff082c..884933b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -91,6 +93,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
+    private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
 
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
         super(topology, name, sourceNodes);
@@ -501,9 +505,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
                                                                    Serializer<K> keySerializer,
-                                                                   Serializer<Long> aggValueSerializer,
-                                                                   Deserializer<K> keyDeserializer,
-                                                                   Deserializer<Long> aggValueDeserializer) {
+                                                                   Deserializer<K> keyDeserializer) {
         return this.aggregateByKey(
                 new Initializer<Long>() {
                     @Override
@@ -516,16 +518,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                     public Long apply(K aggKey, V value, Long aggregate) {
                         return aggregate + 1L;
                     }
-                }, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer);
+                }, windows, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER);
     }
 
     @Override
-    public     KTable<K, Long> countByKey(Serializer<K> keySerializer,
-                                          Serializer<Long> aggValueSerializer,
-                                          Deserializer<K> keyDeserializer,
-                                          Deserializer<Long> aggValueDeserializer,
-                                          String name) {
-
+    public KTable<K, Long> countByKey(Serializer<K> keySerializer,
+                                      Deserializer<K> keyDeserializer,
+                                      String name) {
         return this.aggregateByKey(
                 new Initializer<Long>() {
                     @Override
@@ -538,6 +537,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                     public Long apply(K aggKey, V value, Long aggregate) {
                         return aggregate + 1L;
                     }
-                }, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer, name);
+                }, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER, name);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index b82582b..d63fcc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -75,6 +77,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
+    private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
 
     public final ProcessorSupplier<?, ?> processorSupplier;
 
@@ -302,12 +306,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
                                        Serializer<K1> keySerializer,
                                        Serializer<V> valueSerializer,
-                                       Serializer<Long> aggValueSerializer,
                                        Deserializer<K1> keyDeserializer,
                                        Deserializer<V> valueDeserializer,
-                                       Deserializer<Long> aggValueDeserializer,
                                        String name) {
-
         return this.aggregate(
                 new Initializer<Long>() {
                     @Override
@@ -331,7 +332,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                         return new KeyValue<>(selector.apply(key, value), value);
                     }
                 },
-                keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
+                keySerializer, valueSerializer, LONG_SERIALIZER, keyDeserializer, valueDeserializer, LONG_DESERIALIZER, name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index fec447f..6cb45f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -191,9 +191,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         data.countByKey(
                 UnlimitedWindows.of("uwin-cnt"),
                 stringSerializer,
-                longSerializer,
-                stringDeserializer,
-                longDeserializer
+                stringDeserializer
         ).toStream().map(
                 new Unwindow<String, Long>()
         ).to("cnt", stringSerializer, longSerializer);
@@ -224,9 +222,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         data.countByKey(
                 TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
                 stringSerializer,
-                longSerializer,
-                stringDeserializer,
-                longDeserializer
+                stringDeserializer
         ).toStream().map(
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                     @Override