You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/12 04:47:55 UTC

[kafka] branch 2.5 updated: KAFKA-9500: Fix FK Join Topology (#8015)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 1679839  KAFKA-9500: Fix FK Join Topology (#8015)
1679839 is described below

commit 1679839cdef342585bc19572a89672757bbb6d05
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Feb 11 22:38:05 2020 -0600

    KAFKA-9500: Fix FK Join Topology (#8015)
    
    Corrects a flaw leading to an exception while building topologies that include both:
    
    * A foreign-key join with the result not explicitly materialized
    * An operation after the join that requires source materialization
    
    Also corrects a flaw in TopologyTestDriver leading to output records being enqueued in the wrong order under some (presumably rare) circumstances.
    
    Cherry-pick of 1681c78f60a3a75e69e2222be0649cd61f02f042 from trunk
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../streams/kstream/internals/KTableImpl.java      |   5 +-
 .../streams/kstream/internals/KTableSource.java    |   4 +
 .../kstream/internals/graph/StreamToTableNode.java |   2 +-
 .../internals/graph/TableProcessorNode.java        |  11 +-
 .../kstream/internals/graph/TableSourceNode.java   |   2 +-
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 461 +++++++++++++--------
 7 files changed, 316 insertions(+), 171 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3b67c10..9b05f59 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -214,7 +214,7 @@
               files="SmokeTestDriver.java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="KStreamKStreamJoinTest.java"/>
+              files="KStreamKStreamJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>
     <suppress checks="CyclomaticComplexity"
               files="RelationalSmokeTest.java|SmokeTestDriver.java"/>
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index bc14eee..ddff497 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1119,9 +1119,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         );
 
         final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
-            materializedInternal.queryableStoreName() == null
-                ? null
-                : new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
+            new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
+
         final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
             resultProcessorName,
             new ProcessorParameters<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 1106602..b9f3580 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -68,6 +68,10 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         this.queryableName = storeName;
     }
 
+    public boolean materialized() {
+        return queryableName != null;
+    }
+
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
 
         private TimestampedKeyValueStore<K, V> store;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index 99eed1e..adf1683 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -60,7 +60,7 @@ public class StreamToTableNode<K, V> extends StreamsGraphNode {
         final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
         topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
 
-        if (storeBuilder != null && ktableSource.queryableName() != null) {
+        if (storeBuilder != null && ktableSource.materialized()) {
             topologyBuilder.addStateStore(storeBuilder, processorName);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index 6fc5e25..34a0e17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -17,11 +17,13 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
+import java.util.Objects;
 
 public class TableProcessorNode<K, V> extends StreamsGraphNode {
 
@@ -37,6 +39,7 @@ public class TableProcessorNode<K, V> extends StreamsGraphNode {
 
     public TableProcessorNode(final String nodeName,
                               final ProcessorParameters<K, V> processorParameters,
+                              // TODO KIP-300: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
                               final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder,
                               final String[] storeNames) {
         super(nodeName);
@@ -64,8 +67,12 @@ public class TableProcessorNode<K, V> extends StreamsGraphNode {
             topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
         }
 
-        // TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
-        if (storeBuilder != null) {
+        if (processorParameters.processorSupplier() instanceof KTableSource) {
+            if (((KTableSource<?, ?>) processorParameters.processorSupplier()).materialized()) {
+                topologyBuilder.addStateStore(Objects.requireNonNull(storeBuilder, "storeBuilder was null"),
+                                              processorName);
+            }
+        } else if (storeBuilder != null) {
             topologyBuilder.addStateStore(storeBuilder, processorName);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index cf5d70a..10710a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -109,7 +109,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
 
             // only add state store if the source KTable should be materialized
             final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
-            if (ktableSource.queryableName() != null) {
+            if (ktableSource.materialized()) {
                 topologyBuilder.addStateStore(storeBuilder, nodeName());
 
                 if (shouldReuseSourceTopicForChangelog) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index db0e2de..3900d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -41,6 +41,8 @@ import org.junit.runners.Parameterized;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
@@ -59,10 +61,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
     private static final String LEFT_TABLE = "left_table";
     private static final String RIGHT_TABLE = "right_table";
     private static final String OUTPUT = "output-topic";
+    private static final String REJOIN_OUTPUT = "rejoin-output-topic";
     private final Properties streamsConfig;
     private final boolean leftJoin;
+    private final boolean materialized;
+    private final boolean rejoin;
 
-    public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin, final String optimization) {
+    public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin,
+                                                     final String optimization,
+                                                     final boolean materialized,
+                                                     final boolean rejoin) {
         this.leftJoin = leftJoin;
         streamsConfig = mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
@@ -70,25 +78,49 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
             mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
         ));
+        this.materialized = materialized;
+        this.rejoin = rejoin;
     }
 
-    @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}")
+    @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}")
     public static Collection<Object[]> data() {
-        return Arrays.asList(
-            new Object[] {false, StreamsConfig.OPTIMIZE},
-            new Object[] {false, StreamsConfig.NO_OPTIMIZATION},
-            new Object[] {true, StreamsConfig.OPTIMIZE},
-            new Object[] {true, StreamsConfig.NO_OPTIMIZATION}
-        );
+        final List<Boolean> booleans = Arrays.asList(true, false);
+        final List<String> optimizations = Arrays.asList(StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+        return buildParameters(booleans, optimizations, booleans, booleans);
+    }
+
+    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+        List<Object[]> result = new LinkedList<>();
+        result.add(new Object[0]);
+
+        for (final List<?> argOption : argOptions) {
+            result = times(result, argOption);
+        }
+
+        return result;
+    }
+
+    private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
+        final List<Object[]> result = new LinkedList<>();
+        for (final Object[] args : left) {
+            for (final Object rightElem : right) {
+                final Object[] resArgs = new Object[args.length + 1];
+                System.arraycopy(args, 0, resArgs, 0, args.length);
+                resArgs[args.length] = rightElem;
+                result.add(resArgs);
+            }
+        }
+        return result;
     }
 
     @Test
     public void doJoinFromLeftThenDeleteLeftEntity() {
-        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null;
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
             // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
@@ -100,10 +132,18 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(emptyMap())
             );
-            assertThat(
-                asMap(store),
-                is(emptyMap())
-            );
+            if (rejoin) {
+                assertThat(
+                    rejoinOutputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
 
             left.pipeInput("lhs1", "lhsValue1|rhs1");
             left.pipeInput("lhs2", "lhsValue2|rhs2");
@@ -117,10 +157,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(expected)
                 );
-                assertThat(
-                    asMap(store),
-                    is(expected)
-                );
+                if (rejoin) {
+                    assertThat(
+                        rejoinOutputTopic.readKeyValuesToMap(),
+                        is(mkMap(
+                            mkEntry("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+                            mkEntry("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+                        ))
+                    );
+                }
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(expected)
+                    );
+                }
             }
 
             // Add another reference to an existing FK
@@ -132,14 +183,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
-                assertThat(
-                    asMap(store),
-                    is(mkMap(
-                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                        mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
-                    ))
-                );
+                if (rejoin) {
+                    assertThat(
+                        rejoinOutputTopic.readKeyValuesToMap(),
+                        is(mkMap(
+                            mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        ))
+                    );
+                }
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                            mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
             }
             // Now delete one LHS entity such that one delete is propagated down to the output.
 
@@ -150,19 +211,29 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     mkEntry("lhs1", null)
                 ))
             );
