You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/10 20:14:08 UTC

kafka git commit: KAFKA-3817: handle null keys in KTableRepartitionMap

Repository: kafka
Updated Branches:
  refs/heads/trunk eb6f04a8b -> da8517182


KAFKA-3817: handle null keys in KTableRepartitionMap

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jeff Klukas <je...@klukas.net>

Closes #1488 from guozhangwang/K3817-handle-null-groupedkey


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da851718
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da851718
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da851718

Branch: refs/heads/trunk
Commit: da8517182d2f30c4e03b33b38d41d2fa33621e24
Parents: eb6f04a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jun 10 13:14:05 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 10 13:14:05 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/KTableRepartitionMap.java | 31 +++++-------
 .../kstream/internals/KTableAggregateTest.java  | 50 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da851718/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 2a7cf1b..bba1857 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -53,7 +53,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             public KTableValueGetter<K, KeyValue<K1, V1>> get() {
                 return new KTableMapValueGetter(parentValueGetterSupplier.get());
             }
-
         };
     }
 
@@ -66,15 +65,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
         throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
     }
 
-    private KeyValue<K1, V1> computeValue(K key, V value) {
-        KeyValue<K1, V1> newValue = null;
-
-        if (key != null || value != null)
-            newValue = mapper.apply(key, value);
-
-        return newValue;
-    }
-
     private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
 
         /**
@@ -82,16 +72,18 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
          */
         @Override
         public void process(K key, Change<V> change) {
-            KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
-
-            // the selected repartition key should never be null
-            if (newPair.key == null)
-                throw new StreamsException("Record key for KTable repartition operator should not be null.");
+            // the original key should never be null
+            if (key == null)
+                throw new StreamsException("Record key for the grouping KTable should not be null.");
 
-            context().forward(newPair.key, new Change<>(newPair.value, null));
+            KeyValue<K1, V1> newPair = mapper.apply(key, change.newValue);
+            KeyValue<K1, V1> oldPair = mapper.apply(key, change.oldValue);
 
-            if (change.oldValue != null) {
-                KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);
+            // if the selected repartition key or value is null, skip
+            if (newPair != null && newPair.key != null && newPair.value != null) {
+                context().forward(newPair.key, new Change<>(newPair.value, null));
+            }
+            if (oldPair != null && oldPair.key != null && oldPair.value != null) {
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
         }
@@ -112,9 +104,8 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
 
         @Override
         public KeyValue<K1, V1> get(K key) {
-            return computeValue(key, parentGetter.get(key));
+            return mapper.apply(key, parentGetter.get(key));
         }
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da851718/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index a614479..75e007d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -20,8 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -96,4 +98,52 @@ public class KTableAggregateTest {
                 "B:0+2+4-2+7", "B:0+2+4-2+7-4",
                 "C:0+5+8", "C:0+5+8-5"), proc2.processed);
     }
+
+    @Test
+    public void testAggRepartition() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+            @Override
+                public KeyValue<String, String> apply(String key, String value) {
+                    if (key.equals("null")) {
+                        return KeyValue.pair(null, value + "s");
+                    } else if (key.equals("NULL")) {
+                        return null;
+                    } else {
+                        return KeyValue.pair(value, value + "s");
+                    }
+                }
+            },
+                stringSerde,
+                stringSerde
+        )
+                .aggregate(MockInitializer.STRING_INIT,
+                MockAggregator.STRING_ADDER,
+                MockAggregator.STRING_REMOVER,
+                stringSerde,
+                "topic1-Canonized");
+
+        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "B", "2");
+        driver.process(topic1, "null", "3");
+        driver.process(topic1, "B", "4");
+        driver.process(topic1, "NULL", "5");
+        driver.process(topic1, "B", "7");
+
+        assertEquals(Utils.mkList(
+                "1:0+1s",
+                "2:0+2s",
+                "4:0+4s",
+                "2:0+2s-2s",
+                "7:0+7s",
+                "4:0+4s-4s"), proc2.processed);
+    }
 }