You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/17 00:37:58 UTC

[1/2] kafka git commit: KAFKA-2984: KTable should send old values when required

Repository: kafka
Updated Branches:
  refs/heads/trunk 841d2d1a2 -> 587a2f4ef


http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 97aca3d..187a6f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -34,6 +34,7 @@ import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
 
@@ -114,4 +115,92 @@ public class KTableSourceTest {
         }
     }
 
+    @Test
+    public void testNotSedingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
+                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc1", proc1, table1.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)");
+
+            driver.process(topic1, "A", "03");
+
+            proc1.checkAndClearResult("A:(03<-null)");
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testSedingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
+                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+            table1.enableSendingOldValues();
+
+            assertTrue(table1.sendingOldValueEnabled());
+
+            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc1", proc1, table1.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)");
+
+            driver.process(topic1, "A", "03");
+
+            proc1.checkAndClearResult("A:(03<-02)");
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 510b458..b402525 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -60,10 +60,10 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
     }
 
     public void checkAndClearResult(String... expected) {
-        assertEquals(expected.length, processed.size());
+        assertEquals("the number of outputs:", expected.length, processed.size());
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processed.get(i));
+            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
         }
 
         processed.clear();


[2/2] kafka git commit: KAFKA-2984: KTable should send old values when required

Posted by gu...@apache.org.
KAFKA-2984: KTable should send old values when required

guozhangwang

At the DAG level, `KTable<K,V>` sends (key, (new value, old value)) downstream.  This is done by wrapping the new value and the old value in an instance of the `Change<V>` class and sending it as the "value" part of the stream. The old value is omitted (set to null) by default as an optimization. When any downstream processor needs to use the old value, the framework should enable it (see `KTableImpl.enableSendingOldValues()` and implementations of `KTableProcessorSupplier.enableSendingOldValues()`).

NOTE: This is meant to be used by aggregation. But, if there is a use case like a SQL database trigger, we can add a new KTable method to expose this.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #672 from ymatsuda/trigger


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/587a2f4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/587a2f4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/587a2f4e

Branch: refs/heads/trunk
Commit: 587a2f4efd7994d4d3af82ed91304f939514294a
Parents: 841d2d1
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Dec 16 15:37:53 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Dec 16 15:37:53 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   |   4 +-
 .../kafka/streams/kstream/internals/Change.java |  34 ++++
 .../streams/kstream/internals/KTableFilter.java |  23 ++-
 .../streams/kstream/internals/KTableImpl.java   |  92 ++++++---
 .../internals/KTableKTableAbstractJoin.java     |  49 +++++
 .../kstream/internals/KTableKTableJoin.java     |  38 ++--
 .../internals/KTableKTableJoinMerger.java       |  56 ++++++
 .../kstream/internals/KTableKTableLeftJoin.java |  36 ++--
 .../internals/KTableKTableOuterJoin.java        |  34 ++--
 .../internals/KTableKTableRightJoin.java        |  33 ++--
 .../kstream/internals/KTableMapValues.java      |  23 ++-
 .../streams/kstream/internals/KTableMerge.java  |  92 ---------
 .../internals/KTableProcessorSupplier.java      |   4 +-
 .../streams/kstream/internals/KTableSource.java |  15 +-
 .../kstream/internals/KTableFilterTest.java     | 157 ++++++++++++++-
 .../kstream/internals/KTableKTableJoinTest.java | 180 +++++++++++++++++
 .../internals/KTableKTableLeftJoinTest.java     | 180 +++++++++++++++++
 .../internals/KTableKTableOuterJoinTest.java    | 196 +++++++++++++++++++
 .../internals/KTableMapValuesImplTest.java      | 110 +++++++++++
 .../kstream/internals/KTableSourceTest.java     |  89 +++++++++
 .../kafka/test/MockProcessorSupplier.java       |   4 +-
 21 files changed, 1222 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 0ed5144..32d3a9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.util.Collections;
@@ -99,7 +99,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(source, keyDeserializer, valDeserializer, topic);
 
-        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
+        ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(topic);
         addProcessor(name, processorSupplier, source);
 
         return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerializer, valSerializer, keyDeserializer, valDeserializer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
