You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/09/25 20:59:24 UTC

[kafka] branch trunk updated: KAFKA-10077: Filter downstream of state-store results in spurious tombstones (#9156)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc81d44  KAFKA-10077: Filter downstream of state-store results in spurious tombstones (#9156)
dc81d44 is described below

commit dc81d442dfc5c3d8d9c069942f6b00e2de377a3c
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Fri Sep 25 21:58:04 2020 +0100

    KAFKA-10077: Filter downstream of state-store results in spurious tombstones (#9156)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../streams/kstream/internals/KTableAggregate.java |   4 +-
 .../streams/kstream/internals/KTableFilter.java    |  13 +-
 .../streams/kstream/internals/KTableImpl.java      |  21 ++-
 .../internals/KTableKTableAbstractJoin.java        |   8 +-
 .../kstream/internals/KTableKTableJoinMerger.java  |   8 +-
 .../streams/kstream/internals/KTableMapValues.java |   8 +-
 .../kstream/internals/KTableProcessorSupplier.java |  16 ++-
 .../streams/kstream/internals/KTableReduce.java    |   4 +-
 .../kstream/internals/KTableRepartitionMap.java    |   2 +-
 .../kstream/internals/KTableTransformValues.java   |   9 +-
 .../suppress/KTableSuppressProcessorSupplier.java  |   6 +-
 .../kstream/internals/KTableFilterTest.java        |  39 +++++-
 .../streams/kstream/internals/KTableImplTest.java  | 151 +++++++++++++++++----
 .../internals/KTableKTableInnerJoinTest.java       |   2 +-
 .../internals/KTableKTableLeftJoinTest.java        |   2 +-
 .../internals/KTableKTableOuterJoinTest.java       |   2 +-
 .../kstream/internals/KTableMapValuesTest.java     |   2 +-
 .../kstream/internals/KTableSourceTest.java        |   2 +-
 .../internals/KTableTransformValuesTest.java       |  10 +-
 .../kafka/streams/scala/kstream/KTableTest.scala   |  20 ++-
 20 files changed, 247 insertions(+), 82 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index be0a7a8..1e64746 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -47,8 +47,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        // Aggregates are always materialized:
         sendOldValues = true;
+        return true;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 6c9d452..0d41043 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -28,7 +28,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     private final Predicate<? super K, ? super V> predicate;
     private final boolean filterNot;
     private final String queryableName;
-    private boolean sendOldValues = false;
+    private boolean sendOldValues;
 
     KTableFilter(final KTableImpl<K, ?, V> parent,
                  final Predicate<? super K, ? super V> predicate,
@@ -38,6 +38,8 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         this.predicate = predicate;
         this.filterNot = filterNot;
         this.queryableName = queryableName;
+        // If upstream is already materialized, enable sending old values to avoid sending unnecessary tombstones:
+        this.sendOldValues = parent.enableSendingOldValues(false);
     }
 
     @Override
@@ -46,9 +48,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent.enableSendingOldValues();
-        sendOldValues = true;
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        if (parent.enableSendingOldValues(forceMaterialization)) {
+            sendOldValues = true;
+        }
+        return sendOldValues;
     }
 
     private V computeValue(final K key, final V value) {
@@ -141,7 +145,6 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
             this.parentGetter = parentGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             parentGetter.init(context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 74939b3..97d7e89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -701,10 +701,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) other));
 
         if (leftOuter) {
-            enableSendingOldValues();
+            enableSendingOldValues(true);
         }
         if (rightOuter) {
-            ((KTableImpl<?, ?, ?>) other).enableSendingOldValues();
+            ((KTableImpl<?, ?, ?>) other).enableSendingOldValues(true);
         }
 
         final KTableKTableAbstractJoin<K, VR, V, VO> joinThis;
@@ -807,7 +807,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
-        this.enableSendingOldValues();
+        this.enableSendingOldValues(true);
         return new KGroupedTableImpl<>(
             builder,
             selectName,
@@ -832,18 +832,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (!forceMaterialization && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
             } else {
-                ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
+                final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
+                if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
+                    return false;
+                }
             }
             sendOldValues = true;
         }
+        return true;
     }
 
     boolean sendingOldValueEnabled() {
@@ -967,11 +974,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         //Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
         //such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
         //not be done needlessly.
-        ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
 
         //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
         //This occurs whenever the extracted foreignKey changes values.
-        enableSendingOldValues();
+        enableSendingOldValues(true);
 
         final NamedInternal renamed = new NamedInternal(joinName);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index b0189df..ecaaf45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -39,9 +39,11 @@ abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessor
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean forceMaterialization) {
+        // Table-table joins require upstream materialization:
+        table1.enableSendingOldValues(true);
+        table2.enableSendingOldValues(true);
         sendOldValues = true;
+        return true;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 86088c2..c669fb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -77,10 +77,12 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        // Table-table joins require upstream materialization:
+        parent1.enableSendingOldValues(true);
+        parent2.enableSendingOldValues(true);
         sendOldValues = true;
+        return true;
     }
 
     public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index e734457..650cef1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -66,9 +66,11 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent.enableSendingOldValues();
