You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/15 20:08:29 UTC
kafka git commit: MINOR: kstream/ktable counting method with default long serdes
Repository: kafka
Updated Branches:
refs/heads/trunk 951e30adc -> 355076cd2
MINOR: kstream/ktable counting method with default long serdes
guozhangwang miguno
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Michael G. Noll, Guozhang Wang
Closes #1065 from ymatsuda/count_serdes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/355076cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/355076cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/355076cd
Branch: refs/heads/trunk
Commit: 355076cd262dd071287a1c8586ae0f9635e218e3
Parents: 951e30a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Mar 15 12:08:26 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 15 12:08:26 2016 -0700
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedJob.java | 3 +--
.../examples/pageview/PageViewUntypedJob.java | 3 +--
.../examples/wordcount/WordCountJob.java | 3 +--
.../apache/kafka/streams/kstream/KStream.java | 9 ++-------
.../apache/kafka/streams/kstream/KTable.java | 2 --
.../streams/kstream/internals/KStreamImpl.java | 21 ++++++++++----------
.../streams/kstream/internals/KTableImpl.java | 9 +++++----
.../streams/smoketest/SmokeTestClient.java | 8 ++------
8 files changed, 22 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 6a105fd..1fcb403 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -151,8 +151,7 @@ public class PageViewTypedJob {
}
})
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
- stringSerializer, longSerializer,
- stringDeserializer, longDeserializer)
+ stringSerializer, stringDeserializer)
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
index e890589..fb1a55d 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
@@ -108,8 +108,7 @@ public class PageViewUntypedJob {
}
})
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
- stringSerializer, longSerializer,
- stringDeserializer, longDeserializer)
+ stringSerializer, stringDeserializer)
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index 82d216e..d1f8d86 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -66,7 +66,6 @@ public class WordCountJob {
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Long> longSerializer = new LongSerializer();
- final Deserializer<Long> longDeserializer = new LongDeserializer();
KStream<String, String> source = builder.stream("streams-file-input");
@@ -82,7 +81,7 @@ public class WordCountJob {
return new KeyValue<String, String>(value, value);
}
})
- .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
+ .countByKey(stringSerializer, stringDeserializer, "Counts");
counts.to("streams-wordcount-output", stringSerializer, longSerializer);
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6426af9..1640bde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -329,18 +329,13 @@ public interface KStream<K, V> {
*/
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
Serializer<K> keySerializer,
- Serializer<Long> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<Long> aggValueDeserializer);
+ Deserializer<K> keyDeserializer);
/**
* Count number of messages of this stream by key without a window basis, and hence
- * return a ever updating counting table
- *
+ * return a ever updating counting table.
*/
KTable<K, Long> countByKey(Serializer<K> keySerializer,
- Serializer<Long> aggValueSerializer,
Deserializer<K> keyDeserializer,
- Deserializer<Long> aggValueDeserializer,
String name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 485bb20..b44ed21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -193,10 +193,8 @@ public interface KTable<K, V> {
<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
Serializer<K1> keySerializer,
Serializer<V> valueSerializer,
- Serializer<Long> aggValueSerializer,
Deserializer<K1> keyDeserializer,
Deserializer<V> valueDeserializer,
- Deserializer<Long> aggValueDeserializer,
String name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cff082c..884933b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
@@ -91,6 +93,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+ private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
super(topology, name, sourceNodes);
@@ -501,9 +505,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
Serializer<K> keySerializer,
- Serializer<Long> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<Long> aggValueDeserializer) {
+ Deserializer<K> keyDeserializer) {
return this.aggregateByKey(
new Initializer<Long>() {
@Override
@@ -516,16 +518,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
- }, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer);
+ }, windows, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER);
}
@Override
- public KTable<K, Long> countByKey(Serializer<K> keySerializer,
- Serializer<Long> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<Long> aggValueDeserializer,
- String name) {
-
+ public KTable<K, Long> countByKey(Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name) {
return this.aggregateByKey(
new Initializer<Long>() {
@Override
@@ -538,6 +537,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
- }, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer, name);
+ }, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER, name);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index b82582b..d63fcc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
@@ -75,6 +77,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+ private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
public final ProcessorSupplier<?, ?> processorSupplier;
@@ -302,12 +306,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
Serializer<K1> keySerializer,
Serializer<V> valueSerializer,
- Serializer<Long> aggValueSerializer,
Deserializer<K1> keyDeserializer,
Deserializer<V> valueDeserializer,
- Deserializer<Long> aggValueDeserializer,
String name) {
-
return this.aggregate(
new Initializer<Long>() {
@Override
@@ -331,7 +332,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KeyValue<>(selector.apply(key, value), value);
}
},
- keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
+ keySerializer, valueSerializer, LONG_SERIALIZER, keyDeserializer, valueDeserializer, LONG_DESERIALIZER, name);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/355076cd/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index fec447f..6cb45f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -191,9 +191,7 @@ public class SmokeTestClient extends SmokeTestUtil {
data.countByKey(
UnlimitedWindows.of("uwin-cnt"),
stringSerializer,
- longSerializer,
- stringDeserializer,
- longDeserializer
+ stringDeserializer
).toStream().map(
new Unwindow<String, Long>()
).to("cnt", stringSerializer, longSerializer);
@@ -224,9 +222,7 @@ public class SmokeTestClient extends SmokeTestUtil {
data.countByKey(
TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
stringSerializer,
- longSerializer,
- stringDeserializer,
- longDeserializer
+ stringDeserializer
).toStream().map(
new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
@Override