new file mode 100644
index 0000000..d7b868e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public class Change<T> {
+
+    public final T newValue;
+    public final T oldValue;
+
+    public Change(T newValue, T oldValue) {
+        this.newValue = newValue;
+        this.oldValue = oldValue;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + newValue + "<-" + oldValue + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index f8f00b8..72f1d88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -28,6 +28,8 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     private final Predicate<K, V> predicate;
     private final boolean filterOut;
 
+    private boolean sendOldValues = false;
+
     public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterOut) {
         this.parent = parent;
         this.predicate = predicate;
@@ -35,7 +37,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, Change<V>> get() {
         return new KTableFilterProcessor();
     }
 
@@ -53,7 +55,13 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         };
     }
 
-    private V computeNewValue(K key, V value) {
+    @Override
+    public void enableSendingOldValues() {
+        parent.enableSendingOldValues();
+        sendOldValues = true;
+    }
+
+    private V computeValue(K key, V value) {
         V newValue = null;
 
         if (value != null && (filterOut ^ predicate.test(key, value)))
@@ -62,11 +70,14 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         return newValue;
     }
 
-    private class KTableFilterProcessor extends AbstractProcessor<K, V> {
+    private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
 
         @Override
-        public void process(K key, V value) {
-            context().forward(key, computeNewValue(key, value));
+        public void process(K key, Change<V> change) {
+            V newValue = computeValue(key, change.newValue);
+            V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
+
+            context().forward(key, new Change<>(newValue, oldValue));
         }
 
     }
@@ -86,7 +97,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
 
         @Override
         public V get(K key) {
-            return computeNewValue(key, parentGetter.get(key));
+            return computeValue(key, parentGetter.get(key));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9362b6a..9f97958 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.Set;
@@ -45,8 +46,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     public static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
-    public static final String SINK_NAME = "KTABLE-SINK-";
-
     public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
 
     public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
@@ -56,27 +55,30 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
 
     public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
+
     public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
 
     public static final String MERGE_NAME = "KTABLE-MERGE-";
 
-    public final KTableProcessorSupplier<K, S, V> processorSupplier;
+    public final ProcessorSupplier<K, ?> processorSupplier;
 
     private final Serializer<K> keySerializer;
     private final Serializer<V> valSerializer;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valDeserializer;
 
+    private boolean sendOldValues = false;
+
     public KTableImpl(KStreamBuilder topology,
                       String name,
-                      KTableProcessorSupplier<K, S, V> processorSupplier,
+                      ProcessorSupplier<K, ?> processorSupplier,
                       Set<String> sourceNodes) {
         this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
     }
 
     public KTableImpl(KStreamBuilder topology,
                       String name,
-                      KTableProcessorSupplier<K, S, V> processorSupplier,
+                      ProcessorSupplier<K, ?> processorSupplier,
                       Set<String> sourceNodes,
                       Serializer<K> keySerializer,
                       Serializer<V> valSerializer,
@@ -142,35 +144,61 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
-        String name = topology.newName(SINK_NAME);
-
-        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+        this.toStream().to(topic, keySerializer, valSerializer);
     }
 
     @Override
     public KStream<K, V> toStream() {
         String name = topology.newName(TOSTREAM_NAME);
 
-        topology.addProcessor(name, new KStreamPassThrough(), this.name);
+        topology.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
+            @Override
+            public V apply(Change<V> change) {
+                return change.newValue;
+            }
+        }), this.name);
 
         return new KStreamImpl<>(topology, name, sourceNodes);
     }
 