-        sendOldValues = true;
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        if (parent.enableSendingOldValues(forceMaterialization)) {
+            sendOldValues = true;
+        }
+        return sendOldValues;
     }
 
     private V1 computeValue(final K key, final V value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index 4be6c5b..ff6f9d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -22,5 +22,19 @@ public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, C
 
     KTableValueGetterSupplier<K, T> view();
 
-    void enableSendingOldValues();
+    /**
+     * Potentially enables sending old values.
+     * <p>
+     * If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to
+     * enable sending old values.
+     * <p>
+     * If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values <i>if</i>
+     * an upstream node is already materialized.
+     *
+     * @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
+     *                             values.
+     * @return {@code true} if sending old values is enabled, i.e. either because {@code forceMaterialization} was
+     * {@code true} or some upstream node is materialized.
+     */
+    boolean enableSendingOldValues(boolean forceMaterialization);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 171912f..88f62f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -41,8 +41,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        // Reduce is always materialized:
         sendOldValues = true;
+        return true;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 22e7dec..d5dc5db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -67,7 +67,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
      * @throws IllegalStateException since this method should never be called
      */
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
         // this should never be called
         throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 86da063..cf0517f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -72,12 +72,13 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent.enableSendingOldValues();
-        sendOldValues = true;
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        if (parent.enableSendingOldValues(forceMaterialization)) {
+            sendOldValues = true;
+        }
+        return sendOldValues;
     }
 
-
     private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
         private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
         private TimestampedKeyValueStore<K, V1> store;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index a08c2b0..6a300b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -50,7 +50,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
         this.storeName = storeName;
         this.parentKTable = parentKTable;
         // The suppress buffer requires seeing the old values, to support the prior value view.
-        parentKTable.enableSendingOldValues();
+        parentKTable.enableSendingOldValues(true);
     }
 
     @Override
