You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/24 00:11:05 UTC

kafka git commit: MINOR: KTable.count() to only take a selector for key

Repository: kafka
Updated Branches:
  refs/heads/trunk 14b688e00 -> 0ce916398


MINOR: KTable.count() to only take a selector for key

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Gwen Shapira, Yashiru Matsuda, Michael Noll

Closes #872 from guozhangwang/KCount


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ce91639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ce91639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ce91639

Branch: refs/heads/trunk
Commit: 0ce9163989cb9898df7e74245cbc2ee37d729670
Parents: 14b688e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Feb 23 15:10:56 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Feb 23 15:10:56 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/KeyValue.java | 22 +++++++++++++
 .../apache/kafka/streams/kstream/KTable.java    | 18 +++++------
 .../streams/kstream/internals/KTableImpl.java   | 33 ++++++++++++--------
 .../apache/kafka/streams/processor/TaskId.java  |  3 ++
 .../apache/kafka/streams/StreamsConfigTest.java |  2 --
 5 files changed, 54 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index d813c47..ca86fc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams;
 
+import java.util.Objects;
+
 /**
  * A key-value pair defined for a single Kafka Streams record.
  * If the record comes directly from a Kafka topic then its
@@ -42,4 +44,24 @@ public class KeyValue<K, V> {
     public String toString() {
         return "KeyValue(" + key + ", " + value + ")";
     }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+
+        if (other instanceof KeyValue) {
+            KeyValue otherKV = (KeyValue) other;
+
+            return key == null ? otherKV.key == null : key.equals(otherKV.key)
+                    && value == null ? otherKV.value == null : value.equals(otherKV.value);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b83b0de..a2c6397 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -190,15 +190,15 @@ public interface KTable<K, V> {
      * @param selector the KeyValue mapper that select the aggregate key
      * @param name the name of the resulted table
      * @param <K1>   the key type of the aggregated table
-     * @param <V1>   the value type of the aggregated table
      * @return the instance of KTable
      */
-    <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                    Serializer<K1> keySerializer,
-                                    Serializer<V1> valueSerializer,
-                                    Serializer<Long> aggValueSerializer,
-                                    Deserializer<K1> keyDeserializer,
-                                    Deserializer<V1> valueDeserializer,
-                                    Deserializer<Long> aggValueDeserializer,
-                                    String name);
+    <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
+                                Serializer<K1> keySerializer,
+                                Serializer<V> valueSerializer,
+                                Serializer<Long> aggValueSerializer,
+                                Deserializer<K1> keyDeserializer,
+                                Deserializer<V> valueDeserializer,
+                                Deserializer<Long> aggValueDeserializer,
+                                String name);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9853737..fa4cd93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -299,14 +299,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                           Serializer<K1> keySerializer,
-                                           Serializer<V1> valueSerializer,
-                                           Serializer<Long> aggValueSerializer,
-                                           Deserializer<K1> keyDeserializer,
-                                           Deserializer<V1> valueDeserializer,
-                                           Deserializer<Long> aggValueDeserializer,
-                                           String name) {
+    public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
+                                       Serializer<K1> keySerializer,
+                                       Serializer<V> valueSerializer,
+                                       Serializer<Long> aggValueSerializer,
+                                       Deserializer<K1> keyDeserializer,
+                                       Deserializer<V> valueDeserializer,
+                                       Deserializer<Long> aggValueDeserializer,
+                                       String name) {
+
         return this.aggregate(
                 new Initializer<Long>() {
                     @Override
@@ -314,17 +315,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                         return 0L;
                     }
                 },
-                new Aggregator<K1, V1, Long>() {
+                new Aggregator<K1, V, Long>() {
                     @Override
-                    public Long apply(K1 aggKey, V1 value, Long aggregate) {
+                    public Long apply(K1 aggKey, V value, Long aggregate) {
                         return aggregate + 1L;
                     }
-                }, new Aggregator<K1, V1, Long>() {
+                }, new Aggregator<K1, V, Long>() {
                     @Override
-                    public Long apply(K1 aggKey, V1 value, Long aggregate) {
+                    public Long apply(K1 aggKey, V value, Long aggregate) {
                         return aggregate - 1L;
                     }
-                }, selector, keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
+                }, new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+                    @Override
+                    public KeyValue<K1, V> apply(K key, V value) {
+                        return new KeyValue<>(selector.apply(key, value), value);
+                    }
+                },
+                keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 69b29bf..ff21047 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -75,6 +75,9 @@ public class TaskId implements Comparable<TaskId> {
 
     @Override
     public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
         if (o instanceof TaskId) {
             TaskId other = (TaskId) o;
             return other.topicGroupId == this.topicGroupId && other.partition == this.partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ce91639/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index b2af904..f0276ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -32,8 +32,6 @@ import java.util.Properties;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-
-
 public class StreamsConfigTest {
 
     private Properties props = new Properties();