+    @SuppressWarnings("unchecked")
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
-            final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
-            synchronized (source) {
-                if (!source.isMaterialized()) {
-                    StateStoreSupplier storeSupplier =
-                        new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
-                    // mark this state is non internal hence it is read directly from a user topic
-                    topology.addStateStore(storeSupplier, false, name);
-                    source.materialize();
-                }
-            }
+            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+            materialize(source);
             return new KTableSourceValueGetterSupplier<>(source.topic);
         } else {
-            return processorSupplier.view();
+            return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    void enableSendingOldValues() {
+        if (!sendOldValues) {
+            if (processorSupplier instanceof KTableSource) {
+                KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                materialize(source);
+                source.enableSendingOldValues();
+            } else {
+                ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
+            }
+            sendOldValues = true;
+        }
+    }
+
+    boolean sendingOldValueEnabled() {
+        return sendOldValues;
+    }
+
+    private void materialize(KTableSource<K, ?> source) {
+        synchronized (source) {
+            if (!source.isMaterialized()) {
+                StateStoreSupplier storeSupplier =
+                        new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+                // mark this state as non internal hence it is read directly from a user topic
+                topology.addStateStore(storeSupplier, false, name);
+                source.materialize();
+            }
         }
     }
 
@@ -185,16 +213,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
         KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
-        KTableMerge<K, R> joinMerge = new KTableMerge<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes)
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
     }
 
     @SuppressWarnings("unchecked")
@@ -208,16 +236,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
         KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
-        KTableMerge<K, R> joinMerge = new KTableMerge<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes)
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
     }
 
     @SuppressWarnings("unchecked")
@@ -231,16 +259,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
         KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