-            assertThat(
-                asMap(store),
-                is(mkMap(
-                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                    mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
-                ))
-            );
+            if (rejoin) {
+                assertThat(
+                    rejoinOutputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs1", null)
+                    ))
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(mkMap(
+                        mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    ))
+                );
+            }
         }
     }
 
     @Test
     public void doJoinFromRightThenDeleteRightEntity() {
-        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -183,15 +254,17 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                        : emptyMap()
                 )
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin
-                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-                               mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-                               mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-                       : emptyMap()
-                )
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin
+                           ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                                   mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                                   mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+                           : emptyMap()
+                    )
+                );
+            }
 
             right.pipeInput("rhs1", "rhsValue1");
 
@@ -201,17 +274,19 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                          mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
                 )
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin
-                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                               mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-                               mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
-
-                       : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                               mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
-                )
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin
+                           ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                                   mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                                   mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+
+                           : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                                   mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                    )
+                );
+            }
 
             right.pipeInput("rhs2", "rhsValue2");
 
@@ -219,13 +294,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")))
             );
-            assertThat(
-                asMap(store),
-                is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                         mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
-                )
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                             mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                             mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                    )
+                );
+            }
 
             right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
 
@@ -233,13 +310,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(emptyMap())
             );
-            assertThat(
-                asMap(store),
-                is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                         mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
-                )
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                             mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                             mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                    )
+                );
+            }
 
             // Now delete the RHS entity such that all matching keys have deletes propagated.
             right.pipeInput("rhs1", (String) null);
@@ -250,22 +329,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                          mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null))
                 )
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin
-                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-                               mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                               mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin
+                           ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                                   mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                                   mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
 
-                       : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
-                )
-            );
+                           : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
+                    )
+                );
+            }
         }
     }
 
     @Test
     public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
-        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
@@ -280,10 +361,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(expected)
                 );
-                assertThat(
-                    asMap(store),
-                    is(expected)
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(expected)
+                    );
+                }
             }
 
             // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
@@ -295,10 +378,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(mkMap(mkEntry("lhs1", null)))
                 );
-                assertThat(
-                    asMap(store),
-                    is(emptyMap())
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(emptyMap())
+                    );
+                }
             }
 
             // Deleting a non-existing record is idempotent
@@ -308,17 +393,19 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(emptyMap())
                 );