@@ -108,8 +108,8 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parentKTable.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        return parentKTable.enableSendingOldValues(forceMaterialization);
     }
 
     private static final class KTableSuppressProcessor<K, V> extends AbstractProcessor<K, Change<V>> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index b61f77b..57b8bfb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -297,12 +297,24 @@ public class KTableFilterTest {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotEnableSendingOldValuesIfNotAlreadyMaterializedAndNotForcedToMaterialize() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(false);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
     private void doTestSendingOldValue(final StreamsBuilder builder,
                                        final KTableImpl<String, Integer, Integer> table1,
                                        final KTableImpl<String, Integer, Integer> table2,
                                        final String topic1) {
-        table2.enableSendingOldValues();
-
         final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final Topology topology = builder.build();
 
@@ -347,7 +359,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void shouldSendOldValuesWhenEnabledWithoutMaterialization() {
+    public void shouldEnableSendOldValuesWhenNotMaterializedAlreadyButForcedToMaterialize() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -356,11 +368,13 @@ public class KTableFilterTest {
         final KTableImpl<String, Integer, Integer> table2 =
             (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
 
+        table2.enableSendingOldValues(true);
+
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
 
     @Test
-    public void shouldSendOldValuesWhenEnabledOnMaterialization() {
+    public void shouldEnableSendOldValuesWhenMaterializedAlreadyAndForcedToMaterialize() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -369,6 +383,23 @@ public class KTableFilterTest {
         final KTableImpl<String, Integer, Integer> table2 =
             (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
 
+        table2.enableSendingOldValues(true);
+
+        doTestSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void shouldSendOldValuesWhenEnabledOnUpStreamMaterialization() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed, Materialized.as("store2"));
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(false);
+
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 3e21ae1..c212df2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -42,7 +43,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
@@ -60,6 +60,8 @@ import java.util.Properties;
 
 import static java.util.Arrays.asList;
 import static org.easymock.EasyMock.mock;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -103,7 +105,7 @@ public class KTableImplTest {
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
-                    driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
             inputTopic.pipeInput("A", "01", 5L);
             inputTopic.pipeInput("B", "02", 100L);
             inputTopic.pipeInput("C", "03", 0L);
@@ -113,30 +115,103 @@ public class KTableImplTest {
         }
 
         final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
-        assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
-                new KeyValueTimestamp<>("B", "02", 100),
-                new KeyValueTimestamp<>("C", "03", 0),
-                new KeyValueTimestamp<>("D", "04", 0),
-                new KeyValueTimestamp<>("A", "05", 10),
-                new KeyValueTimestamp<>("A", "06", 8)), processors.get(0).processed());
-        assertEquals(asList(new KeyValueTimestamp<>("A", 1, 5),
-                new KeyValueTimestamp<>("B", 2, 100),
-                new KeyValueTimestamp<>("C", 3, 0),
-                new KeyValueTimestamp<>("D", 4, 0),
-                new KeyValueTimestamp<>("A", 5, 10),
-                new KeyValueTimestamp<>("A", 6, 8)), processors.get(1).processed());
-        assertEquals(asList(new KeyValueTimestamp<>("A", null, 5),
-                new KeyValueTimestamp<>("B", 2, 100),
-                new KeyValueTimestamp<>("C", null, 0),
-                new KeyValueTimestamp<>("D", 4, 0),
-                new KeyValueTimestamp<>("A", null, 10),
-                new KeyValueTimestamp<>("A", 6, 8)), processors.get(2).processed());
-        assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
-                new KeyValueTimestamp<>("B", "02", 100),
-                new KeyValueTimestamp<>("C", "03", 0),
-                new KeyValueTimestamp<>("D", "04", 0),
-                new KeyValueTimestamp<>("A", "05", 10),
-                new KeyValueTimestamp<>("A", "06", 8)), processors.get(3).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", "01", 5),
+            new KeyValueTimestamp<>("B", "02", 100),
+            new KeyValueTimestamp<>("C", "03", 0),
+            new KeyValueTimestamp<>("D", "04", 0),
+            new KeyValueTimestamp<>("A", "05", 10),
+            new KeyValueTimestamp<>("A", "06", 8)),
+            processors.get(0).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", 1, 5),
+            new KeyValueTimestamp<>("B", 2, 100),
+            new KeyValueTimestamp<>("C", 3, 0),
+            new KeyValueTimestamp<>("D", 4, 0),
+            new KeyValueTimestamp<>("A", 5, 10),
+            new KeyValueTimestamp<>("A", 6, 8)),
+            processors.get(1).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", null, 5),
+            new KeyValueTimestamp<>("B", 2, 100),
+            new KeyValueTimestamp<>("C", null, 0),
+            new KeyValueTimestamp<>("D", 4, 0),
+            new KeyValueTimestamp<>("A", null, 10),
+            new KeyValueTimestamp<>("A", 6, 8)),
+            processors.get(2).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", "01", 5),
+            new KeyValueTimestamp<>("B", "02", 100),
+            new KeyValueTimestamp<>("C", "03", 0),
+            new KeyValueTimestamp<>("D", "04", 0),
+            new KeyValueTimestamp<>("A", "05", 10),
+            new KeyValueTimestamp<>("A", "06", 8)),
+            processors.get(3).processed());
+    }
+
+    @Test
+    public void testMaterializedKTable() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
+
+        final KTable<String, String> table1 = builder.table(topic1, consumed, Materialized.as("fred"));
+
+        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
+
+        final KTable<String, Integer> table2 = table1.mapValues(s -> Integer.valueOf(s));
+        table2.toStream().process(supplier);
+
+        final KTable<String, Integer> table3 = table2.filter((key, value) -> (value % 2) == 0);
+        table3.toStream().process(supplier);
+        table1.toStream().to(topic2, produced);
+
+        final KTable<String, String> table4 = builder.table(topic2, consumed);
+        table4.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput("A", "01", 5L);
+            inputTopic.pipeInput("B", "02", 100L);
+            inputTopic.pipeInput("C", "03", 0L);
+            inputTopic.pipeInput("D", "04", 0L);
+            inputTopic.pipeInput("A", "05", 10L);
+            inputTopic.pipeInput("A", "06", 8L);
+        }
+
+        final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", "01", 5),
+            new KeyValueTimestamp<>("B", "02", 100),
+            new KeyValueTimestamp<>("C", "03", 0),
+            new KeyValueTimestamp<>("D", "04", 0),
+            new KeyValueTimestamp<>("A", "05", 10),
+            new KeyValueTimestamp<>("A", "06", 8)),
+            processors.get(0).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", 1, 5),
+            new KeyValueTimestamp<>("B", 2, 100),
+            new KeyValueTimestamp<>("C", 3, 0),
+            new KeyValueTimestamp<>("D", 4, 0),
+            new KeyValueTimestamp<>("A", 5, 10),
+            new KeyValueTimestamp<>("A", 6, 8)),
+            processors.get(1).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("B", 2, 100),
+            new KeyValueTimestamp<>("D", 4, 0),
+            new KeyValueTimestamp<>("A", 6, 8)),
+            processors.get(2).processed());
+        assertEquals(asList(
+            new KeyValueTimestamp<>("A", "01", 5),
+            new KeyValueTimestamp<>("B", "02", 100),
+            new KeyValueTimestamp<>("C", "03", 0),
+            new KeyValueTimestamp<>("D", "04", 0),
+            new KeyValueTimestamp<>("A", "05", 10),
+            new KeyValueTimestamp<>("A", "06", 8)),
+            processors.get(3).processed());
     }
 
     @Test
