You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/11/30 05:21:45 UTC

[kafka] branch trunk updated: KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba02e8c  KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)
ba02e8c is described below

commit ba02e8c6b6802262646a7d6287c7a2c237be65fd
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Nov 29 21:21:06 2019 -0800

    KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)
    
    Reviewers: Adam Bellemare <ad...@wishabi.com>, John Roesler <jo...@confluent.io>
---
 .../SubscriptionStoreReceiveProcessorSupplier.java |  4 +-
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 71 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 0a86980..9cbeadd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
                 final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(value, context().timestamp());
                 final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
 
-                //If the subscriptionWrapper hash indicates a null, must delete from statestore.
                 //This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
-                if (value.getHash() == null) {
+                if (value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
+                    value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
                     store.delete(subscriptionKey);
                 } else {
                     store.put(subscriptionKey, newValue);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 80c0f52..746d6b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
+        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            // Pre-populate the RHS records. This test is all about what happens when we change an LHS record's foreign key reference
+            // and then pipe an update to the RHS record that the LHS record used to reference.
+            right.pipeInput("rhs1", "rhsValue1");
+            right.pipeInput("rhs2", "rhsValue2");
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(emptyMap())
+            );
+            assertThat(
+                asMap(store),
+                is(emptyMap())
+            );
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(expected)
+                );
+                assertThat(
+                    asMap(store),
+                    is(expected)
+                );
+            }
+
+            // Change the foreign key reference of lhs1 from rhs1 to rhs2
+            left.pipeInput("lhs1", "lhsValue1|rhs2");
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(expected)
+                );
+                assertThat(
+                    asMap(store),
+                    is(expected)
+                );
+            }
+
+            // Update the RHS record behind the old foreign key (rhs1); we expect no new output and an unchanged store
+            right.pipeInput("rhs1", "rhsValue1Delta");
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                assertThat(
+                    asMap(store),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                    ))
+                );
+            }
+        }
+    }
+
     private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
         final HashMap<String, String> result = new HashMap<>();
         store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));