You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/24 00:11:05 UTC
kafka git commit: MINOR: KTable.count() to only take a selector for
key
Repository: kafka
Updated Branches:
refs/heads/trunk 14b688e00 -> 0ce916398
MINOR: KTable.count() to only take a selector for key
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Gwen Shapira, Yashiru Matsuda, Michael Noll
Closes #872 from guozhangwang/KCount
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ce91639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ce91639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ce91639
Branch: refs/heads/trunk
Commit: 0ce9163989cb9898df7e74245cbc2ee37d729670
Parents: 14b688e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Feb 23 15:10:56 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Feb 23 15:10:56 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/streams/KeyValue.java | 22 +++++++++++++
.../apache/kafka/streams/kstream/KTable.java | 18 +++++------
.../streams/kstream/internals/KTableImpl.java | 33 ++++++++++++--------
.../apache/kafka/streams/processor/TaskId.java | 3 ++
.../apache/kafka/streams/StreamsConfigTest.java | 2 --
5 files changed, 54 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index d813c47..ca86fc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams;
+import java.util.Objects;
+
/**
* A key-value pair defined for a single Kafka Streams record.
* If the record comes directly from a Kafka topic then its
@@ -42,4 +44,24 @@ public class KeyValue<K, V> {
public String toString() {
return "KeyValue(" + key + ", " + value + ")";
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other)
+ return true;
+
+ if (other instanceof KeyValue) {
+ KeyValue<?, ?> otherKV = (KeyValue<?, ?>) other;
+ // Compare key and value independently and null-safely; chaining two "?:" with "&&" parses as one nested ternary.
+ return Objects.equals(key, otherKV.key)
+ && Objects.equals(value, otherKV.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b83b0de..a2c6397 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -190,15 +190,15 @@ public interface KTable<K, V> {
* @param selector the KeyValue mapper that selects the aggregate key
* @param name the name of the resulting table
* @param <K1> the key type of the aggregated table
- * @param <V1> the value type of the aggregated table
* @return the instance of KTable
*/
- <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- Serializer<Long> aggValueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valueDeserializer,
- Deserializer<Long> aggValueDeserializer,
- String name);
+ <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V> valueSerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ Deserializer<Long> aggValueDeserializer,
+ String name);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9853737..fa4cd93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -299,14 +299,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- Serializer<Long> aggValueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valueDeserializer,
- Deserializer<Long> aggValueDeserializer,
- String name) {
+ public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V> valueSerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ Deserializer<Long> aggValueDeserializer,
+ String name) {
+
return this.aggregate(
new Initializer<Long>() {
@Override
@@ -314,17 +315,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return 0L;
}
},
- new Aggregator<K1, V1, Long>() {
+ new Aggregator<K1, V, Long>() {
@Override
- public Long apply(K1 aggKey, V1 value, Long aggregate) {
+ public Long apply(K1 aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
- }, new Aggregator<K1, V1, Long>() {
+ }, new Aggregator<K1, V, Long>() {
@Override
- public Long apply(K1 aggKey, V1 value, Long aggregate) {
+ public Long apply(K1 aggKey, V value, Long aggregate) {
return aggregate - 1L;
}
- }, selector, keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
+ }, new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+ @Override
+ public KeyValue<K1, V> apply(K key, V value) {
+ return new KeyValue<>(selector.apply(key, value), value);
+ }
+ },
+ keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 69b29bf..ff21047 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -75,6 +75,9 @@ public class TaskId implements Comparable<TaskId> {
@Override
public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
if (o instanceof TaskId) {
TaskId other = (TaskId) o;
return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index b2af904..f0276ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -32,8 +32,6 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-
-
public class StreamsConfigTest {
private Properties props = new Properties();