You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2018/01/31 10:21:34 UTC

[kafka] branch 1.0 updated: KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling

This is an automated email from the ASF dual-hosted git repository.

damianguy pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new ab3e4a2  KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
ab3e4a2 is described below

commit ab3e4a27671df2499f8dea34a84fc0740102269c
Author: Andy Bryant <an...@gmail.com>
AuthorDate: Wed Jan 31 10:20:12 2018 +0000

    KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
    
    For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate no match
    
    For KStream-GlobalKTable joins, a `KeyValueMapper` is used to derive a key from the stream records into the `GlobalKTable`. For some stream values there may be no valid reference to the table stream. This patch allows developers to use `null` return values to indicate there is no possible match. This is possible in this case since `null` is never a valid key value for a `GlobalKTable`.
    Without this patch, providing a `null` value caused the stream to crash on Kafka 1.0.
    
    I added unit tests for KStream-GlobalKTable left and inner joins, since they were missing. I also covered this additional scenario where `KeyValueMapper` returns `null` to insure it is handled correctly.
    
    Author: Andy Bryant <an...@gmail.com>
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>
    
    Closes #4424 from andybryant/KAFKA-6378-null-handling-stream-globaltable-join
---
 .../org/apache/kafka/streams/kstream/KStream.java  |  12 +-
 .../internals/KStreamKTableJoinProcessor.java      |   5 +-
 .../internals/KStreamGlobalKTableJoinTest.java     | 211 +++++++++++++++++++++
 .../internals/KStreamGlobalKTableLeftJoinTest.java | 211 +++++++++++++++++++++
 .../kstream/internals/KStreamKTableJoinTest.java   | 132 +++++++++----
 .../internals/KStreamKTableLeftJoinTest.java       | 139 +++++++++-----
 6 files changed, 617 insertions(+), 93 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0d1d201..6973719 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -2474,8 +2474,10 @@ public interface KStream<K, V> {
      * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
      * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
      * The key of the result record is the same as the key of this {@code KStream}.
-     * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
+     * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
      * operation and thus no output record will be added to the resulting {@code KStream}.
+     * If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the
+     * resulting {@code KStream}.
      *
      * @param globalKTable   the {@link GlobalKTable} to be joined with this stream
      * @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
@@ -2506,11 +2508,13 @@ public interface KStream<K, V> {
      * <p>
      * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
      * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
-     * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
-     * {@link ValueJoiner}.
      * The key of the result record is the same as this {@code KStream}.
-     * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
+     * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
      * operation and thus no output record will be added to the resulting {@code KStream}.
+     * If {@code keyValueMapper} returns {@code null} implying no match exists, a {@code null} value will be
+     * provided to {@link ValueJoiner}.
+     * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
+     * {@link ValueJoiner}.
      *
      * @param globalKTable   the {@link GlobalKTable} to be joined with this stream
      * @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index bac930d..8e8b86a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -47,13 +47,16 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
     @Override
     public void process(final K1 key, final V1 value) {
         // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // so ignore unless it is a left join
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (key != null && value != null) {
-            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
+            final K2 mappedKey = keyMapper.apply(key, value);
+            final V2 value2 = mappedKey == null ? null : valueGetter.get(mappedKey);
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
new file mode 100644
index 0000000..d1d048b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamGlobalKTableJoinTest {
+
+    final private String streamTopic = "streamTopic";
+    final private String globalTableTopic = "globalTableTopic";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private File stateDir = null;
+    private MockProcessorSupplier<Integer, String> processor;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+    private StreamsBuilder builder;
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+
+        builder = new StreamsBuilder();
+        final KStream<Integer, String> stream;
+        final GlobalKTable<String, String> table; // value of stream optionally contains key of table
+        final KeyValueMapper<Integer, String, String> keyMapper;
+
+        processor = new MockProcessorSupplier<>();
+        final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
+        final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+        stream = builder.stream(streamTopic, streamConsumed);
+        table = builder.globalTable(globalTableTopic, tableConsumed);
+        keyMapper = new KeyValueMapper<Integer, String, String>() {
+            @Override
+            public String apply(final Integer key, final String value) {
+                final String[] tokens = value.split(",");
+                // Value is comma delimited. If second token is present, it's the key to the global ktable.
+                // If not present, use null to indicate no match
+                return tokens.length > 1 ? tokens[1] : null;
+            }
+        };
+        stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+
+        driver.setUp(builder, stateDir);
+        driver.setTime(0L);
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+        for (int i = 0; i < messageCount; i++) {
+            String value = valuePrefix + expectedKeys[i];
+            if (includeForeignKey) {
+                value = value + ",FKey" + expectedKeys[i];
+            }
+            driver.process(streamTopic, expectedKeys[i], value);
+        }
+    }
+
+    private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+        }
+    }
+
+    private void pushNullValueToGlobalTable(final int messageCount) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldNotRequireCopartitioning() {
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+
+        assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
+    }
+
+    @Test
+    public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
+
+        // push two items to the primary stream. the globalTable is empty
+
+        pushToStream(2, "X", true);
+        processor.checkAndClearProcessResult();
+    }
+
+    @Test
+    public void shouldNotJoinOnGlobalTableUpdates() {
+
+        // push two items to the primary stream. the globalTable is empty
+
+        pushToStream(2, "X", true);
+        processor.checkAndClearProcessResult();
+
+        // push two items to the globalTable. this should not produce any item.
+
+        pushToGlobalTable(2, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1");
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "YY");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+YY0", "1:X1,FKey1+YY1", "2:X2,FKey2+YY2", "3:X3,FKey3+YY3");
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "YYY");
+        processor.checkAndClearProcessResult();
+    }
+
+    @Test
+    public void shouldJoinOnlyIfMatchFoundOnStreamUpdates() {
+
+        // push two items to the globalTable. this should not produce any item.
+
+        pushToGlobalTable(2, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1");
+
+    }
+
+    @Test
+    public void shouldClearGlobalTableEntryOnNullValueUpdates() {
+
+        // push all four items to the globalTable. this should not produce any item.
+
+        pushToGlobalTable(4, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+Y2", "3:X3,FKey3+Y3");
+
+        // push two items with null to the globalTable as deletes. this should not produce any item.
+
+        pushNullValueToGlobalTable(2);
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
+
+        pushToStream(4, "XX", true);
+        processor.checkAndClearProcessResult("2:XX2,FKey2+Y2", "3:XX3,FKey3+Y3");
+    }
+
+    @Test
+    public void shouldNotJoinOnNullKeyMapperValues() {
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
+        // this should not produce any item.
+
+        pushToStream(4, "XXX", false);
+        processor.checkAndClearProcessResult();
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
new file mode 100644
index 0000000..8b7dd42
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamGlobalKTableLeftJoinTest {
+
+    final private String streamTopic = "streamTopic";
+    final private String globalTableTopic = "globalTableTopic";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private File stateDir = null;
+    private MockProcessorSupplier<Integer, String> processor;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+    private StreamsBuilder builder;
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+
+        builder = new StreamsBuilder();
+        final KStream<Integer, String> stream;
+        final GlobalKTable<String, String> table; // value of stream optionally contains key of table
+        final KeyValueMapper<Integer, String, String> keyMapper;
+
+        processor = new MockProcessorSupplier<>();
+        final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
+        final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+        stream = builder.stream(streamTopic, streamConsumed);
+        table = builder.globalTable(globalTableTopic, tableConsumed);
+        keyMapper = new KeyValueMapper<Integer, String, String>() {
+            @Override
+            public String apply(final Integer key, final String value) {
+                final String[] tokens = value.split(",");
+                // Value is comma delimited. If second token is present, it's the key to the global ktable.
+                // If not present, use null to indicate no match
+                return tokens.length > 1 ? tokens[1] : null;
+            }
+        };
+        stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+
+        driver.setUp(builder, stateDir);
+        driver.setTime(0L);
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+        for (int i = 0; i < messageCount; i++) {
+            String value = valuePrefix + expectedKeys[i];
+            if (includeForeignKey) {
+                value = value + ",FKey" + expectedKeys[i];
+            }
+            driver.process(streamTopic, expectedKeys[i], value);
+        }
+    }
+
+    private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+        }
+    }
+
+    private void pushNullValueToGlobalTable(final int messageCount) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldNotRequireCopartitioning() {
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+
+        assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
+    }
+
+    @Test
+    public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
+
+        // push two items to the primary stream. the globalTable is empty
+
+        pushToStream(2, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+null", "1:X1,FKey1+null");
+    }
+
+    @Test
+    public void shouldNotJoinOnGlobalTableUpdates() {
+
+        // push two items to the primary stream. the globalTable is empty
+
+        pushToStream(2, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+null", "1:X1,FKey1+null");
+
+        // push two items to the globalTable. this should not produce any item.
+
+        pushToGlobalTable(2, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+null", "3:X3,FKey3+null");
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "YY");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+YY0", "1:X1,FKey1+YY1", "2:X2,FKey2+YY2", "3:X3,FKey3+YY3");
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "YYY");
+        processor.checkAndClearProcessResult();
+    }
+
+    @Test
+    public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
+
+        // push two items to the globalTable. this should not produce any item.
+
+        pushToGlobalTable(2, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+null", "3:X3,FKey3+null");
+
+    }
+
+    @Test
+    public void shouldClearGlobalTableEntryOnNullValueUpdates() {
+
+        // push all four items to the globalTable. this should not produce any item.
+
+        pushToGlobalTable(4, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X", true);
+        processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+Y2", "3:X3,FKey3+Y3");
+
+        // push two items with null to the globalTable as deletes. this should not produce any item.
+
+        pushNullValueToGlobalTable(2);
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "XX", true);
+        processor.checkAndClearProcessResult("0:XX0,FKey0+null", "1:XX1,FKey1+null", "2:XX2,FKey2+Y2", "3:XX3,FKey3+Y3");
+    }
+
+    @Test
+    public void shouldJoinOnNullKeyMapperValues() {
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
+        // this should produce four items.
+
+        pushToStream(4, "XXX", false);
+        processor.checkAndClearProcessResult("0:XXX0+null", "1:XXX1+null", "2:XXX2+null", "3:XXX3+null");
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index d1226c2..a23e787 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -42,97 +42,145 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableJoinTest {
 
-    final private String topic1 = "topic1";
-    final private String topic2 = "topic2";
+    final private String streamTopic = "streamTopic";
+    final private String tableTopic = "tableTopic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
+    private MockProcessorSupplier<Integer, String> processor;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+    private StreamsBuilder builder;
 
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
-    }
 
-    @Test
-    public void testJoin() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         final KStream<Integer, String> stream;
         final KTable<Integer, String> table;
-        final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
         final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
-        stream = builder.stream(topic1, consumed);
-        table = builder.table(topic2, consumed);
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
+        driver.setUp(builder, stateDir);
+        driver.setTime(0L);
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+        }
+    }
+
+    private void pushNullValueToTable(final int messageCount) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(tableTopic, expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldRequireCopartitionedStreams() {
+
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
-        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
-        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
-        driver.setTime(0L);
+        assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+    }
 
-        // push two items to the primary stream. the other table is empty
+    @Test
+    public void shouldNotJoinWithEmptyTableOnStreamUpdates() {
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
+        // push two items to the primary stream. the table is empty
 
+        pushToStream(2, "X");
         processor.checkAndClearProcessResult();
+    }
 
-        // push two items to the other stream. this should not produce any item.
+    @Test
+    public void shouldNotJoinOnTableUpdates() {
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
+        // push two items to the primary stream. the table is empty
 
+        pushToStream(2, "X");
         processor.checkAndClearProcessResult();
 
-        // push all four items to the primary stream. this should produce two items.
+        // push two items to the table. this should not produce any item.
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
+        pushToTable(2, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
 
+        pushToStream(4, "X");
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all items to the other stream. this should not produce any item
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
+        // push all items to the table. this should not produce any item
 
+        pushToTable(4, "YY");
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-
+        pushToStream(4, "X");
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // push two items with null to the other stream as deletes. this should not produce any item.
+        // push all items to the table. this should not produce any item
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult();
+    }
 
+    @Test
+    public void shouldJoinOnlyIfMatchFoundOnStreamUpdates() {
+
+        // push two items to the table. this should not produce any item.
+
+        pushToTable(2, "Y");
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+    }
 
-        processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
+    @Test
+    public void shouldClearTableEntryOnNullValueUpdates() {
+
+        // push all four items to the table. this should not produce any item.
+
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3");
+
+        // push two items with null to the table as deletes. this should not produce any item.
+
+        pushNullValueToTable(2);
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
+
+        pushToStream(4, "XX");
+        processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
     }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index ed835a8..7507d7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -42,98 +42,145 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableLeftJoinTest {
 
-    final private String topic1 = "topic1";
-    final private String topic2 = "topic2";
+    final private String streamTopic = "streamTopic";
+    final private String tableTopic = "tableTopic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
-
+    private MockProcessorSupplier<Integer, String> processor;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+    private StreamsBuilder builder;
 
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
-    }
 
-    @Test
-    public void testJoin() {
-        StreamsBuilder builder = new StreamsBuilder();
+        builder = new StreamsBuilder();
 
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream;
-        KTable<Integer, String> table;
-        MockProcessorSupplier<Integer, String> processor;
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
 
         processor = new MockProcessorSupplier<>();
-        Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
-        stream = builder.stream(topic1, consumed);
-        table = builder.table(topic2, consumed);
+        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
-
-        assertEquals(1, copartitionGroups.size());
-        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
         driver.setUp(builder, stateDir);
         driver.setTime(0L);
+    }
 
-        // push two items to the primary stream. the other table is empty
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+        }
+    }
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+    private void pushNullValueToTable(final int messageCount) {
+        for (int i = 0; i < messageCount; i++) {
+            driver.process(tableTopic, expectedKeys[i], null);
         }
+    }
+
+    @Test
+    public void shouldRequireCopartitionedStreams() {
 
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+    }
+
+    @Test
+    public void shouldJoinWithEmptyTableOnStreamUpdates() {
+
+        // push two items to the primary stream. the table is empty
+
+        pushToStream(2, "X");
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+    }
 
-        // push two items to the other stream. this should not produce any item.
+    @Test
+    public void shouldNotJoinOnTableUpdates() {
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
+        // push two items to the primary stream. the table is empty
 
+        pushToStream(2, "X");
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+        // push two items to the table. this should not produce any item.
+
+        pushToTable(2, "Y");
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-
+        pushToStream(4, "X");
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
-        // push all items to the other stream. this should not produce any item
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
+        // push all items to the table. this should not produce any item
 
+        pushToTable(4, "YY");
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-
+        pushToStream(4, "X");
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // push two items with null to the other stream as deletes. this should not produce any item.
+        // push all items to the table. this should not produce any item
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult();
+    }
 
+    @Test
+    public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
+
+        // push two items to the table. this should not produce any item.
+
+        pushToTable(2, "Y");
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+
+    }
 
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+    @Test
+    public void shouldClearTableEntryOnNullValueUpdates() {
+
+        // push all four items to the table. this should not produce any item.
+
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3");
+
+        // push two items with null to the table as deletes. this should not produce any item.
+
+        pushNullValueToTable(2);
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        pushToStream(4, "XX");
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
     }
+
 }

-- 
To stop receiving notification emails like this one, please contact
damianguy@apache.org.