-        KTableMerge<K, R> joinMerge = new KTableMerge<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes)
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
new file mode 100644
index 0000000..ad987dd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+abstract class KTableKTableAbstractJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+    protected final KTableImpl<K, ?, V1> table1;
+    protected final KTableImpl<K, ?, V2> table2;
+    protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    protected final ValueJoiner<V1, V2, V> joiner;
+
+    protected boolean sendOldValues = false;
+
+    KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
+                             KTableImpl<K, ?, V2> table2,
+                             ValueJoiner<V1, V2, V> joiner) {
+        this.table1 = table1;
+        this.table2 = table2;
+        this.valueGetterSupplier1 = table1.valueGetterSupplier();
+        this.valueGetterSupplier2 = table2.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public final void enableSendingOldValues() {
+        table1.enableSendingOldValues();
+        table2.enableSendingOldValues();
+        sendOldValues = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 1eaa009..9716edd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -22,22 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
-
-    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    private final ValueJoiner<V1, V2, V> joiner;
-
-    KTableKTableJoin(KTableImpl<K, ?, V1> table1,
-                     KTableImpl<K, ?, V2> table2,
-                     ValueJoiner<V1, V2, V> joiner) {
-        this.valueGetterSupplier1 = table1.valueGetterSupplier();
-        this.valueGetterSupplier2 = table2.valueGetterSupplier();
-        this.joiner = joiner;
+class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+
+    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+        super(table1, table2, joiner);
     }
 
     @Override
-    public Processor<K, V1> get() {
+    public Processor<K, Change<V1>> get() {
         return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
     }
 
@@ -52,7 +44,7 @@ class KTableKTableJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V
         };
     }
 
-    private class KTableKTableJoinProcessor extends AbstractProcessor<K, V1> {
+    private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
 
@@ -68,17 +60,21 @@ class KTableKTableJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V
         }
 
         @Override
-        public void process(K key, V1 value1) {
+        public void process(K key, Change<V1> change) {
             V newValue = null;
+            V oldValue = null;
+            V2 value2 = null;
 
-            if (value1 != null) {
-                V2 value2 = valueGetter.get(key);
+            if (change.newValue != null || change.oldValue != null)
+                value2 = valueGetter.get(key);
 
-                if (value2 != null)
-                    newValue = joiner.apply(value1, value2);
-            }
+            if (change.newValue != null && value2 != null)
+                newValue = joiner.apply(change.newValue, value2);
+
+            if (sendOldValues && change.oldValue != null && value2 != null)
+                oldValue = joiner.apply(change.oldValue, value2);
 
-            context().forward(key, newValue);
+            context().forward(key, new Change<>(newValue, oldValue));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
new file mode 100644
index 0000000..f52e618
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+
+class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
+
+    private final KTableImpl<K, ?, V> parent1;
+    private final KTableImpl<K, ?, V> parent2;
+
+    public KTableKTableJoinMerger(KTableImpl<K, ?, V> parent1, KTableImpl<K, ?, V> parent2) {
+        this.parent1 = parent1;
+        this.parent2 = parent2;
+    }
+
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KTableKTableJoinMergeProcessor<>();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return parent1.valueGetterSupplier();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        parent1.enableSendingOldValues();
+        parent2.enableSendingOldValues();
+    }
+
+    private class KTableKTableJoinMergeProcessor<K, V> extends AbstractProcessor<K, Change<V>> {
+        @Override
+        public void process(K key, Change<V> value) {
+            context().forward(key, value);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index fe4e280..b10bdb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -22,22 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableLeftJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
-
-    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    private final ValueJoiner<V1, V2, V> joiner;
-
-    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1,
-                         KTableImpl<K, ?, V2> table2,
-                         ValueJoiner<V1, V2, V> joiner) {
-        this.valueGetterSupplier1 = table1.valueGetterSupplier();
-        this.valueGetterSupplier2 = table2.valueGetterSupplier();
-        this.joiner = joiner;
+class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+
+    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+        super(table1, table2, joiner);
     }
 
     @Override
-    public Processor<K, V1> get() {
+    public Processor<K, Change<V1>> get() {
         return new KTableKTableLeftJoinProcessor(valueGetterSupplier2.get());
     }
 
@@ -52,7 +44,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V
         };
     }
 
-    private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
+    private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
 
@@ -68,13 +60,21 @@ class KTableKTableLeftJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V
         }
 
         @Override
-        public void process(K key, V1 value1) {
+        public void process(K key, Change<V1> change) {
             V newValue = null;
+            V oldValue = null;
+            V2 value2 = null;
+
+            if (change.newValue != null || change.oldValue != null)
+                value2 = valueGetter.get(key);
+
+            if (change.newValue != null)
+                newValue = joiner.apply(change.newValue, value2);
 
-            if (value1 != null)
-                newValue = joiner.apply(value1, valueGetter.get(key));
+            if (sendOldValues && change.oldValue != null)
+                oldValue = joiner.apply(change.oldValue, value2);
 
-            context().forward(key, newValue);
+            context().forward(key, new Change<>(newValue, oldValue));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 8d54d02..b859b34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -22,22 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableOuterJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
-
-    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    private final ValueJoiner<V1, V2, V> joiner;
-
-    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1,
-                          KTableImpl<K, ?, V2> table2,
-                          ValueJoiner<V1, V2, V> joiner) {
-        this.valueGetterSupplier1 = table1.valueGetterSupplier();
-        this.valueGetterSupplier2 = table2.valueGetterSupplier();
-        this.joiner = joiner;
+class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+
+    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+        super(table1, table2, joiner);
     }
 
     @Override
-    public Processor<K, V1> get() {
+    public Processor<K, Change<V1>> get() {
         return new KTableKTableOuterJoinProcessor(valueGetterSupplier2.get());
     }
 
@@ -52,7 +44,7 @@ class KTableKTableOuterJoin<K, V, V1, V2> implements KTableProcessorSupplier<K,
         };
     }
 
-    private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, V1> {
+    private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
 
@@ -68,14 +60,20 @@ class KTableKTableOuterJoin<K, V, V1, V2> implements KTableProcessorSupplier<K,
         }
 
         @Override
-        public void process(K key, V1 value1) {
+        public void process(K key, Change<V1> change) {
             V newValue = null;
+            V oldValue = null;
             V2 value2 = valueGetter.get(key);
 
-            if (value1 != null || value2 != null)
-                newValue = joiner.apply(value1, value2);
+            if (change.newValue != null || value2 != null)
+                newValue = joiner.apply(change.newValue, value2);
+
+            if (sendOldValues) {
+                if (change.oldValue != null || value2 != null)
+                    oldValue = joiner.apply(change.oldValue, value2);
+            }
 
-            context().forward(key, newValue);
+            context().forward(key, new Change<>(newValue, oldValue));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 4219578..f20e987 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -22,22 +22,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableRightJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
-
-    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    private final ValueJoiner<V1, V2, V> joiner;
-
-    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1,
-                          KTableImpl<K, ?, V2> table2,
-                          ValueJoiner<V1, V2, V> joiner) {
-        this.valueGetterSupplier1 = table1.valueGetterSupplier();
-        this.valueGetterSupplier2 = table2.valueGetterSupplier();
-        this.joiner = joiner;
+class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+
+
+    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+        super(table1, table2, joiner);
     }
 
     @Override
-    public Processor<K, V1> get() {
+    public Processor<K, Change<V1>> get() {
         return new KTableKTableRightJoinProcessor(valueGetterSupplier2.get());
     }
 
@@ -52,7 +45,7 @@ class KTableKTableRightJoin<K, V, V1, V2> implements KTableProcessorSupplier<K,
         };
     }
 
-    private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, V1> {
+    private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
 
@@ -68,14 +61,18 @@ class KTableKTableRightJoin<K, V, V1, V2> implements KTableProcessorSupplier<K,
         }
 
         @Override
-        public void process(K key, V1 value1) {
+        public void process(K key, Change<V1> change) {
             V newValue = null;
+            V oldValue = null;
             V2 value2 = valueGetter.get(key);
 
-            if (value2 != null)
-                newValue = joiner.apply(value1, value2);
+            if (value2 != null) {
+                newValue = joiner.apply(change.newValue, value2);
+                if (sendOldValues)
+                    oldValue = joiner.apply(change.oldValue, value2);
+            }
 
-            context().forward(key, newValue);
+            context().forward(key, new Change<>(newValue, oldValue));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 300cce4..be80855 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -27,13 +27,15 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
     private final KTableImpl<K1, ?, V1> parent;
     private final ValueMapper<V1, V2> mapper;
 
+    private boolean sendOldValues = false;
+
     public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K1, Change<V1>> get() {
         return new KTableMapProcessor();
     }
 
@@ -50,7 +52,13 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         };
     }
 
-    private V2 computeNewValue(V1 value) {
+    @Override
+    public void enableSendingOldValues() {
+        parent.enableSendingOldValues();
+        sendOldValues = true;
+    }
+
+    private V2 computeValue(V1 value) {
         V2 newValue = null;
 
         if (value != null)
@@ -59,11 +67,14 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         return newValue;
     }
 
-    private class KTableMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
 
         @Override
-        public void process(K1 key, V1 value) {
-            context().forward(key, computeNewValue(value));
+        public void process(K1 key, Change<V1> change) {
+            V2 newValue = computeValue(change.newValue);
+            V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
+
+            context().forward(key, new Change(newValue, oldValue));
         }
 
     }
@@ -83,7 +94,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
 
         @Override
         public V2 get(K1 key) {
-            return computeNewValue(parentGetter.get(key));
+            return computeValue(parentGetter.get(key));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
deleted file mode 100644
index 715ea6c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-class KTableMerge<K, V> implements KTableProcessorSupplier<K, V, V> {
-
-    private final KTableImpl<K, ?, V>[] parents;
-
-    public KTableMerge(KTableImpl<K, ?, V>... parents) {
-        this.parents = parents.clone();
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KTableMergeProcessor<>();
-    }
-
-    @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        final KTableValueGetterSupplier<K, V>[] valueGetterSuppliers = new KTableValueGetterSupplier[parents.length];
-
-        for (int i = 0; i < parents.length; i++) {
-            valueGetterSuppliers[i] = parents[i].valueGetterSupplier();
-        }
-        return new KTableValueGetterSupplier<K, V>() {
-
-            public KTableValueGetter<K, V> get() {
-                KTableValueGetter<K, V>[] valueGetters = new KTableValueGetter[valueGetterSuppliers.length];
-
-                for (int i = 0; i < valueGetters.length; i++) {
-                    valueGetters[i] = valueGetterSuppliers[i].get();
-                }
-                return new KTableMergeValueGetter(valueGetters);
-            }
-
-        };
-    }
-
-    private class KTableMergeProcessor<K, V> extends AbstractProcessor<K, V> {
-        @Override
-        public void process(K key, V value) {
-            context().forward(key, value);
-        }
-    }
-
-    private class KTableMergeValueGetter implements KTableValueGetter<K, V> {
-
-        private final KTableValueGetter<K, V>[] valueGetters;
-
-        public KTableMergeValueGetter(KTableValueGetter<K, V>[] valueGetters) {
-            this.valueGetters = valueGetters;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            for (KTableValueGetter<K, V> valueGetter : valueGetters) {
-                valueGetter.init(context);
-            }
-        }
-
-        @Override
-        public V get(K key) {
-            for (KTableValueGetter<K, V> valueGetter : valueGetters) {
-                V value = valueGetter.get(key);
-                if (value != null)
-                    return value;
-            }
-            return null;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index 2fe5c15..d647b72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, V> {
+public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, Change<V>> {
 
     KTableValueGetterSupplier<K, T> view();
 
+    void enableSendingOldValues();
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 60b2d5b..8010b3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -20,13 +20,15 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
-public class KTableSource<K, V> implements KTableProcessorSupplier<K, V, V> {
+public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
     public final String topic;
 
     private boolean materialized = false;
+    private boolean sendOldValues = false;
 
     public KTableSource(String topic) {
         this.topic = topic;
@@ -45,15 +47,14 @@ public class KTableSource<K, V> implements KTableProcessorSupplier<K, V, V> {
         return materialized;
     }
 
-    @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        throw new IllegalStateException("a view cannot be define on the ktable source");
+    public void enableSendingOldValues() {
+        sendOldValues = true;
     }
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            context().forward(key, value);
+            context().forward(key, new Change<>(value, null));
         }
     }
 
@@ -70,8 +71,10 @@ public class KTableSource<K, V> implements KTableProcessorSupplier<K, V, V> {
 
         @Override
         public void process(K key, V value) {
+            V oldValue = sendOldValues ? store.get(key) : null;
             store.put(key, value);
-            context().forward(key, value);
+
+            context().forward(key, new Change<>(value, oldValue));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 590995b..c43bea0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -59,9 +59,17 @@ public class KTableFilterTest {
                 return (value % 2) == 0;
             }
         });
+        KTable<String, Integer> table3 = table1.filterOut(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        });
 
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
+        table3.toStream().process(proc3);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
 
@@ -72,8 +80,8 @@ public class KTableFilterTest {
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
 
-
-        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"), proc2.processed);
+        proc2.checkAndClearResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+        proc3.checkAndClearResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
     }
 
     @Test
@@ -93,13 +101,24 @@ public class KTableFilterTest {
                             return (value % 2) == 0;
                         }
                     });
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterOut(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
 
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
 
             KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
 
             KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+
             getter2.init(driver.context());
+            getter3.init(driver.context());
 
             driver.process(topic1, "A", 1);
             driver.process(topic1, "B", 1);
@@ -109,19 +128,31 @@ public class KTableFilterTest {
             assertNull(getter2.get("B"));
             assertNull(getter2.get("C"));
 
+            assertEquals(1, (int) getter3.get("A"));
+            assertEquals(1, (int) getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
+
             driver.process(topic1, "A", 2);
             driver.process(topic1, "B", 2);
 
-            assertEquals(new Integer(2), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(2, (int) getter2.get("A"));
+            assertEquals(2, (int) getter2.get("B"));
             assertNull(getter2.get("C"));
 
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
+
             driver.process(topic1, "A", 3);
 
             assertNull(getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(2, (int) getter2.get("B"));
             assertNull(getter2.get("C"));
 
+            assertEquals(3, (int) getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
+
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
@@ -129,6 +160,122 @@ public class KTableFilterTest {
             assertNull(getter2.get("B"));
             assertNull(getter2.get("C"));
 
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testNotSendingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, Integer, Integer> table1 =
+                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+
+            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+            MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc1", proc1, table1.name);
+            builder.addProcessor("proc2", proc2, table2.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            driver.process(topic1, "A", 1);
+            driver.process(topic1, "B", 1);
+            driver.process(topic1, "C", 1);
+
+            proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+
+            driver.process(topic1, "A", 2);
+            driver.process(topic1, "B", 2);
+
+            proc1.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+
+            driver.process(topic1, "A", 3);
+
+            proc1.checkAndClearResult("A:(3<-null)");
+            proc2.checkAndClearResult("A:(null<-null)");
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testSendingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, Integer, Integer> table1 =
+                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+
+            table2.enableSendingOldValues();
+
+            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+            MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc1", proc1, table1.name);
+            builder.addProcessor("proc2", proc2, table2.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            driver.process(topic1, "A", 1);
+            driver.process(topic1, "B", 1);
+            driver.process(topic1, "C", 1);
+
+            proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+
+            driver.process(topic1, "A", 2);
+            driver.process(topic1, "B", 2);
+
+            proc1.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+            proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+
+            driver.process(topic1, "A", 3);
+
+            proc1.checkAndClearResult("A:(3<-2)");
+            proc2.checkAndClearResult("A:(null<-2)");
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            proc1.checkAndClearResult("A:(null<-3)", "B:(null<-2)");
+            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-2)");
+
         } finally {
             Utils.delete(stateDir);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 9762cf7..aa09e74 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -38,7 +38,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableKTableJoinTest {
 
@@ -163,6 +165,184 @@ public class KTableKTableJoinTest {
         }
     }
 
+    @Test
+    public void testNotSendingOldValues() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> proc;
+
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.join(table2, joiner);
+
+            proc = new MockProcessorSupplier<>();
+            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testSendingOldValues() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> proc;
+
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.join(table2, joiner);
+
+            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+
+            proc = new MockProcessorSupplier<>();
+            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private JoinedKeyValue kv(Integer key, String value) {
         return new JoinedKeyValue(key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index e1ef830..1527f17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -39,7 +39,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableKTableLeftJoinTest {
 
@@ -172,6 +174,184 @@ public class KTableKTableLeftJoinTest {
         }
     }
 
+    @Test
+    public void testNotSendingOldValue() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> proc;
+
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.leftJoin(table2, joiner);
+
+            proc = new MockProcessorSupplier<>();
+            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testSendingOldValue() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> proc;
+
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.leftJoin(table2, joiner);
+
+            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+
+            proc = new MockProcessorSupplier<>();
+            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private JoinedKeyValue kv(Integer key, String value) {
         return new JoinedKeyValue(key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index fc800b8..67b83f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -38,7 +38,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableKTableOuterJoinTest {
 
@@ -172,6 +174,200 @@ public class KTableKTableOuterJoinTest {
         }
     }
 
+    @Test
+    public void testNotSendingOldValue() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> proc;
+
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.outerJoin(table2, joiner);
+
+            proc = new MockProcessorSupplier<>();
+            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+
+            // push middle two items to the primary stream with null. this should produce two items.
+
+            for (int i = 1; i < 3; i++) {
+                driver.process(topic1, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("1:(null<-null)", "2:(null+YY2<-null)");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testSendingOldValue() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> proc;
+
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.outerJoin(table2, joiner);
+
+            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+
+            proc = new MockProcessorSupplier<>();
+            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+
+            // push middle two items to the primary stream with null. this should produce two items.
+
+            for (int i = 1; i < 3; i++) {
+                driver.process(topic1, expectedKeys[i], null);
+            }
+
+            proc.checkAndClearResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private JoinedKeyValue kv(Integer key, String value) {
         return new JoinedKeyValue(key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
index 1ca6643..037b30a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
@@ -35,7 +35,9 @@ import java.io.IOException;
 import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableMapValuesImplTest {
 
@@ -195,4 +197,112 @@ public class KTableMapValuesImplTest {
         }
     }
 
+    @Test
+    public void testNotSendingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+
+            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc", proc, table2.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            assertFalse(table1.sendingOldValueEnabled());
+            assertFalse(table2.sendingOldValueEnabled());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+
+            driver.process(topic1, "A", "03");
+
+            proc.checkAndClearResult("A:(3<-null)");
+
+            driver.process(topic1, "A", null);
+
+            proc.checkAndClearResult("A:(null<-null)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testSendingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+
+            table2.enableSendingOldValues();
+
+            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc", proc, table2.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            assertTrue(table1.sendingOldValueEnabled());
+            assertTrue(table2.sendingOldValueEnabled());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+
+            driver.process(topic1, "A", "03");
+
+            proc.checkAndClearResult("A:(3<-2)");
+
+            driver.process(topic1, "A", null);
+
+            proc.checkAndClearResult("A:(null<-3)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
 }