You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/08 15:36:24 UTC

kafka git commit: KAFKA-3887: KAFKA-3817 follow-up to avoid forwarding a null value in KTableRepartitionMap

Repository: kafka
Updated Branches:
  refs/heads/trunk efc4c8881 -> 730bf9a37


KAFKA-3887: KAFKA-3817 follow-up to avoid forwarding a null value in KTableRepartitionMap

Also handle a null value in SmokeTestUtil.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>

Closes #1597 from guozhangwang/KHotfix-check-null


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/730bf9a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/730bf9a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/730bf9a3

Branch: refs/heads/trunk
Commit: 730bf9a37a08b2ca41dcda52d2c70e92e85980f7
Parents: efc4c88
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jul 8 08:36:20 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jul 8 08:36:20 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/internals/KTableRepartitionMap.java   | 5 +++--
 .../java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java  | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/730bf9a3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index bba1857..ac7c00e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -76,8 +76,9 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             if (key == null)
                 throw new StreamsException("Record key for the grouping KTable should not be null.");
 
-            KeyValue<K1, V1> newPair = mapper.apply(key, change.newValue);
-            KeyValue<K1, V1> oldPair = mapper.apply(key, change.oldValue);
+            // if the value is null, we do not need to forward its selected key-value further
+            KeyValue<K1, V1> newPair = change.newValue == null ? null : mapper.apply(key, change.newValue);
+            KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
 
             // if the selected repartition key or value is null, skip
             if (newPair != null && newPair.key != null && newPair.value != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/730bf9a3/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index b0d7a0b..f1c237e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -87,7 +87,7 @@ public class SmokeTestUtil {
             return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
                 @Override
                 public KeyValue<String, Long> apply(String key, Long value) {
-                    return new KeyValue<>(Long.toString(value), 1L);
+                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
                 }
             };
         }