You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/11/30 05:21:45 UTC
[kafka] branch trunk updated: KAFKA-9244: Update FK reference
should unsubscribe old FK (#7758)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba02e8c KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)
ba02e8c is described below
commit ba02e8c6b6802262646a7d6287c7a2c237be65fd
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Nov 29 21:21:06 2019 -0800
KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)
Reviewers: Adam Bellemare <ad...@wishabi.com>, John Roesler <jo...@confluent.io>
---
.../SubscriptionStoreReceiveProcessorSupplier.java | 4 +-
.../KTableKTableForeignKeyJoinIntegrationTest.java | 71 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 0a86980..9cbeadd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(value, context().timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
- //If the subscriptionWrapper hash indicates a null, must delete from statestore.
//This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
- if (value.getHash() == null) {
+ if (value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
+ value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
store.delete(subscriptionKey);
} else {
store.put(subscriptionKey, newValue);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 80c0f52..746d6b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
+ @Test
+ public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
+ final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+ final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+ final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+ final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+ // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference
+ // then populate update on RHS
+ right.pipeInput("rhs1", "rhsValue1");
+ right.pipeInput("rhs2", "rhsValue2");
+
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1");
+ {
+ final Map<String, String> expected = mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ );
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(expected)
+ );
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
+
+ // Change LHS foreign key reference
+ left.pipeInput("lhs1", "lhsValue1|rhs2");
+ {
+ final Map<String, String> expected = mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ );
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(expected)
+ );
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
+
+ // Populate RHS update on old LHS foreign key ref
+ right.pipeInput("rhs1", "rhsValue1Delta");
+ {
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ assertThat(
+ asMap(store),
+ is(mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ ))
+ );
+ }
+ }
+ }
+
private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
final HashMap<String, String> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));