-                assertThat(
-                    asMap(store),
-                    is(emptyMap())
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(emptyMap())
+                    );
+                }
             }
         }
     }
 
     @Test
     public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
-        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
@@ -331,17 +418,19 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(emptyMap())
                 );
-                assertThat(
-                    asMap(store),
-                    is(emptyMap())
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(emptyMap())
+                    );
+                }
             }
         }
     }
 
     @Test
     public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() {
-        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -355,10 +444,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) : emptyMap())
+                );
+            }
             // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
             // since it impossible to know whether the prior FK existed or not (and thus whether any results have
             // previously been emitted)
@@ -368,20 +459,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
+                );
+            }
             // of course, moving it again to yet another non-existent FK has the same effect
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
+                );
+            }
 
             // Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one
             // This RHS key was previously referenced, but it's not referenced now, so adding this record should
@@ -391,10 +486,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(emptyMap())
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)")) : emptyMap())
+                );
+            }
 
             // now, we change to a FK that exists, and see the join completes
             left.pipeInput("lhs1", "lhsValue1|rhs1");
@@ -404,12 +501,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                 ))
             );
-            assertThat(
-                asMap(store),
-                is(mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
-                ))
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                    ))
+                );
+            }
 
             // but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the
             // left join updates appropriately.
@@ -420,16 +519,18 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)
                 ))
             );
-            assertThat(
-                asMap(store),
-                is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)")) : emptyMap())
+                );
+            }
         }
     }
 
     @Test
     public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
-        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -445,10 +546,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(emptyMap())
             );
-            assertThat(
-                asMap(store),
-                is(emptyMap())
-            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
 
             left.pipeInput("lhs1", "lhsValue1|rhs1");
             {
@@ -459,10 +562,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(expected)
                 );
-                assertThat(
-                    asMap(store),
-                    is(expected)
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(expected)
+                    );
+                }
             }
 
             // Change LHS foreign key reference
@@ -475,10 +580,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(expected)
                 );
-                assertThat(
-                    asMap(store),
-                    is(expected)
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(expected)
+                    );
+                }
             }
 
             // Populate RHS update on old LHS foreign key ref
@@ -488,12 +595,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(emptyMap())
                 );
-                assertThat(
-                    asMap(store),
-                    is(mkMap(
-                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
-                    ))
-                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                        ))
+                    );
+                }
             }
         }
     }
@@ -506,7 +615,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     private static Topology getTopology(final Properties streamsConfig,
                                         final String queryableStoreName,
-                                        final boolean leftJoin) {
+                                        final boolean leftJoin,
+                                        final boolean rejoin) {
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -523,33 +633,58 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
         final Function<String, String> extractor = value -> value.split("\\|")[1];
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
+        final ValueJoiner<String, String, String> rejoiner = rejoin ? (value1, value2) -> "rejoin(" + value1 + "," + value2 + ")" : null;
+
+        // the cache suppresses some of the unnecessary tombstones we want to make assertions about
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> mainMaterialized =
+            queryableStoreName == null ?
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(
+                    null,
+                    serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+                ).withCachingDisabled() :
+                Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
+                    .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+                    .withCachingDisabled();
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> rejoinMaterialized =
+            !rejoin ? null :
+                queryableStoreName == null ?
+                    Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) :
+                    // not actually going to query this store, but we need to force materialization here
+                    // to really test this confuguration
+                    Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName + "-rejoin"))
+                        .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+                        // the cache suppresses some of the unnecessary tombstones we want to make assertions about
+                        .withCachingDisabled();
 
-        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
-            Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
-                .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-                // the cache suppresses some of the unnecessary tombstones we want to make assertions about
-                .withCachingDisabled();
-
-        final KTable<String, String> joinResult;
         if (leftJoin) {
-            joinResult = left.leftJoin(
-                right,
-                extractor,
-                joiner,
-                materialized
-            );
+            final KTable<String, String> fkJoin =
+                left.leftJoin(right, extractor, joiner, mainMaterialized);
+
+            fkJoin.toStream()
+                  .to(OUTPUT);
+
+            // also make sure the FK join is set up right for downstream operations that require materialization
+            if (rejoin) {
+                fkJoin.leftJoin(left, rejoiner, rejoinMaterialized)
+                      .toStream()
+                      .to(REJOIN_OUTPUT);
+            }
         } else {
-            joinResult = left.join(
-                right,
-                extractor,
-                joiner,
-                materialized
-            );
+            final KTable<String, String> fkJoin = left.join(right, extractor, joiner, mainMaterialized);
+
+            fkJoin
+                .toStream()
+                .to(OUTPUT);
+
+            // also make sure the FK join is set up right for downstream operations that require materialization
+            if (rejoin) {
+                fkJoin.join(left, rejoiner, rejoinMaterialized)
+                      .toStream()
+                      .to(REJOIN_OUTPUT);
+            }
         }
 
-        joinResult
-            .toStream()
-            .to(OUTPUT);
 
         return builder.build(streamsConfig);
     }