Posted to commits@kafka.apache.org by da...@apache.org on 2018/01/31 10:21:34 UTC
[kafka] branch 1.0 updated: KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new ab3e4a2 KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
ab3e4a2 is described below
commit ab3e4a27671df2499f8dea34a84fc0740102269c
Author: Andy Bryant <an...@gmail.com>
AuthorDate: Wed Jan 31 10:20:12 2018 +0000
KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate no match
For KStream-GlobalKTable joins, a `KeyValueMapper` derives, from each stream record, the key to look up in the `GlobalKTable`. For some stream values there may be no valid reference into the table. This patch allows developers to return `null` from the mapper to indicate that no match is possible. This works here because `null` is never a valid key for a `GlobalKTable`.
Without this patch, returning `null` from the `KeyValueMapper` caused the stream to crash on Kafka 1.0.
I added unit tests for KStream-GlobalKTable left and inner joins, since they were missing. I also covered the additional scenario where the `KeyValueMapper` returns `null`, to ensure it is handled correctly.
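To make the new semantics concrete, here is a stdlib-only Java sketch of the patched `process()` decision logic. It is a model under stated assumptions, not the Kafka Streams API: a `HashMap` stands in for the `GlobalKTable`, `java.util.function.BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`, and the class `GlobalTableJoinModel` with its `process` and `demo` methods is hypothetical. The key mapper and joiner mimic the ones in the new tests (the foreign key is the optional second comma-separated token; the joined output is `value1+value2`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, stdlib-only model of the patched join logic; not Kafka Streams code.
public class GlobalTableJoinModel {

    // Mirrors the patched process() logic; "forwarded" records are
    // collected as "key:joinedValue" strings.
    static <K1, V1, K2, V2, R> void process(final K1 key,
                                            final V1 value,
                                            final Map<K2, V2> table,
                                            final BiFunction<K1, V1, K2> keyMapper,
                                            final BiFunction<V1, V2, R> joiner,
                                            final boolean leftJoin,
                                            final List<String> forwarded) {
        // Records with a null key or value are never joined.
        if (key == null || value == null) {
            return;
        }
        final K2 mappedKey = keyMapper.apply(key, value);
        // A null mapped key means "no match possible", so skip the table lookup.
        final V2 value2 = mappedKey == null ? null : table.get(mappedKey);
        // Inner joins need a match; left joins forward with a null right-hand value.
        if (leftJoin || value2 != null) {
            forwarded.add(key + ":" + joiner.apply(value, value2));
        }
    }

    // Runs three sample records through either an inner or a left join.
    static List<String> demo(final boolean leftJoin) {
        final Map<String, String> table = new HashMap<>();
        table.put("FKey0", "Y0");

        // Foreign key is the optional second comma-separated token, else null.
        final BiFunction<Integer, String, String> keyMapper = (k, v) -> {
            final String[] tokens = v.split(",");
            return tokens.length > 1 ? tokens[1] : null;
        };
        final BiFunction<String, String, String> joiner = (v1, v2) -> v1 + "+" + v2;

        // Record 0 matches, record 1 has a foreign key absent from the table,
        // record 2 has no foreign key at all (mapper returns null).
        final String[] values = {"X0,FKey0", "X1,FKey1", "X2"};
        final List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            process(i, values[i], table, keyMapper, joiner, leftJoin, forwarded);
        }
        return forwarded;
    }

    public static void main(final String[] args) {
        System.out.println("inner: " + demo(false)); // inner: [0:X0,FKey0+Y0]
        System.out.println("left:  " + demo(true));  // left:  [0:X0,FKey0+Y0, 1:X1,FKey1+null, 2:X2+null]
    }
}
```

Dropping records 1 and 2 in the inner join, and passing `null` to the joiner for them in the left join, is the behavior the updated `KStream#join` and `KStream#leftJoin` Javadoc describes.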
Author: Andy Bryant <an...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #4424 from andybryant/KAFKA-6378-null-handling-stream-globaltable-join
---
.../org/apache/kafka/streams/kstream/KStream.java | 12 +-
.../internals/KStreamKTableJoinProcessor.java | 5 +-
.../internals/KStreamGlobalKTableJoinTest.java | 211 +++++++++++++++++++++
.../internals/KStreamGlobalKTableLeftJoinTest.java | 211 +++++++++++++++++++++
.../kstream/internals/KStreamKTableJoinTest.java | 132 +++++++++----
.../internals/KStreamKTableLeftJoinTest.java | 139 +++++++++-----
6 files changed, 617 insertions(+), 93 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0d1d201..6973719 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -2474,8 +2474,10 @@ public interface KStream<K, V> {
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}.
- * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
+ * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
+ * If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the
+ * resulting {@code KStream}.
*
* @param globalKTable the {@link GlobalKTable} to be joined with this stream
* @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
@@ -2506,11 +2508,13 @@ public interface KStream<K, V> {
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
- * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
- * {@link ValueJoiner}.
* The key of the result record is the same as this {@code KStream}.
- * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
+ * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
+ * If {@code keyValueMapper} returns {@code null} implying no match exists, a {@code null} value will be
+ * provided to {@link ValueJoiner}.
+ * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
+ * {@link ValueJoiner}.
*
* @param globalKTable the {@link GlobalKTable} to be joined with this stream
* @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index bac930d..8e8b86a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -47,13 +47,16 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
@Override
public void process(final K1 key, final V1 value) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+ // If {@code keyMapper} returns {@code null} it implies there is no match,
+ // so ignore unless it is a left join
//
// we also ignore the record if value is null, because in a key-value data model a null-value indicates
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
if (key != null && value != null) {
- final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
+ final K2 mappedKey = keyMapper.apply(key, value);
+ final V2 value2 = mappedKey == null ? null : valueGetter.get(mappedKey);
if (leftJoin || value2 != null) {
context().forward(key, joiner.apply(value, value2));
}
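The change to `process()` matters because the pre-patch code passed whatever the mapper returned straight into `valueGetter.get()`. The sketch below contrasts the two lookups against a hypothetical `NullRejectingStore`. It assumes the store rejects a `null` key with a `NullPointerException`; the commit message only says the stream crashed, so treat that exact failure mode as an assumption of this model. `NullGuardDemo`, `lookupBefore`, `lookupAfter`, and `sampleStore` are illustrative names, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical sketch: contrasts the pre- and post-patch lookups against a
// store that rejects null keys. Not Kafka Streams code.
public class NullGuardDemo {

    // Stand-in for a state store; assumed to reject null keys with an NPE.
    static final class NullRejectingStore<K, V> {
        private final Map<K, V> data = new HashMap<>();

        void put(final K key, final V value) {
            data.put(Objects.requireNonNull(key, "key cannot be null"), value);
        }

        V get(final K key) {
            return data.get(Objects.requireNonNull(key, "key cannot be null"));
        }
    }

    // Same shape as the tests' key mapper: optional second token, else null.
    static final BiFunction<Integer, String, String> KEY_MAPPER = (k, v) -> {
        final String[] tokens = v.split(",");
        return tokens.length > 1 ? tokens[1] : null;
    };

    // Pre-patch: the mapper result goes straight into the lookup.
    static <K1, V1, K2, V2> V2 lookupBefore(final K1 key, final V1 value,
                                            final BiFunction<K1, V1, K2> keyMapper,
                                            final NullRejectingStore<K2, V2> store) {
        return store.get(keyMapper.apply(key, value));
    }

    // Post-patch: a null mapped key short-circuits to "no match".
    static <K1, V1, K2, V2> V2 lookupAfter(final K1 key, final V1 value,
                                           final BiFunction<K1, V1, K2> keyMapper,
                                           final NullRejectingStore<K2, V2> store) {
        final K2 mappedKey = keyMapper.apply(key, value);
        return mappedKey == null ? null : store.get(mappedKey);
    }

    static NullRejectingStore<String, String> sampleStore() {
        final NullRejectingStore<String, String> store = new NullRejectingStore<>();
        store.put("FKey0", "Y0");
        return store;
    }

    public static void main(final String[] args) {
        final NullRejectingStore<String, String> store = sampleStore();
        System.out.println(lookupAfter(0, "X0,FKey0", KEY_MAPPER, store)); // Y0
        System.out.println(lookupAfter(1, "X1", KEY_MAPPER, store));       // null
        try {
            lookupBefore(1, "X1", KEY_MAPPER, store);
        } catch (final NullPointerException e) {
            System.out.println("before patch: " + e.getMessage()); // before patch: key cannot be null
        }
    }
}
```

Checking the mapped key before the lookup keeps the "no match" decision in the processor instead of relying on how a particular store treats `null` keys.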
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
new file mode 100644
index 0000000..d1d048b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamGlobalKTableJoinTest {
+
+ final private String streamTopic = "streamTopic";
+ final private String globalTableTopic = "globalTableTopic";
+
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
+ @Rule
+ public final KStreamTestDriver driver = new KStreamTestDriver();
+ private File stateDir = null;
+ private MockProcessorSupplier<Integer, String> processor;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+ private StreamsBuilder builder;
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+
+ builder = new StreamsBuilder();
+ final KStream<Integer, String> stream;
+ final GlobalKTable<String, String> table; // value of stream optionally contains key of table
+ final KeyValueMapper<Integer, String, String> keyMapper;
+
+ processor = new MockProcessorSupplier<>();
+ final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
+ final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+ stream = builder.stream(streamTopic, streamConsumed);
+ table = builder.globalTable(globalTableTopic, tableConsumed);
+ keyMapper = new KeyValueMapper<Integer, String, String>() {
+ @Override
+ public String apply(final Integer key, final String value) {
+ final String[] tokens = value.split(",");
+ // Value is comma delimited. If second token is present, it's the key to the global ktable.
+ // If not present, use null to indicate no match
+ return tokens.length > 1 ? tokens[1] : null;
+ }
+ };
+ stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+
+ driver.setUp(builder, stateDir);
+ driver.setTime(0L);
+ }
+
+ private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+ for (int i = 0; i < messageCount; i++) {
+ String value = valuePrefix + expectedKeys[i];
+ if (includeForeignKey) {
+ value = value + ",FKey" + expectedKeys[i];
+ }
+ driver.process(streamTopic, expectedKeys[i], value);
+ }
+ }
+
+ private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+ }
+ }
+
+ private void pushNullValueToGlobalTable(final int messageCount) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+ }
+ }
+
+ @Test
+ public void shouldNotRequireCopartitioning() {
+ final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+
+ assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
+ }
+
+ @Test
+ public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
+
+ // push two items to the primary stream. the globalTable is empty
+
+ pushToStream(2, "X", true);
+ processor.checkAndClearProcessResult();
+ }
+
+ @Test
+ public void shouldNotJoinOnGlobalTableUpdates() {
+
+ // push two items to the primary stream. the globalTable is empty
+
+ pushToStream(2, "X", true);
+ processor.checkAndClearProcessResult();
+
+ // push two items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(2, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce two items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1");
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "YY");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+YY0", "1:X1,FKey1+YY1", "2:X2,FKey2+YY2", "3:X3,FKey3+YY3");
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "YYY");
+ processor.checkAndClearProcessResult();
+ }
+
+ @Test
+ public void shouldJoinOnlyIfMatchFoundOnStreamUpdates() {
+
+ // push two items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(2, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce two items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1");
+
+ }
+
+ @Test
+ public void shouldClearGlobalTableEntryOnNullValueUpdates() {
+
+ // push all four items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(4, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+Y2", "3:X3,FKey3+Y3");
+
+ // push two items with null to the globalTable as deletes. this should not produce any item.
+
+ pushNullValueToGlobalTable(2);
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce two items.
+
+ pushToStream(4, "XX", true);
+ processor.checkAndClearProcessResult("2:XX2,FKey2+Y2", "3:XX3,FKey3+Y3");
+ }
+
+ @Test
+ public void shouldNotJoinOnNullKeyMapperValues() {
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
+ // this should not produce any item.
+
+ pushToStream(4, "XXX", false);
+ processor.checkAndClearProcessResult();
+ }
+
+}
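In these tests, `null` mapped keys come purely from the key mapper's parsing. `pushToStream(..., false)` omits the `,FKeyN` suffix, so `split(",")` yields a single token. The sketch below is a standalone `BiFunction` copy of that anonymous `KeyValueMapper`. It swaps in `BiFunction` so the code runs without Kafka, and the class name `TestKeyMapperSketch` is hypothetical. It also illustrates a `String.split` detail worth keeping in mind when writing such mappers: trailing empty strings are discarded, so `"X0,"` maps to `null`. By contrast, `"X0,,FKey0"` maps to the empty string, which is a non-null key and would still be looked up.

```java
import java.util.function.BiFunction;

// Standalone copy of the test's anonymous KeyValueMapper, using BiFunction
// instead of Kafka's KeyValueMapper so it runs without Kafka on the classpath.
public class TestKeyMapperSketch {

    static final BiFunction<Integer, String, String> KEY_MAPPER = (key, value) -> {
        final String[] tokens = value.split(",");
        // Second token, if present, is the GlobalKTable key; otherwise null = "no match".
        return tokens.length > 1 ? tokens[1] : null;
    };

    public static void main(final String[] args) {
        System.out.println(KEY_MAPPER.apply(0, "X0,FKey0"));              // FKey0
        System.out.println(KEY_MAPPER.apply(0, "XXX0"));                  // null
        System.out.println(KEY_MAPPER.apply(0, "X0,"));                   // null (trailing empty token dropped)
        System.out.println("[" + KEY_MAPPER.apply(0, "X0,,FKey0") + "]"); // [] (empty, non-null key)
    }
}
```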
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
new file mode 100644
index 0000000..8b7dd42
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamGlobalKTableLeftJoinTest {
+
+ final private String streamTopic = "streamTopic";
+ final private String globalTableTopic = "globalTableTopic";
+
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
+ @Rule
+ public final KStreamTestDriver driver = new KStreamTestDriver();
+ private File stateDir = null;
+ private MockProcessorSupplier<Integer, String> processor;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+ private StreamsBuilder builder;
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+
+ builder = new StreamsBuilder();
+ final KStream<Integer, String> stream;
+ final GlobalKTable<String, String> table; // value of stream optionally contains key of table
+ final KeyValueMapper<Integer, String, String> keyMapper;
+
+ processor = new MockProcessorSupplier<>();
+ final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
+ final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+ stream = builder.stream(streamTopic, streamConsumed);
+ table = builder.globalTable(globalTableTopic, tableConsumed);
+ keyMapper = new KeyValueMapper<Integer, String, String>() {
+ @Override
+ public String apply(final Integer key, final String value) {
+ final String[] tokens = value.split(",");
+ // Value is comma delimited. If second token is present, it's the key to the global ktable.
+ // If not present, use null to indicate no match
+ return tokens.length > 1 ? tokens[1] : null;
+ }
+ };
+ stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+
+ driver.setUp(builder, stateDir);
+ driver.setTime(0L);
+ }
+
+ private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+ for (int i = 0; i < messageCount; i++) {
+ String value = valuePrefix + expectedKeys[i];
+ if (includeForeignKey) {
+ value = value + ",FKey" + expectedKeys[i];
+ }
+ driver.process(streamTopic, expectedKeys[i], value);
+ }
+ }
+
+ private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+ }
+ }
+
+ private void pushNullValueToGlobalTable(final int messageCount) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+ }
+ }
+
+ @Test
+ public void shouldNotRequireCopartitioning() {
+ final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+
+ assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
+ }
+
+ @Test
+ public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
+
+ // push two items to the primary stream. the globalTable is empty
+
+ pushToStream(2, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+null", "1:X1,FKey1+null");
+ }
+
+ @Test
+ public void shouldNotJoinOnGlobalTableUpdates() {
+
+ // push two items to the primary stream. the globalTable is empty
+
+ pushToStream(2, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+null", "1:X1,FKey1+null");
+
+ // push two items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(2, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+null", "3:X3,FKey3+null");
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "YY");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+YY0", "1:X1,FKey1+YY1", "2:X2,FKey2+YY2", "3:X3,FKey3+YY3");
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "YYY");
+ processor.checkAndClearProcessResult();
+ }
+
+ @Test
+ public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
+
+ // push two items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(2, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+null", "3:X3,FKey3+null");
+
+ }
+
+ @Test
+ public void shouldClearGlobalTableEntryOnNullValueUpdates() {
+
+ // push all four items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(4, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X", true);
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0", "1:X1,FKey1+Y1", "2:X2,FKey2+Y2", "3:X3,FKey3+Y3");
+
+ // push two items with null to the globalTable as deletes. this should not produce any item.
+
+ pushNullValueToGlobalTable(2);
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "XX", true);
+ processor.checkAndClearProcessResult("0:XX0,FKey0+null", "1:XX1,FKey1+null", "2:XX2,FKey2+Y2", "3:XX3,FKey3+Y3");
+ }
+
+ @Test
+ public void shouldJoinOnNullKeyMapperValues() {
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
+ // this should produce four items.
+
+ pushToStream(4, "XXX", false);
+ processor.checkAndClearProcessResult("0:XXX0+null", "1:XXX1+null", "2:XXX2+null", "3:XXX3+null");
+ }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index d1226c2..a23e787 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -42,97 +42,145 @@ import static org.junit.Assert.assertEquals;
public class KStreamKTableJoinTest {
- final private String topic1 = "topic1";
- final private String topic2 = "topic2";
+ final private String streamTopic = "streamTopic";
+ final private String tableTopic = "tableTopic";
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
+ private MockProcessorSupplier<Integer, String> processor;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+ private StreamsBuilder builder;
@Before
public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
- }
- @Test
- public void testJoin() {
- final StreamsBuilder builder = new StreamsBuilder();
+ builder = new StreamsBuilder();
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
final KStream<Integer, String> stream;
final KTable<Integer, String> table;
- final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
- stream = builder.stream(topic1, consumed);
- table = builder.table(topic2, consumed);
+ stream = builder.stream(streamTopic, consumed);
+ table = builder.table(tableTopic, consumed);
stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
+ driver.setUp(builder, stateDir);
+ driver.setTime(0L);
+ }
+
+ private void pushToStream(final int messageCount, final String valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ }
+ }
+
+ private void pushToTable(final int messageCount, final String valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ }
+ }
+
+ private void pushNullValueToTable(final int messageCount) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(tableTopic, expectedKeys[i], null);
+ }
+ }
+
+ @Test
+ public void shouldRequireCopartitionedStreams() {
+
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
- driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
- driver.setTime(0L);
+ assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+ }
- // push two items to the primary stream. the other table is empty
+ @Test
+ public void shouldNotJoinWithEmptyTableOnStreamUpdates() {
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ // push two items to the primary stream. the table is empty
+ pushToStream(2, "X");
processor.checkAndClearProcessResult();
+ }
- // push two items to the other stream. this should not produce any item.
+ @Test
+ public void shouldNotJoinOnTableUpdates() {
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ // push two items to the primary stream. the table is empty
+ pushToStream(2, "X");
processor.checkAndClearProcessResult();
- // push all four items to the primary stream. this should produce two items.
+ // push two items to the table. this should not produce any item.
- for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "X" + expectedKey);
- }
+ pushToTable(2, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce two items.
+ pushToStream(4, "X");
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- // push all items to the other stream. this should not produce any item
- for (int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
- }
+ // push all items to the table. this should not produce any item
+ pushToTable(4, "YY");
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
- for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "X" + expectedKey);
- }
-
+ pushToStream(4, "X");
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- // push two items with null to the other stream as deletes. this should not produce any item.
+ // push all items to the table. this should not produce any item
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], null);
- }
+ pushToTable(4, "YYY");
+ processor.checkAndClearProcessResult();
+ }
+ @Test
+ public void shouldJoinOnlyIfMatchFoundOnStreamUpdates() {
+
+ // push two items to the table. this should not produce any item.
+
+ pushToTable(2, "Y");
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce two items.
- for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
- }
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+ }
- processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
+ @Test
+ public void shouldClearTableEntryOnNullValueUpdates() {
+
+ // push all four items to the table. this should not produce any item.
+
+ pushToTable(4, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3");
+
+ // push two items with null to the table as deletes. this should not produce any item.
+
+ pushNullValueToTable(2);
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce two items.
+
+ pushToStream(4, "XX");
+ processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
}
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index ed835a8..7507d7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -42,98 +42,145 @@ import static org.junit.Assert.assertEquals;
public class KStreamKTableLeftJoinTest {
- final private String topic1 = "topic1";
- final private String topic2 = "topic2";
+ final private String streamTopic = "streamTopic";
+ final private String tableTopic = "tableTopic";
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
-
+ private MockProcessorSupplier<Integer, String> processor;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+ private StreamsBuilder builder;
@Before
public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
- }
- @Test
- public void testJoin() {
- StreamsBuilder builder = new StreamsBuilder();
+ builder = new StreamsBuilder();
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KStream<Integer, String> stream;
- KTable<Integer, String> table;
- MockProcessorSupplier<Integer, String> processor;
+ final KStream<Integer, String> stream;
+ final KTable<Integer, String> table;
processor = new MockProcessorSupplier<>();
- Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
- stream = builder.stream(topic1, consumed);
- table = builder.table(topic2, consumed);
+ final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ stream = builder.stream(streamTopic, consumed);
+ table = builder.table(tableTopic, consumed);
stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
- Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
-
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
driver.setUp(builder, stateDir);
driver.setTime(0L);
+ }
- // push two items to the primary stream. the other table is empty
+ private void pushToStream(final int messageCount, final String valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ }
+ }
+
+ private void pushToTable(final int messageCount, final String valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ }
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ private void pushNullValueToTable(final int messageCount) {
+ for (int i = 0; i < messageCount; i++) {
+ driver.process(tableTopic, expectedKeys[i], null);
}
+ }
+
+ @Test
+ public void shouldRequireCopartitionedStreams() {
+ final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+ }
+
+ @Test
+ public void shouldJoinWithEmptyTableOnStreamUpdates() {
+
+ // push two items to the primary stream. the table is empty
+
+ pushToStream(2, "X");
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+ }
- // push two items to the other stream. this should not produce any item.
+ @Test
+ public void shouldNotJoinOnTableUpdates() {
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ // push two items to the primary stream. the table is empty
+ pushToStream(2, "X");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+ // push two items to the table. this should not produce any item.
+
+ pushToTable(2, "Y");
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
- for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "X" + expectedKey);
- }
-
+ pushToStream(4, "X");
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
- // push all items to the other stream. this should not produce any item
- for (int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
- }
+ // push all items to the table. this should not produce any item
+ pushToTable(4, "YY");
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
- for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "X" + expectedKey);
- }
-
+ pushToStream(4, "X");
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- // push two items with null to the other stream as deletes. this should not produce any item.
+ // push all items to the table. this should not produce any item
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], null);
- }
+ pushToTable(4, "YYY");
+ processor.checkAndClearProcessResult();
+ }
+ @Test
+ public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
+
+ // push two items to the table. this should not produce any item.
+
+ pushToTable(2, "Y");
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
- for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
- }
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+
+ }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+ @Test
+ public void shouldClearTableEntryOnNullValueUpdates() {
+
+ // push all four items to the table. this should not produce any item.
+
+ pushToTable(4, "Y");
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3");
+
+ // push two items with null to the table as deletes. this should not produce any item.
+
+ pushNullValueToTable(2);
+ processor.checkAndClearProcessResult();
+
+ // push all four items to the primary stream. this should produce four items.
+
+ pushToStream(4, "XX");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
}
+
}