@@ -304,6 +379,30 @@ public class KTableImplTest {
         }
     }
 
+    @Test
+    public void shouldNotEnableSendingOldValuesIfNotMaterializedAlreadyAndNotForcedToMaterialize() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+        table.enableSendingOldValues(false);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMaterialize() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(true));
+    }
+
     private void assertTopologyContainsProcessor(final Topology topology, final String processorName) {
         for (final TopologyDescription.Subtopology subtopology: topology.describe().subtopologies()) {
             for (final TopologyDescription.Node node: subtopology.nodes()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 8da8e50..f003b52 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -153,7 +153,7 @@ public class KTableKTableInnerJoinTest {
         table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
 
         builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index e6add21..dc33243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -321,7 +321,7 @@ public class KTableKTableLeftJoinTest {
         table2 = builder.table(topic2, consumed);
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
 
         final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index b689fea..9f99ae6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -308,7 +308,7 @@ public class KTableKTableOuterJoinTest {
         table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
 
         final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 0bca4ad..d48b38c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -249,7 +249,7 @@ public class KTableMapValuesTest {
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         final KTableImpl<String, String, Integer> table2 =
             (KTableImpl<String, String, Integer>) table1.mapValues(s -> Integer.valueOf(s));
-        table2.enableSendingOldValues();
+        table2.enableSendingOldValues(true);
 
         final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         builder.build().addProcessor("proc", supplier, table2.name);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index a89167e..e503403 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -324,7 +324,7 @@ public class KTableSourceTest {
         @SuppressWarnings("unchecked")
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
-        table1.enableSendingOldValues();
+        table1.enableSendingOldValues(true);
         assertTrue(table1.sendingOldValueEnabled());
 
         final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index fe2306a..4aac321 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -171,7 +171,10 @@ public class KTableTransformValuesTest {
         final KTableTransformValues<String, String, String> transformValues =
             new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
 
-        transformValues.enableSendingOldValues();
+        expect(parent.enableSendingOldValues(true)).andReturn(true);
+        replay(parent);
+
+        transformValues.enableSendingOldValues(true);
         final Processor<String, Change<String>> processor = transformValues.get();
         processor.init(context);
 
@@ -186,11 +189,10 @@ public class KTableTransformValuesTest {
 
     @Test
     public void shouldSetSendOldValuesOnParent() {
-        parent.enableSendingOldValues();
-        expectLastCall();
+        expect(parent.enableSendingOldValues(true)).andReturn(true);
         replay(parent);
 
-        new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues();
+        new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues(true);
 
         verify(parent);
     }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 2baca8c..8463cb3 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -39,29 +39,27 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
     val sinkTopic = "sink"
 
     val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
-    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+    table.filter((key, value) => key.equals("a") && value == 1).toStream.to(sinkTopic)
 
     val testDriver = createTestDriver(builder)
     val testInput = testDriver.createInput[String, String](sourceTopic)
     val testOutput = testDriver.createOutput[String, Long](sinkTopic)
 
     {
-      testInput.pipeInput("1", "value1")
+      testInput.pipeInput("a", "passes filter : add new row to table")
       val record = testOutput.readKeyValue
-      record.key shouldBe "1"
-      record.value shouldBe (null: java.lang.Long)
+      record.key shouldBe "a"
+      record.value shouldBe 1
     }
     {
-      testInput.pipeInput("1", "value2")
+      testInput.pipeInput("a", "fails filter : remove existing row from table")
       val record = testOutput.readKeyValue
-      record.key shouldBe "1"
-      record.value shouldBe 2
+      record.key shouldBe "a"
+      record.value shouldBe (null: java.lang.Long)
     }
     {
-      testInput.pipeInput("2", "value1")
-      val record = testOutput.readKeyValue
-      record.key shouldBe "2"
-      record.value shouldBe (null: java.lang.Long)
+      testInput.pipeInput("b", "fails filter : no output")
+      testOutput.isEmpty shouldBe true
     }
     testOutput.isEmpty shouldBe true