You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/20 20:06:30 UTC

[2/2] kafka git commit: KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77)

KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77)

 - fixed leftJoin -> outerJoin test bug
 - simplified to only use values
 - fixed inner KTable-KTable join
 - fixed left KTable-KTable join
 - fixed outer KTable-KTable join
 - fixed inner, left, and outer left KStream-KStream joins
 - added inner KStream-KTable join
 - fixed left KStream-KTable join

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #1777 from mjsax/kafka-4001-joins


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62c0972e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62c0972e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62c0972e

Branch: refs/heads/trunk
Commit: 62c0972efc525cc0677bd3fd470bd9fbbd70b004
Parents: 24067e4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Oct 20 13:06:25 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 20 13:06:25 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  44 +-
 .../streams/kstream/internals/KStreamImpl.java  | 272 +++++-------
 .../kstream/internals/KStreamKStreamJoin.java   |  27 +-
 .../kstream/internals/KStreamKTableJoin.java    |  75 ++++
 .../internals/KStreamKTableLeftJoin.java        |  66 ---
 .../internals/KStreamWindowAggregate.java       |   2 +-
 .../streams/kstream/internals/KTableImpl.java   | 103 ++---
 .../kstream/internals/KTableKTableJoin.java     |  21 +-
 .../kstream/internals/KTableKTableLeftJoin.java |  19 +-
 .../internals/KTableKTableOuterJoin.java        |  16 +-
 .../internals/KTableKTableRightJoin.java        |  24 +-
 .../internals/ProcessorContextImpl.java         |   4 -
 .../integration/JoinIntegrationTest.java        | 433 +++++++++++++++++++
 .../KTableKTableJoinIntegrationTest.java        |  74 ++--
 .../internals/KStreamKStreamLeftJoinTest.java   |  98 +++--
 .../internals/KStreamKTableJoinTest.java        | 146 +++++++
 .../internals/KStreamWindowAggregateTest.java   |  22 +-
 .../kstream/internals/KTableKTableJoinTest.java |  93 ++--
 .../internals/KTableKTableLeftJoinTest.java     |  18 +-
 .../internals/KTableKTableOuterJoinTest.java    |  12 +-
 20 files changed, 1075 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 032efb5..4483e9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -533,6 +533,39 @@ public interface KStream<K, V> {
             JoinWindows windows);
 
     /**
+     * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
+     * If a record key or value is {@code null} it will not be included in the resulting {@link KStream}
+     *
+     * @param table  the instance of {@link KTable} joined with this stream
+     * @param joiner the instance of {@link ValueJoiner}
+     * @param <V1>   the value type of the table
+     * @param <V2>   the value type of the new stream
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     * one for each matched record-pair with the same key
+     */
+    <V1, V2> KStream<K, V2> join(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
+
+    /**
+     * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
+     * If a record key or value is {@code null} it will not be included in the resulting {@link KStream}
+     *
+     * @param table       the instance of {@link KTable} joined with this stream
+     * @param valueJoiner the instance of {@link ValueJoiner}
+     * @param keySerde    key serdes for materializing this stream.
+     *                    If not specified the default serdes defined in the configs will be used
+     * @param valSerde    value serdes for materializing this stream,
+     *                    if not specified the default serdes defined in the configs will be used
+     * @param <V1>        the value type of the table
+     * @param <V2>        the value type of the new stream
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     * one for each matched record-pair with the same key
+     */
+    <V1, V2> KStream<K, V2> join(KTable<K, V1> table,
+                                 ValueJoiner<V, V1, V2> valueJoiner,
+                                 Serde<K> keySerde,
+                                 Serde<V> valSerde);
+
+    /**
      * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
      * If a record key is null it will not included in the resulting {@link KStream}
      *
@@ -566,6 +599,7 @@ public interface KStream<K, V> {
                                      ValueJoiner<V, V1, V2> valueJoiner,
                                      Serde<K> keySerde,
                                      Serde<V> valSerde);
+
     /**
      * Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and
      * default serializers and deserializers. If a record key is null it will not included in
@@ -592,8 +626,8 @@ public interface KStream<K, V> {
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
      */
     <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
-                                            Serde<K1> keySerde,
-                                            Serde<V> valSerde);
+                                       Serde<K1> keySerde,
+                                       Serde<V> valSerde);
 
     /**
      * Group the records with the same key into a {@link KGroupedStream} while preserving the

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bb77e96..b67fca5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
+
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
@@ -63,6 +64,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
 
+    public static final String JOIN_NAME = "KSTREAM-JOIN-";
+
     public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
 
     private static final String MAP_NAME = "KSTREAM-MAP-";
@@ -345,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
-        
+
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
             partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
@@ -386,78 +389,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1, R> KStream<K, R> join(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows,
-            Serde<K> keySerde,
-            Serde<V> thisValueSerde,
-            Serde<V1> otherValueSerde) {
+            final KStream<K, V1> other,
+            final ValueJoiner<V, V1, R> joiner,
+            final JoinWindows windows,
+            final Serde<K> keySerde,
+            final Serde<V> thisValueSerde,
+            final Serde<V1> otherValueSerde) {
 
-        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
+        return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false));
     }
 
     @Override
     public <V1, R> KStream<K, R> join(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows) {
+        final KStream<K, V1> other,
+        final ValueJoiner<V, V1, R> joiner,
+        final JoinWindows windows) {
 
-        return join(other, joiner, windows, null, null, null, false);
+        return join(other, joiner, windows, null, null, null);
     }
 
     @Override
     public <V1, R> KStream<K, R> outerJoin(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows,
-            Serde<K> keySerde,
-            Serde<V> thisValueSerde,
-            Serde<V1> otherValueSerde) {
+        final KStream<K, V1> other,
+        final ValueJoiner<V, V1, R> joiner,
+        final JoinWindows windows,
+        final Serde<K> keySerde,
+        final Serde<V> thisValueSerde,
+        final Serde<V1> otherValueSerde) {
 
-        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
+        return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true));
     }
 
     @Override
     public <V1, R> KStream<K, R> outerJoin(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows) {
+        final KStream<K, V1> other,
+        final ValueJoiner<V, V1, R> joiner,
+        final JoinWindows windows) {
 
-        return join(other, joiner, windows, null, null, null, true);
+        return outerJoin(other, joiner, windows, null, null, null);
     }
 
-    @SuppressWarnings("unchecked")
-    private <V1, R> KStream<K, R> join(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows,
-            Serde<K> keySerde,
-            Serde<V> thisValueSerde,
-            Serde<V1> otherValueSerde,
-            boolean outer) {
-
-        return doJoin(other,
-            joiner,
-            windows,
-            keySerde,
-            thisValueSerde,
-            otherValueSerde,
-            new DefaultJoin(outer));
-    }
-
-    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
-                                         ValueJoiner<V, V1, R> joiner,
-                                         JoinWindows windows,
-                                         Serde<K> keySerde,
-                                         Serde<V> thisValueSerde,
-                                         Serde<V1> otherValueSerde,
-                                         KStreamImplJoin join) {
+    private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
+                                         final ValueJoiner<V, V1, R> joiner,
+                                         final JoinWindows windows,
+                                         final Serde<K> keySerde,
+                                         final Serde<V> thisValueSerde,
+                                         final Serde<V1> otherValueSerde,
+                                         final KStreamImplJoin join) {
         Objects.requireNonNull(other, "other KStream can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
 
         KStreamImpl<K, V> joinThis = this;
-        KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
+        KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
             joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
@@ -531,20 +515,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows,
-            Serde<K> keySerde,
-            Serde<V> thisValSerde,
-            Serde<V1> otherValueSerde) {
+        final KStream<K, V1> other,
+        final ValueJoiner<V, V1, R> joiner,
+        final JoinWindows windows,
+        final Serde<K> keySerde,
+        final Serde<V> thisValSerde,
+        final Serde<V1> otherValueSerde) {
 
         return doJoin(other,
-                      joiner,
-                      windows,
-                      keySerde,
-                      thisValSerde,
-                      otherValueSerde,
-                      new LeftJoin());
+            joiner,
+            windows,
+            keySerde,
+            thisValSerde,
+            otherValueSerde,
+            new KStreamImplJoin(true, false));
     }
 
     @Override
@@ -558,50 +542,69 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @SuppressWarnings("unchecked")
     @Override
-    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
-        return leftJoin(other, joiner, null, null);
+    public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+        return join(other, joiner, null, null);
 
     }
 
-    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other,
-                                          ValueJoiner<V, V1, R> joiner,
-                                          Serde<K> keySerde,
-                                          Serde<V> valueSerde) {
-        Objects.requireNonNull(other, "other KTable can't be null");
-        Objects.requireNonNull(joiner, "joiner can't be null");
-
+    @Override
+    public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
+                                      final ValueJoiner<V, V1, R> joiner,
+                                      final Serde<K> keySerde,
+                                      final Serde<V> valueSerde) {
         if (repartitionRequired) {
-            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
-                                                                                valueSerde, null);
-            return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde,
+                valueSerde, null);
+            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
         } else {
-            return doStreamTableLeftJoin(other, joiner);
+            return doStreamTableJoin(other, joiner, false);
         }
-
     }
 
-    private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> other,
-                                                        final ValueJoiner<V, V1, R> joiner) {
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+    private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
+                                                    final ValueJoiner<V, V1, R> joiner,
+                                                    final boolean leftJoin) {
+        Objects.requireNonNull(other, "other KTable can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+
+        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        String name = topology.newName(LEFTJOIN_NAME);
+        final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
 
-        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
-        topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames());
+        topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl<K, ?, V1>) other, joiner, leftJoin), this.name);
+        topology.connectProcessorAndStateStores(name, other.getStoreName());
         topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
         return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
+    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+        return leftJoin(other, joiner, null, null);
+    }
+
+    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
+                                          final ValueJoiner<V, V1, R> joiner,
+                                          final Serde<K> keySerde,
+                                          final Serde<V> valueSerde) {
+        if (repartitionRequired) {
+            final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
+                                                                                valueSerde, null);
+            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
+        } else {
+            return doStreamTableJoin(other, joiner, true);
+        }
+    }
+
+    @Override
     public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) {
         return groupBy(selector, null, null);
     }
 
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
-                                                   Serde<K1> keySerde,
-                                                   Serde<V> valSerde) {
+                                              Serde<K1> keySerde,
+                                              Serde<V> valSerde) {
 
         Objects.requireNonNull(selector, "selector can't be null");
         String selectName = internalSelectKey(selector);
@@ -641,26 +644,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             .build();
     }
 
-    private interface KStreamImplJoin {
+    private class KStreamImplJoin {
 
-        <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
-                                            KStream<K1, V2> other,
-                                            ValueJoiner<V1, V2, R> joiner,
-                                            JoinWindows windows,
-                                            Serde<K1> keySerde,
-                                            Serde<V1> lhsValueSerde,
-                                            Serde<V2> otherValueSerde);
-    }
-
-    private class DefaultJoin implements KStreamImplJoin {
-
-        private final boolean outer;
+        private final boolean leftOuter;
+        private final boolean rightOuter;
 
-        DefaultJoin(final boolean outer) {
-            this.outer = outer;
+        KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) {
+            this.leftOuter = leftOuter;
+            this.rightOuter = rightOuter;
         }
 
-        @Override
         public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
                                                    KStream<K1, V2> other,
                                                    ValueJoiner<V1, V2, R> joiner,
@@ -670,12 +663,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    Serde<V2> otherValueSerde) {
             String thisWindowStreamName = topology.newName(WINDOWED_NAME);
             String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-            String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
-            String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+            String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+            String joinOtherName = leftOuter ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
             String joinMergeName = topology.newName(MERGE_NAME);
 
             StateStoreSupplier thisWindow =
-                createWindowedStateStore(windows, keySerde, lhsValueSerde,  joinThisName + "-store");
+                createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
 
             StateStoreSupplier otherWindow =
                 createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
@@ -688,16 +681,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                     windows.before + windows.after + 1,
                                                                                     windows.maintainMs());
 
-            KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
-                                                                                  windows.before,
-                                                                                  windows.after,
-                                                                                  joiner,
-                                                                                  outer);
-            KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
-                                                                                   windows.after,
-                                                                                   windows.before,
-                                                                                   reverseJoiner(joiner),
-                                                                                   outer);
+            final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
+                windows.before,
+                windows.after,
+                joiner,
+                leftOuter);
+            final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
+                windows.after,
+                windows.before,
+                reverseJoiner(joiner),
+                rightOuter);
 
             KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
 
@@ -716,39 +709,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
-
-    private class LeftJoin implements KStreamImplJoin {
-
-        @Override
-        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
-                                                   KStream<K1, V2> other,
-                                                   ValueJoiner<V1, V2, R> joiner,
-                                                   JoinWindows windows,
-                                                   Serde<K1> keySerde,
-                                                   Serde<V1> lhsValueSerde,
-                                                   Serde<V2> otherValueSerde) {
-            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-            String joinThisName = topology.newName(LEFTJOIN_NAME);
-
-            StateStoreSupplier otherWindow =
-                createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
-
-            KStreamJoinWindow<K1, V1>
-                otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
-            KStreamKStreamJoin<K1, R, V1, V2>
-                joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
-
-
-
-            topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
-            topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
-            topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
-
-            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
-            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
-            return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false);
-        }
-    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index edde009..41547b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,7 +52,6 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
         private WindowStore<K, V2> otherWindow;
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
@@ -62,14 +61,21 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
 
         @Override
-        public void process(K key, V1 value) {
-            if (key == null)
+        public void process(final K key, final V1 value) {
+            // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+            //
+            // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+            // an empty message (i.e., there is nothing to be joined) -- this contrasts with SQL NULL semantics
+            // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+            // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+            if (key == null || value == null) {
                 return;
+            }
 
-            boolean needOuterJoin = KStreamKStreamJoin.this.outer;
+            boolean needOuterJoin = outer;
 
-            long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
-            long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+            final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
+            final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
 
             try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
@@ -77,8 +83,9 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
                     context().forward(key, joiner.apply(value, iter.next().value));
                 }
 
-                if (needOuterJoin)
+                if (needOuterJoin) {
                     context().forward(key, joiner.apply(value, null));
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
new file mode 100644
index 0000000..1027b96
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
+
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
+    private final ValueJoiner<V1, V2, R> joiner;
+    private final boolean leftJoin;
+
+    KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<V1, V2, R> joiner, final boolean leftJoin) {
+        valueGetterSupplier = table.valueGetterSupplier();
+        this.joiner = joiner;
+        this.leftJoin = leftJoin;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin);
+    }
+
+    private class KStreamKTableJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+        private final boolean leftJoin;
+
+        KStreamKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter, final boolean leftJoin) {
+            this.valueGetter = valueGetter;
+            this.leftJoin = leftJoin;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        @Override
+        public void process(final K key, final V1 value) {
+            // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+            //
+            // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+            // an empty message (i.e., there is nothing to be joined) -- this contrasts with SQL NULL semantics;
+            // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+            // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+            if (key != null && value != null) {
+                final V2 value2 = valueGetter.get(key);
+                if (leftJoin || value2 != null) {
+                    context().forward(key, joiner.apply(value, value2));
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
deleted file mode 100644
index 92b9b59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
-
-    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
-    private final ValueJoiner<V1, V2, R> joiner;
-
-    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
-        this.valueGetterSupplier = table.valueGetterSupplier();
-        this.joiner = joiner;
-    }
-
-    @Override
-    public Processor<K, V1> get() {
-        return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get());
-    }
-
-    private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
-
-        private final KTableValueGetter<K, V2> valueGetter;
-
-        public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
-            this.valueGetter = valueGetter;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-            valueGetter.init(context);
-        }
-
-        @Override
-        public void process(K key, V1 value) {
-            // if the key is null, we do not need proceed joining
-            // the record with the table
-            if (key != null) {
-                context().forward(key, joiner.apply(value, valueGetter.get(key)));
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 718e52b..55b0916 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6423cff..683dc00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -44,6 +44,7 @@ import java.util.Set;
 
 /**
  * The implementation class of {@link KTable}.
+ *
  * @param <K> the key type
  * @param <S> the source's (parent's) value type
  * @param <V> the value type
@@ -283,77 +284,55 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return toStream().selectKey(mapper);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
-        Objects.requireNonNull(other, "other can't be null");
-        Objects.requireNonNull(joiner, "joiner can't be null");
-
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
-        String joinThisName = topology.newName(JOINTHIS_NAME);
-        String joinOtherName = topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
-
-        KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-        KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
-        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
-        );
-
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
-        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
-
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
+    public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+        return doJoin(other, joiner, false, false);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
-        Objects.requireNonNull(other, "other can't be null");
-        Objects.requireNonNull(joiner, "joiner can't be null");
-
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
-        String joinThisName = topology.newName(OUTERTHIS_NAME);
-        String joinOtherName = topology.newName(OUTEROTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
-
-        KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-        KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
-        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
-        );
-
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
-        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
+    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+        return doJoin(other, joiner, true, true);
+    }
 
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
+    @Override
+    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+        return doJoin(other, joiner, true, false);
     }
 
     @SuppressWarnings("unchecked")
-    @Override
-    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+    private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<V, V1, R> joiner, final boolean leftOuter, final boolean rightOuter) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        String joinThisName = topology.newName(LEFTTHIS_NAME);
-        String joinOtherName = topology.newName(LEFTOTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
+        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        if (leftOuter) {
+            enableSendingOldValues();
+        }
+        if (rightOuter) {
+            ((KTableImpl) other).enableSendingOldValues();
+        }
+
+        final String joinThisName = topology.newName(JOINTHIS_NAME);
+        final String joinOtherName = topology.newName(JOINOTHER_NAME);
+        final String joinMergeName = topology.newName(MERGE_NAME);
+
+        final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
+        final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
+
+        if (!leftOuter) { // inner
+            joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+            joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        } else if (!rightOuter) { // left
+            joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+            joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        } else { // outer
+            joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+            joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        }
 
-        KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-        KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
-        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
+        final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
+            new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, storeName),
                 new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
         );
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index cbd626d..49f6715 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -69,23 +69,26 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
          * @throws StreamsException if key is null
          */
         @Override
