You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/10 20:14:08 UTC
kafka git commit: KAFKA-3817: handle null keys in KTableRepartitionMap
Repository: kafka
Updated Branches:
refs/heads/trunk eb6f04a8b -> da8517182
KAFKA-3817: handle null keys in KTableRepartitionMap
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jeff Klukas <je...@klukas.net>
Closes #1488 from guozhangwang/K3817-handle-null-groupedkey
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da851718
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da851718
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da851718
Branch: refs/heads/trunk
Commit: da8517182d2f30c4e03b33b38d41d2fa33621e24
Parents: eb6f04a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jun 10 13:14:05 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 10 13:14:05 2016 -0700
----------------------------------------------------------------------
.../kstream/internals/KTableRepartitionMap.java | 31 +++++-------
.../kstream/internals/KTableAggregateTest.java | 50 ++++++++++++++++++++
2 files changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/da851718/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 2a7cf1b..bba1857 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -53,7 +53,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
public KTableValueGetter<K, KeyValue<K1, V1>> get() {
return new KTableMapValueGetter(parentValueGetterSupplier.get());
}
-
};
}
@@ -66,15 +65,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
}
- private KeyValue<K1, V1> computeValue(K key, V value) {
- KeyValue<K1, V1> newValue = null;
-
- if (key != null || value != null)
- newValue = mapper.apply(key, value);
-
- return newValue;
- }
-
private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
/**
@@ -82,16 +72,18 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
*/
@Override
public void process(K key, Change<V> change) {
- KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
-
- // the selected repartition key should never be null
- if (newPair.key == null)
- throw new StreamsException("Record key for KTable repartition operator should not be null.");
+ // the original key should never be null
+ if (key == null)
+ throw new StreamsException("Record key for the grouping KTable should not be null.");
- context().forward(newPair.key, new Change<>(newPair.value, null));
+ KeyValue<K1, V1> newPair = change.newValue == null ? null : mapper.apply(key, change.newValue);
+ KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
- if (change.oldValue != null) {
- KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);
+ // null values are never handed to the user mapper; skip any pair whose selected key or value is null
+ if (newPair != null && newPair.key != null && newPair.value != null) {
+ context().forward(newPair.key, new Change<>(newPair.value, null));
+ }
+ if (oldPair != null && oldPair.key != null && oldPair.value != null) {
context().forward(oldPair.key, new Change<>(null, oldPair.value));
}
}
@@ -112,9 +104,8 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
@Override
public KeyValue<K1, V1> get(K key) {
- return computeValue(key, parentGetter.get(key));
+ return mapper.apply(key, parentGetter.get(key));
}
-
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da851718/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index a614479..75e007d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -20,8 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@@ -96,4 +98,52 @@ public class KTableAggregateTest {
"B:0+2+4-2+7", "B:0+2+4-2+7-4",
"C:0+5+8", "C:0+5+8-5"), proc2.processed);
}
+
+ @Test
+ public void testAggRepartition() throws Exception {
+ final String topic1 = "topic1";
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KTable<String, String> source = builder.table(stringSerde, stringSerde, topic1);
+
+ // key "null" selects a null grouping key and key "NULL" makes the mapper return null;
+ // the repartition step must drop both records instead of failing
+ final KeyValueMapper<String, String, KeyValue<String, String>> selector =
+ new KeyValueMapper<String, String, KeyValue<String, String>>() {
+ @Override
+ public KeyValue<String, String> apply(String key, String value) {
+ switch (key) {
+ case "null":
+ return KeyValue.pair(null, value + "s");
+ case "NULL":
+ return null;
+ default:
+ return KeyValue.pair(value, value + "s");
+ }
+ }
+ };
+ final KTable<String, String> aggregated = source
+ .groupBy(selector, stringSerde, stringSerde)
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ MockAggregator.STRING_REMOVER, stringSerde, "topic1-Canonized");
+
+ final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+ aggregated.toStream().process(proc);
+
+ driver = new KStreamTestDriver(builder, stateDir);
+
+ final String[][] records = {
+ {"A", "1"}, {"B", "2"}, {"null", "3"}, {"B", "4"}, {"NULL", "5"}, {"B", "7"}
+ };
+ for (final String[] record : records) {
+ driver.process(topic1, record[0], record[1]);
+ }
+
+ assertEquals(Utils.mkList(
+ "1:0+1s",
+ "2:0+2s",
+ "4:0+4s",
+ "2:0+2s-2s",
+ "7:0+7s",
+ "4:0+4s-4s"), proc.processed);
+ }
}