Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/05 17:03:26 UTC

kafka git commit: KAFKA-3817: KTableRepartitionMap publish old Change first, for non-count aggregates

Repository: kafka
Updated Branches:
  refs/heads/trunk 6fb33afff -> 3dafb81da


KAFKA-3817: KTableRepartitionMap publish old Change first, for non-count aggregates

I affirm that the contribution is my original work and that I license the work to the project under the project's open source license.

This cleans up misbehaviour that was introduced while fixing KAFKA-3817. It is impossible for a non-count aggregate to be built correctly when the addition is applied before the removal. IMHO, making sure that these details are correct is very important.
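
For illustration, here is a minimal standalone sketch (plain Java, not Kafka's actual code path; the class name is made up) of why the ordering matters. It mirrors the adder/subtractor used in the new testRemoveOldBeforeAddNew in the diff below: the adder concatenates the repartitioned value onto the aggregate, the subtractor strips it again via replaceAll. When a table entry is updated and its old and new records map to the same repartitioned value, forwarding the addition before the removal lets the subtractor wipe out the freshly added value as well:

    // Minimal sketch, not Kafka code: adder appends the value, subtractor
    // removes every occurrence of it, as in testRemoveOldBeforeAddNew.
    public class ChangeOrderingSketch {
        public static void main(String[] args) {
            // Aggregate for the repartitioned key "1" while record "12" -> "B"
            // is present; both the old record ("12", "B") and the updated
            // record ("12", "C") map to the repartitioned value "2".
            String aggregate = "2";

            // Old behaviour: the new value is added before the old one is removed.
            String addFirst = aggregate + "2";              // "22"
            addFirst = addFirst.replaceAll("2", "");        // ""  -> the new value is lost

            // Fixed behaviour: remove the old value first, then add the new one.
            String removeFirst = aggregate.replaceAll("2", ""); // ""
            removeFirst = removeFirst + "2";                    // "2" -> correct

            System.out.println("add-first: '" + addFirst + "', remove-first: '" + removeFirst + "'");
        }
    }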

This PR has local test errors: it somehow fails ResetIntegrationTest. It is not clear to me why, but it looks like this PR breaks it, especially because the failure involves the ordering of events. Still, I am unable to find where I could have broken it. Then again, maybe not: the test seems to fail on trunk as well.

Author: jfilipiak <Ja...@trivago.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1705 from Kaiserchen/KAFKA-3817-preserve-order-for-aggreagators


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3dafb81d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3dafb81d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3dafb81d

Branch: refs/heads/trunk
Commit: 3dafb81da788294d4c2e9811f49437608e5b9ce8
Parents: 6fb33af
Author: Jan Filipiak <Ja...@trivago.com>
Authored: Fri Aug 5 10:03:22 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Aug 5 10:03:22 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/KTableRepartitionMap.java |  9 ++-
 .../kstream/internals/KTableAggregateTest.java  | 81 +++++++++++++++++---
 2 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3dafb81d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ac7c00e..939a1df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -81,12 +81,15 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
 
             // if the selected repartition key or value is null, skip
-            if (newPair != null && newPair.key != null && newPair.value != null) {
-                context().forward(newPair.key, new Change<>(newPair.value, null));
-            }
+            // forward oldPair first, to be consistent with reduce and aggregate
             if (oldPair != null && oldPair.key != null && oldPair.value != null) {
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
+
+            if (newPair != null && newPair.key != null && newPair.value != null) {
+                context().forward(newPair.key, new Change<>(newPair.value, null));
+            }
+            
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dafb81d/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 26e6a0f..a405da4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -34,6 +36,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 import java.io.File;
 import java.io.IOException;
 
@@ -91,12 +94,12 @@ public class KTableAggregateTest {
         assertEquals(Utils.mkList(
                 "A:0+1",
                 "B:0+2",
-                "A:0+1+3", "A:0+1+3-1",
-                "B:0+2+4", "B:0+2+4-2",
+                "A:0+1-1", "A:0+1-1+3",
+                "B:0+2-2", "B:0+2-2+4",
                 "C:0+5",
                 "D:0+6",
-                "B:0+2+4-2+7", "B:0+2+4-2+7-4",
-                "C:0+5+8", "C:0+5+8-5"), proc.processed);
+                "B:0+2-2+4-4", "B:0+2-2+4-4+7",
+                "C:0+5-5", "C:0+5-5+8"), proc.processed);
     }
 
     @Test
@@ -144,11 +147,12 @@ public class KTableAggregateTest {
                 "1:0+1",
                 "1:0+1-1",
                 "1:0+1-1+1",
-                "2:0+2",
-                "4:0+4",
-                "2:0+2-2",
-                "7:0+7",
-                "4:0+4-4"), proc.processed);
+                "2:0+2", 
+                  //noop
+                "2:0+2-2", "4:0+4",
+                  //noop
+                "4:0+4-4", "7:0+7"
+                ), proc.processed);
     }
 
     @Test
@@ -171,6 +175,63 @@ public class KTableAggregateTest {
         driver.process(input, "C", "yellow");
         driver.process(input, "D", "green");
 
-        assertEquals(Utils.mkList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2"), proc.processed);
+        assertEquals(Utils.mkList(
+                 "green:1",
+                 "green:2",
+                 "green:1", "blue:1",
+                 "yellow:1",
+                 "green:2"
+                 ), proc.processed);
+    }
+    
+    @Test
+    public void testRemoveOldBeforeAddNew() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+                .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+
+                    @Override
+                    public KeyValue<String, String> apply(String key, String value) {
+                        return KeyValue.pair(String.valueOf(key.charAt(0)), String.valueOf(key.charAt(1)));
+                    }
+                }, stringSerde, stringSerde)
+                .aggregate(new Initializer<String>() {
+
+                    @Override
+                    public String apply() {
+                        return "";
+                    }
+                }, new Aggregator<String, String, String>() {
+                    
+                    @Override
+                    public String apply(String aggKey, String value, String aggregate) {
+                        return aggregate + value;
+                    } 
+                }, new Aggregator<String, String, String>() {
+
+                    @Override
+                    public String apply(String key, String value, String aggregate) {
+                        return aggregate.replaceAll(value, "");
+                    }
+                }, "someStore")
+                .toStream()
+                .process(proc);
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.process(input, "11", "A");
+        driver.process(input, "12", "B");
+        driver.process(input, "11", null);
+        driver.process(input, "12", "C");
+
+        assertEquals(Utils.mkList(
+                 "1:1",
+                 "1:12",
+                 "1:2",
+                 "1:", "1:2"
+                 ), proc.processed);
     }
 }