-        public void process(K key, Change<V1> change) {
+        public void process(final K key, final Change<V1> change) {
             // the keys should never be null
             if (key == null)
                 throw new StreamsException("Record key for KTable join operator should not be null.");
 
             R newValue = null;
             R oldValue = null;
-            V2 value2 = null;
 
-            if (change.newValue != null || change.oldValue != null)
-                value2 = valueGetter.get(key);
+            final V2 value2 = valueGetter.get(key);
+            if (value2 == null) {
+                return;
+            }
 
-            if (change.newValue != null && value2 != null)
+            if (change.newValue != null) {
                 newValue = joiner.apply(change.newValue, value2);
+            }
 
-            if (sendOldValues && change.oldValue != null && value2 != null)
+            if (sendOldValues && change.oldValue != null) {
                 oldValue = joiner.apply(change.oldValue, value2);
+            }
 
             context().forward(key, new Change<>(newValue, oldValue));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 4bee38c..5f5cad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -70,27 +70,28 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
          * @throws StreamsException if key is null
          */
         @Override
-        public void process(K key, Change<V1> change) {
+        public void process(final K key, final Change<V1> change) {
             // the keys should never be null
             if (key == null)
                 throw new StreamsException("Record key for KTable left-join operator should not be null.");
 
             R newValue = null;
             R oldValue = null;
-            V2 value2 = null;
 
-            if (change.newValue != null || change.oldValue != null)
-                value2 = valueGetter.get(key);
+            final V2 value2 = valueGetter.get(key);
+            if (value2 == null && change.newValue == null && change.oldValue == null) {
+                return;
+            }
 
-            if (change.newValue != null)
+            if (change.newValue != null) {
                 newValue = joiner.apply(change.newValue, value2);
+            }
 
             if (sendOldValues && change.oldValue != null)
                 oldValue = joiner.apply(change.oldValue, value2);
 
             context().forward(key, new Change<>(newValue, oldValue));
         }
-
     }
 
     private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index ad7dbde..2bfd8a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -69,21 +69,25 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
          * @throws StreamsException if key is null
          */
         @Override
-        public void process(K key, Change<V1> change) {
+        public void process(final K key, final Change<V1> change) {
             // the keys should never be null
             if (key == null)
                 throw new StreamsException("Record key for KTable outer-join operator should not be null.");
 
             R newValue = null;
             R oldValue = null;
-            V2 value2 = valueGetter.get(key);
 
-            if (change.newValue != null || value2 != null)
+            final V2 value2 = valueGetter.get(key);
+            if (value2 == null && change.newValue == null && change.oldValue == null) {
+                return;
+            }
+
+            if (value2 != null || change.newValue != null) {
                 newValue = joiner.apply(change.newValue, value2);
+            }
 
-            if (sendOldValues) {
-                if (change.oldValue != null || value2 != null)
-                    oldValue = joiner.apply(change.oldValue, value2);
+            if (sendOldValues && (value2 != null || change.oldValue != null)) {
+                oldValue = joiner.apply(change.oldValue, value2);
             }
 
             context().forward(key, new Change<>(newValue, oldValue));

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 80aadaa..8aeadcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -70,19 +70,23 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
          * @throws StreamsException if key is null
          */
         @Override
-        public void process(K key, Change<V1> change) {
+        public void process(final K key, final Change<V1> change) {
             // the keys should never be null
             if (key == null)
                 throw new StreamsException("Record key for KTable right-join operator should not be null.");
 
-            R newValue = null;
+            final R newValue;
             R oldValue = null;
-            V2 value2 = valueGetter.get(key);
 
-            if (value2 != null) {
-                newValue = joiner.apply(change.newValue, value2);
-                if (sendOldValues)
-                    oldValue = joiner.apply(change.oldValue, value2);
+            final V2 value2 = valueGetter.get(key);
+            if (value2 == null) {
+                return;
+            }
+
+            newValue = joiner.apply(change.newValue, value2);
+
+            if (sendOldValues) {
+                oldValue = joiner.apply(change.oldValue, value2);
             }
 
             context().forward(key, new Change<>(newValue, oldValue));

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 195e5a4..11ca30e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -48,7 +48,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
     private RecordContext recordContext;
     private ProcessorNode currentNode;
 
-    @SuppressWarnings("unchecked")
     public ProcessorContextImpl(TaskId id,
                                 StreamTask task,
                                 StreamsConfig config,
@@ -194,7 +193,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
         return recordContext.timestamp();
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(K key, V value) {
         ProcessorNode previousNode = currentNode;
@@ -208,7 +206,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
         ProcessorNode previousNode = currentNode;
@@ -221,7 +218,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(K key, V value, String childName) {
         for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
new file mode 100644
index 0000000..0f70588
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+public class JoinIntegrationTest {
+    // Embedded Kafka cluster shared by all tests of this class (constructed with argument 1 -- presumably one broker).
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    // ZooKeeper client; opened in setupConfigsAndUtils(), closed in release(), queried to confirm topic deletion.
+    private static ZkUtils zkUtils = null;
+
+    // APP_ID is the common prefix of the per-test application id and of the result consumer's group id.
+    private static final String APP_ID = "join-integration-test";
+    private static final String INPUT_TOPIC_1 = "inputTopicLeft";
+    private static final String INPUT_TOPIC_2 = "inputTopicRight";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+
+    // Shared configurations; populated once in setupConfigsAndUtils(). Each test adds its own application id to STREAMS_CONFIG.
+    private final static Properties PRODUCER_CONFIG = new Properties();
+    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+    private final static Properties STREAMS_CONFIG = new Properties();
+
+    // Topology pieces; re-created by prepareTopology() before every test. The streams are derived from the tables.
+    private KStreamBuilder builder;
+    private KStream<Long, String> leftStream;
+    private KStream<Long, String> rightStream;
+    private KTable<Long, String> leftTable;
+    private KTable<Long, String> rightTable;
+
+    // Input records in the order runTest() produces them, one at a time. INPUT_TOPIC_1 feeds the left side,
+    // INPUT_TOPIC_2 the right side; all records share a single key (see Input). The null values are part of
+    // the scenario. Each test's expected-result list has exactly one entry per element of this list.
+    private final List<Input<String>> input = Arrays.asList(
+        new Input<>(INPUT_TOPIC_1, (String) null),
+        new Input<>(INPUT_TOPIC_2, (String) null),
+        new Input<>(INPUT_TOPIC_1, "A"),
+        new Input<>(INPUT_TOPIC_2, "a"),
+        new Input<>(INPUT_TOPIC_1, "B"),
+        new Input<>(INPUT_TOPIC_2, "b"),
+        new Input<>(INPUT_TOPIC_1, (String) null),
+        new Input<>(INPUT_TOPIC_2, (String) null),
+        new Input<>(INPUT_TOPIC_1, "C"),
+        new Input<>(INPUT_TOPIC_2, "c"),
+        new Input<>(INPUT_TOPIC_2, (String) null),
+        new Input<>(INPUT_TOPIC_1, (String) null),
+        new Input<>(INPUT_TOPIC_2, (String) null),
+        new Input<>(INPUT_TOPIC_2, "d"),
+        new Input<>(INPUT_TOPIC_1, "D")
+    );
+
+    // Joins two values as "<value1>-<value2>". String concatenation renders a missing (null) side as the
+    // text "null", which is why expected results contain strings such as "A-null" or "null-b".
+    private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(final String value1, final String value2) {
+            return value1 + "-" + value2;
+        }
+    };
+
+    // Condition polled by cleanup() until all three test topics are gone.
+    private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();
+
+    /**
+     * Fills the shared producer, result-consumer and Streams configurations and opens the
+     * ZooKeeper client. Runs once before the first test of this class.
+     */
+    @BeforeClass
+    public static void setupConfigsAndUtils() throws Exception {
+        final String bootstrapServers = CLUSTER.bootstrapServers();
+        final String zkConnect = CLUSTER.zKConnectString();
+
+        // Producer used to write the input records (Long keys, String values).
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        // Consumer used to read the join results back from the output topic.
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // Streams application under test; the application id is added by each test method.
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect);
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+        final boolean zkSecurityEnabled = JaasUtils.isZkSecurityEnabled();
+        zkUtils = ZkUtils.apply(zkConnect, 30000, 30000, zkSecurityEnabled);
+    }
+
+    /**
+     * Closes the ZooKeeper client after the last test, if one was opened.
+     */
+    @AfterClass
+    public static void release() {
+        if (zkUtils == null) {
+            return;
+        }
+        zkUtils.close();
+    }
+
+    /**
+     * Creates the two input topics and the output topic, then builds a fresh builder with
+     * one table per input topic and a stream view derived from each table.
+     */
+    @Before
+    public void prepareTopology() throws Exception {
+        for (final String topic : Arrays.asList(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC)) {
+            CLUSTER.createTopic(topic);
+        }
+
+        builder = new KStreamBuilder();
+
+        leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
+        rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
+
+        leftStream = leftTable.toStream();
+        rightStream = rightTable.toStream();
+    }
+
+    /**
+     * Deletes the three test topics and waits (up to 120 seconds) until they are really gone.
+     */
+    @After
+    public void cleanup() throws Exception {
+        for (final String topic : Arrays.asList(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC)) {
+            CLUSTER.deleteTopic(topic);
+        }
+
+        final long maxWaitMs = 120000;
+        TestUtils.waitForCondition(topicsGotDeleted, maxWaitMs, "Topics not deleted after 120 seconds.");
+    }
+
+    /**
+     * Compares the values read from {@code outputTopic} with {@code expectedResult}.
+     *
+     * @param outputTopic    topic the join result is written to
+     * @param expectedResult expected values; {@code null} means nothing is verified for this step
+     * @throws Exception if reading from the topic fails
+     */
+    private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
+        if (expectedResult == null) {
+            return;
+        }
+
+        // Bounded wait: with Long.MAX_VALUE a record that never arrives would stall the build
+        // instead of failing this test.
+        final long maxWaitMs = 60 * 1000L;
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), maxWaitMs);
+        assertThat(result, is(expectedResult));
+    }
+
+    /**
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry.
+     *
+     * @param expectedResult one entry per element of {@code input}, in the same order
+     * @throws Exception if producing the input or reading the output fails
+     */
+    private void runTest(final List<List<String>> expectedResult) throws Exception {
+        // Real assertion instead of the Java "assert" keyword, which is skipped when the JVM runs without -ea.
+        assertThat(expectedResult.size(), is(input.size()));
+
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG);
+        try {
+            streams.start();
+
+            // Every record gets its own, strictly increasing timestamp.
+            long ts = System.currentTimeMillis();
+
+            final Iterator<List<String>> resultIterator = expectedResult.iterator();
+            for (final Input<String> singleInput : input) {
+                IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts);
+                checkResult(OUTPUT_TOPIC, resultIterator.next());
+            }
+        } finally {
+            // Always stop the application, even if a check failed.
+            streams.close();
+        }
+    }
+
+    /**
+     * Inner KStream-KStream join using {@code JoinWindows.of(10000)}.
+     * In the expected list, {@code null} means "no output is checked for this input".
+     */
+    @Test
+    public void testInnerKStreamKStream() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream");
+
+        leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                       // left: null
+            null,                                       // right: null
+            null,                                       // left: A
+            Collections.singletonList("A-a"),           // right: a
+            Collections.singletonList("B-a"),           // left: B
+            Arrays.asList("A-b", "B-b"),                // right: b
+            null,                                       // left: null
+            null,                                       // right: null
+            Arrays.asList("C-a", "C-b"),                // left: C
+            Arrays.asList("A-c", "B-c", "C-c"),         // right: c
+            null,                                       // right: null
+            null,                                       // left: null
+            null,                                       // right: null
+            Arrays.asList("A-d", "B-d", "C-d"),         // right: d
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")   // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Left KStream-KStream join using {@code JoinWindows.of(10000)}.
+     * In the expected list, {@code null} means "no output is checked for this input".
+     */
+    @Test
+    public void testLeftKStreamKStream() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream");
+
+        leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                       // left: null
+            null,                                       // right: null
+            Collections.singletonList("A-null"),        // left: A
+            Collections.singletonList("A-a"),           // right: a
+            Collections.singletonList("B-a"),           // left: B
+            Arrays.asList("A-b", "B-b"),                // right: b
+            null,                                       // left: null
+            null,                                       // right: null
+            Arrays.asList("C-a", "C-b"),                // left: C
+            Arrays.asList("A-c", "B-c", "C-c"),         // right: c
+            null,                                       // right: null
+            null,                                       // left: null
+            null,                                       // right: null
+            Arrays.asList("A-d", "B-d", "C-d"),         // right: d
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")   // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Outer KStream-KStream join using {@code JoinWindows.of(10000)}.
+     * In the expected list, {@code null} means "no output is checked for this input".
+     */
+    @Test
+    public void testOuterKStreamKStream() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream");
+
+        leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                       // left: null
+            null,                                       // right: null
+            Collections.singletonList("A-null"),        // left: A
+            Collections.singletonList("A-a"),           // right: a
+            Collections.singletonList("B-a"),           // left: B
+            Arrays.asList("A-b", "B-b"),                // right: b
+            null,                                       // left: null
+            null,                                       // right: null
+            Arrays.asList("C-a", "C-b"),                // left: C
+            Arrays.asList("A-c", "B-c", "C-c"),         // right: c
+            null,                                       // right: null
+            null,                                       // left: null
+            null,                                       // right: null
+            Arrays.asList("A-d", "B-d", "C-d"),         // right: d
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")   // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Inner KStream-KTable join: the left stream is joined against the right table.
+     * In the expected list, {@code null} means "no output is checked for this input".
+     */
+    @Test
+    public void testInnerKStreamKTable() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable");
+
+        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                               // left: null
+            null,                               // right: null
+            null,                               // left: A
+            null,                               // right: a
+            Collections.singletonList("B-a"),   // left: B
+            null,                               // right: b
+            null,                               // left: null
+            null,                               // right: null
+            null,                               // left: C
+            null,                               // right: c
+            null,                               // right: null
+            null,                               // left: null
+            null,                               // right: null
+            null,                               // right: d
+            Collections.singletonList("D-d")    // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Left KStream-KTable join: the left stream is joined against the right table.
+     * In the expected list, {@code null} means "no output is checked for this input".
+     */
+    @Test
+    public void testLeftKStreamKTable() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable");
+
+        leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                   // left: null
+            null,                                   // right: null
+            Collections.singletonList("A-null"),    // left: A
+            null,                                   // right: a
+            Collections.singletonList("B-a"),       // left: B
+            null,                                   // right: b
+            null,                                   // left: null
+            null,                                   // right: null
+            Collections.singletonList("C-null"),    // left: C
+            null,                                   // right: c
+            null,                                   // right: null
+            null,                                   // left: null
+            null,                                   // right: null
+            null,                                   // right: d
+            Collections.singletonList("D-d")        // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Inner KTable-KTable join. In the expected list, {@code null} means "no output is checked
+     * for this input", while a singleton list holding {@code null} expects one record with a null value.
+     */
+    @Test
+    public void testInnerKTableKTable() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable");
+
+        leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                       // left: null
+            null,                                       // right: null
+            null,                                       // left: A
+            Collections.singletonList("A-a"),           // right: a
+            Collections.singletonList("B-a"),           // left: B
+            Collections.singletonList("B-b"),           // right: b
+            Collections.singletonList((String) null),   // left: null
+            null,                                       // right: null
+            null,                                       // left: C
+            Collections.singletonList("C-c"),           // right: c
+            Collections.singletonList((String) null),   // right: null
+            null,                                       // left: null
+            null,                                       // right: null
+            null,                                       // right: d
+            Collections.singletonList("D-d")            // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Left KTable-KTable join. In the expected list, {@code null} means "no output is checked
+     * for this input", while a singleton list holding {@code null} expects one record with a null value.
+     */
+    @Test
+    public void testLeftKTableKTable() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable");
+
+        leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                       // left: null
+            null,                                       // right: null
+            Collections.singletonList("A-null"),        // left: A
+            Collections.singletonList("A-a"),           // right: a
+            Collections.singletonList("B-a"),           // left: B
+            Collections.singletonList("B-b"),           // right: b
+            Collections.singletonList((String) null),   // left: null
+            null,                                       // right: null
+            Collections.singletonList("C-null"),        // left: C
+            Collections.singletonList("C-c"),           // right: c
+            Collections.singletonList("C-null"),        // right: null
+            Collections.singletonList((String) null),   // left: null
+            null,                                       // right: null
+            null,                                       // right: d
+            Collections.singletonList("D-d")            // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Outer KTable-KTable join. In the expected list, {@code null} means "no output is checked
+     * for this input", while a singleton list holding {@code null} expects one record with a null value.
+     */
+    @Test
+    public void testOuterKTableKTable() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable");
+
+        leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+        final List<List<String>> expected = Arrays.asList(
+            null,                                       // left: null
+            null,                                       // right: null
+            Collections.singletonList("A-null"),        // left: A
+            Collections.singletonList("A-a"),           // right: a
+            Collections.singletonList("B-a"),           // left: B
+            Collections.singletonList("B-b"),           // right: b
+            Collections.singletonList("null-b"),        // left: null
+            Collections.singletonList((String) null),   // right: null
+            Collections.singletonList("C-null"),        // left: C
+            Collections.singletonList("C-c"),           // right: c
+            Collections.singletonList("C-null"),        // right: null
+            Collections.singletonList((String) null),   // left: null
+            null,                                       // right: null
+            Collections.singletonList("null-d"),        // right: d
+            Collections.singletonList("D-d")            // left: D
+        );
+
+        runTest(expected);
+    }
+
+    /**
+     * Condition that holds once none of the three test topics is returned by
+     * {@code zkUtils.getAllTopics()} anymore. Declared static because it only reads static
+     * state of the test class and therefore needs no reference to a test instance.
+     */
+    private static final class TopicsGotDeletedCondition implements TestCondition {
+        @Override
+        public boolean conditionMet() {
+            final Set<String> allTopics =
+                new HashSet<String>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC);
+        }
+    }
+
+    /**
+     * One input record together with the topic it is produced to. All records use the same
+     * key. Static and immutable: it needs no reference to the enclosing test instance and is
+     * never modified after construction.
+     *
+     * @param <V> value type of the record
+     */
+    private static final class Input<V> {
+        // Single key shared by every input record.
+        private static final long ANY_UNIQUE_KEY = 0L;
+
+        final String topic;
+        final KeyValue<Long, V> record;
+
+        Input(final String topic, final V value) {
+            this.topic = topic;
+            this.record = KeyValue.pair(ANY_UNIQUE_KEY, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 85e2cf7..2cd3859 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -71,49 +71,49 @@ public class KTableKTableJoinIntegrationTest {
     public static Object[] parameters() {
         return new Object[][]{
             {JoinType.INNER, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("c", null),
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", null))
-            },
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+//                new KeyValue<>("c", null),
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+                new KeyValue<>("b", "B1-B2-B3")//,
+//                new KeyValue<>("c", null)
+            )},
             {JoinType.INNER, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("c", null),
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", null)
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+//                new KeyValue<>("c", null),
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+                new KeyValue<>("b", "B1-B2-B3")//,
+//                new KeyValue<>("c", null)
             )},
             {JoinType.INNER, JoinType.OUTER, Arrays.asList(
                 new KeyValue<>("a", "null-A3"),
                 new KeyValue<>("b", "null-B3"),
                 new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C3")
+//                new KeyValue<>("a", "null-A3"),
+//                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("b", "B1-B2-B3")//,
+//                new KeyValue<>("c", "null-C3")
             )},
             {JoinType.LEFT, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("c", null),
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+//                new KeyValue<>("c", null),
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", null)
+                new KeyValue<>("b", "B1-B2-B3")//,
+//                new KeyValue<>("c", null)
             )},
             {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("c", null),
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+//                new KeyValue<>("c", null),
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", null)
+                new KeyValue<>("b", "B1-B2-B3")//,
+//                new KeyValue<>("c", null)
             )},
             {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
                 new KeyValue<>("a", "null-A3"),
@@ -121,22 +121,22 @@ public class KTableKTableJoinIntegrationTest {
                 new KeyValue<>("c", "null-C3"),
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C3")
+                new KeyValue<>("b", "B1-B2-B3")//,
+//                new KeyValue<>("c", "null-C3")
             )},
             {JoinType.OUTER, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("c", null),
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+//                new KeyValue<>("c", null),
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
                 new KeyValue<>("b", "B1-B2-B3"),
                 new KeyValue<>("c", "null-C2-C3")
             )},
             {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", null),
-                new KeyValue<>("b", null),
-                new KeyValue<>("c", null),
+//                new KeyValue<>("a", null),
+//                new KeyValue<>("b", null),
+//                new KeyValue<>("c", null),
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
                 new KeyValue<>("b", "B1-B2-B3"),