You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/12 04:47:55 UTC
[kafka] branch 2.5 updated: KAFKA-9500: Fix FK Join Topology (#8015)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 1679839 KAFKA-9500: Fix FK Join Topology (#8015)
1679839 is described below
commit 1679839cdef342585bc19572a89672757bbb6d05
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Feb 11 22:38:05 2020 -0600
KAFKA-9500: Fix FK Join Topology (#8015)
Corrects a flaw leading to an exception while building topologies that include both:
* A foreign-key join with the result not explicitly materialized
* An operation after the join that requires source materialization
Also corrects a flaw in TopologyTestDriver leading to output records being enqueued in the wrong order under some (presumably rare) circumstances.
Cherry-pick of 1681c78f60a3a75e69e2222be0649cd61f02f042 from trunk
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
checkstyle/suppressions.xml | 2 +-
.../streams/kstream/internals/KTableImpl.java | 5 +-
.../streams/kstream/internals/KTableSource.java | 4 +
.../kstream/internals/graph/StreamToTableNode.java | 2 +-
.../internals/graph/TableProcessorNode.java | 11 +-
.../kstream/internals/graph/TableSourceNode.java | 2 +-
.../KTableKTableForeignKeyJoinIntegrationTest.java | 461 +++++++++++++--------
7 files changed, 316 insertions(+), 171 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3b67c10..9b05f59 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -214,7 +214,7 @@
files="SmokeTestDriver.java"/>
<suppress checks="CyclomaticComplexity"
- files="KStreamKStreamJoinTest.java"/>
+ files="KStreamKStreamJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>
<suppress checks="CyclomaticComplexity"
files="RelationalSmokeTest.java|SmokeTestDriver.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index bc14eee..ddff497 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1119,9 +1119,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
);
final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
- materializedInternal.queryableStoreName() == null
- ? null
- : new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
+ new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
+
final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
resultProcessorName,
new ProcessorParameters<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 1106602..b9f3580 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -68,6 +68,10 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
this.queryableName = storeName;
}
+ public boolean materialized() {
+ return queryableName != null;
+ }
+
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, V> store;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index 99eed1e..adf1683 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -60,7 +60,7 @@ public class StreamToTableNode<K, V> extends StreamsGraphNode {
final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
- if (storeBuilder != null && ktableSource.queryableName() != null) {
+ if (storeBuilder != null && ktableSource.materialized()) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index 6fc5e25..34a0e17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -17,11 +17,13 @@
package org.apache.kafka.streams.kstream.internals.graph;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
+import java.util.Objects;
public class TableProcessorNode<K, V> extends StreamsGraphNode {
@@ -37,6 +39,7 @@ public class TableProcessorNode<K, V> extends StreamsGraphNode {
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
+ // TODO KIP-300: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder,
final String[] storeNames) {
super(nodeName);
@@ -64,8 +67,12 @@ public class TableProcessorNode<K, V> extends StreamsGraphNode {
topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
}
- // TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
- if (storeBuilder != null) {
+ if (processorParameters.processorSupplier() instanceof KTableSource) {
+ if (((KTableSource<?, ?>) processorParameters.processorSupplier()).materialized()) {
+ topologyBuilder.addStateStore(Objects.requireNonNull(storeBuilder, "storeBuilder was null"),
+ processorName);
+ }
+ } else if (storeBuilder != null) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index cf5d70a..10710a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -109,7 +109,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
// only add state store if the source KTable should be materialized
final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
- if (ktableSource.queryableName() != null) {
+ if (ktableSource.materialized()) {
topologyBuilder.addStateStore(storeBuilder, nodeName());
if (shouldReuseSourceTopicForChangelog) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index db0e2de..3900d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -41,6 +41,8 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
@@ -59,10 +61,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
+ private static final String REJOIN_OUTPUT = "rejoin-output-topic";
private final Properties streamsConfig;
private final boolean leftJoin;
+ private final boolean materialized;
+ private final boolean rejoin;
- public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin, final String optimization) {
+ public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin,
+ final String optimization,
+ final boolean materialized,
+ final boolean rejoin) {
this.leftJoin = leftJoin;
streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
@@ -70,25 +78,49 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
));
+ this.materialized = materialized;
+ this.rejoin = rejoin;
}
- @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}")
+ @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}")
public static Collection<Object[]> data() {
- return Arrays.asList(
- new Object[] {false, StreamsConfig.OPTIMIZE},
- new Object[] {false, StreamsConfig.NO_OPTIMIZATION},
- new Object[] {true, StreamsConfig.OPTIMIZE},
- new Object[] {true, StreamsConfig.NO_OPTIMIZATION}
- );
+ final List<Boolean> booleans = Arrays.asList(true, false);
+ final List<String> optimizations = Arrays.asList(StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+ return buildParameters(booleans, optimizations, booleans, booleans);
+ }
+
+ private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+ List<Object[]> result = new LinkedList<>();
+ result.add(new Object[0]);
+
+ for (final List<?> argOption : argOptions) {
+ result = times(result, argOption);
+ }
+
+ return result;
+ }
+
+ private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
+ final List<Object[]> result = new LinkedList<>();
+ for (final Object[] args : left) {
+ for (final Object rightElem : right) {
+ final Object[] resArgs = new Object[args.length + 1];
+ System.arraycopy(args, 0, resArgs, 0, args.length);
+ resArgs[args.length] = rightElem;
+ result.add(resArgs);
+ }
+ }
+ return result;
}
@Test
public void doJoinFromLeftThenDeleteLeftEntity() {
- final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+ final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null;
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
// Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
@@ -100,10 +132,18 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(emptyMap())
- );
+ if (rejoin) {
+ assertThat(
+ rejoinOutputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ }
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
left.pipeInput("lhs1", "lhsValue1|rhs1");
left.pipeInput("lhs2", "lhsValue2|rhs2");
@@ -117,10 +157,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(expected)
);
- assertThat(
- asMap(store),
- is(expected)
- );
+ if (rejoin) {
+ assertThat(
+ rejoinOutputTopic.readKeyValuesToMap(),
+ is(mkMap(
+ mkEntry("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+ mkEntry("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+ ))
+ );
+ }
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
}
// Add another reference to an existing FK
@@ -132,14 +183,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
))
);
- assertThat(
- asMap(store),
- is(mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
- ))
- );
+ if (rejoin) {
+ assertThat(
+ rejoinOutputTopic.readKeyValuesToMap(),
+ is(mkMap(
+ mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+ ))
+ );
+ }
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+ ))
+ );
+ }
}
// Now delete one LHS entity such that one delete is propagated down to the output.
@@ -150,19 +211,29 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry("lhs1", null)
))
);
- assertThat(
- asMap(store),
- is(mkMap(
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
- ))
- );
+ if (rejoin) {
+ assertThat(
+ rejoinOutputTopic.readKeyValuesToMap(),
+ is(mkMap(
+ mkEntry("lhs1", null)
+ ))
+ );
+ }
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(mkMap(
+ mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+ ))
+ );
+ }
}
}
@Test
public void doJoinFromRightThenDeleteRightEntity() {
- final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -183,15 +254,17 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
: emptyMap()
)
);
- assertThat(
- asMap(store),
- is(leftJoin
- ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
- : emptyMap()
- )
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin
+ ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+ mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+ : emptyMap()
+ )
+ );
+ }
right.pipeInput("rhs1", "rhsValue1");
@@ -201,17 +274,19 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
)
);
- assertThat(
- asMap(store),
- is(leftJoin
- ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
-
- : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
- )
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin
+ ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+
+ : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+ )
+ );
+ }
right.pipeInput("rhs2", "rhsValue2");
@@ -219,13 +294,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")))
);
- assertThat(
- asMap(store),
- is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
- )
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+ )
+ );
+ }
right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
@@ -233,13 +310,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
- )
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+ )
+ );
+ }
// Now delete the RHS entity such that all matching keys have deletes propagated.
right.pipeInput("rhs1", (String) null);
@@ -250,22 +329,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null))
)
);
- assertThat(
- asMap(store),
- is(leftJoin
- ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin
+ ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+ mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+ mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
- : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
- )
- );
+ : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
+ )
+ );
+ }
}
}
@Test
public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
- final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
@@ -280,10 +361,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(expected)
);
- assertThat(
- asMap(store),
- is(expected)
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
}
// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
@@ -295,10 +378,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", null)))
);
- assertThat(
- asMap(store),
- is(emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
}
// Deleting a non-existing record is idempotent
@@ -308,17 +393,19 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
}
}
}
@Test
public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
- final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
@@ -331,17 +418,19 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
}
}
}
@Test
public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() {
- final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -355,10 +444,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
);
- assertThat(
- asMap(store),
- is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
+ );
+ }
// "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
// since it impossible to know whether the prior FK existed or not (and thus whether any results have
// previously been emitted)
@@ -368,20 +459,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
);
- assertThat(
- asMap(store),
- is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
+ );
+ }
// of course, moving it again to yet another non-existent FK has the same effect
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
);
- assertThat(
- asMap(store),
- is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
+ );
+ }
// Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one
// This RHS key was previously referenced, but it's not referenced now, so adding this record should
@@ -391,10 +486,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
+ );
+ }
// now, we change to a FK that exists, and see the join completes
left.pipeInput("lhs1", "lhsValue1|rhs1");
@@ -404,12 +501,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
))
);
- assertThat(
- asMap(store),
- is(mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
- ))
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ ))
+ );
+ }
// but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the
// left join updates appropriately.
@@ -420,16 +519,18 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)
))
);
- assertThat(
- asMap(store),
- is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
+ );
+ }
}
}
@Test
public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
- final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+ final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -445,10 +546,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(emptyMap())
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
left.pipeInput("lhs1", "lhsValue1|rhs1");
{
@@ -459,10 +562,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(expected)
);
- assertThat(
- asMap(store),
- is(expected)
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
}
// Change LHS foreign key reference
@@ -475,10 +580,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(expected)
);
- assertThat(
- asMap(store),
- is(expected)
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
}
// Populate RHS update on old LHS foreign key ref
@@ -488,12 +595,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- assertThat(
- asMap(store),
- is(mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
- ))
- );
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ ))
+ );
+ }
}
}
}
@@ -506,7 +615,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
private static Topology getTopology(final Properties streamsConfig,
final String queryableStoreName,
- final boolean leftJoin) {
+ final boolean leftJoin,
+ final boolean rejoin) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
@@ -523,33 +633,58 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final Function<String, String> extractor = value -> value.split("\\|")[1];
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
+ final ValueJoiner<String, String, String> rejoiner = rejoin ? (value1, value2) -> "rejoin(" + value1 + "," + value2 + ")" : null;
+
+ // the cache suppresses some of the unnecessary tombstones we want to make assertions about
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>> mainMaterialized =
+ queryableStoreName == null ?
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(
+ null,
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+ ).withCachingDisabled() :
+ Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
+ .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ .withCachingDisabled();
+
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>> rejoinMaterialized =
+ !rejoin ? null :
+ queryableStoreName == null ?
+ Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) :
+ // not actually going to query this store, but we need to force materialization here
+ // to really test this confuguration
+ Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName + "-rejoin"))
+ .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ // the cache suppresses some of the unnecessary tombstones we want to make assertions about
+ .withCachingDisabled();
- final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
- Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
- .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
- // the cache suppresses some of the unnecessary tombstones we want to make assertions about
- .withCachingDisabled();
-
- final KTable<String, String> joinResult;
if (leftJoin) {
- joinResult = left.leftJoin(
- right,
- extractor,
- joiner,
- materialized
- );
+ final KTable<String, String> fkJoin =
+ left.leftJoin(right, extractor, joiner, mainMaterialized);
+
+ fkJoin.toStream()
+ .to(OUTPUT);
+
+ // also make sure the FK join is set up right for downstream operations that require materialization
+ if (rejoin) {
+ fkJoin.leftJoin(left, rejoiner, rejoinMaterialized)
+ .toStream()
+ .to(REJOIN_OUTPUT);
+ }
} else {
- joinResult = left.join(
- right,
- extractor,
- joiner,
- materialized
- );
+ final KTable<String, String> fkJoin = left.join(right, extractor, joiner, mainMaterialized);
+
+ fkJoin
+ .toStream()
+ .to(OUTPUT);
+
+ // also make sure the FK join is set up right for downstream operations that require materialization
+ if (rejoin) {
+ fkJoin.join(left, rejoiner, rejoinMaterialized)
+ .toStream()
+ .to(REJOIN_OUTPUT);
+ }
}
- joinResult
- .toStream()
- .to(OUTPUT);
return builder.build(streamsConfig);
}