You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/08 15:36:24 UTC
kafka git commit: KAFKA-3887: KAFKA-3817 follow-up to avoid
forwarding value if it is null in KTableRepartition
Repository: kafka
Updated Branches:
refs/heads/trunk efc4c8881 -> 730bf9a37
KAFKA-3887: KAFKA-3817 follow-up to avoid forwarding value if it is null in KTableRepartition
Also handle Null value in SmokeTestUtil.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>
Closes #1597 from guozhangwang/KHotfix-check-null
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/730bf9a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/730bf9a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/730bf9a3
Branch: refs/heads/trunk
Commit: 730bf9a37a08b2ca41dcda52d2c70e92e85980f7
Parents: efc4c88
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jul 8 08:36:20 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jul 8 08:36:20 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/internals/KTableRepartitionMap.java | 5 +++--
.../java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java | 2 +-
2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/730bf9a3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index bba1857..ac7c00e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -76,8 +76,9 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
if (key == null)
throw new StreamsException("Record key for the grouping KTable should not be null.");
- KeyValue<K1, V1> newPair = mapper.apply(key, change.newValue);
- KeyValue<K1, V1> oldPair = mapper.apply(key, change.oldValue);
+ // if the value is null, we do not need to forward its selected key-value further
+ KeyValue<K1, V1> newPair = change.newValue == null ? null : mapper.apply(key, change.newValue);
+ KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
// if the selected repartition key or value is null, skip
if (newPair != null && newPair.key != null && newPair.value != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/730bf9a3/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index b0d7a0b..f1c237e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -87,7 +87,7 @@ public class SmokeTestUtil {
return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
@Override
public KeyValue<String, Long> apply(String key, Long value) {
- return new KeyValue<>(Long.toString(value), 1L);
+ return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
}
};
}