You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/20 20:06:30 UTC
[2/2] kafka git commit: KAFKA-4001: Improve Kafka Streams Join
Semantics (KIP-77)
KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77)
- fixed leftJoin -> outerJoin test bug
- simplified to only use values
- fixed inner KTable-KTable join
- fixed left KTable-KTable join
- fixed outer KTable-KTable join
- fixed inner, left, and outer KStream-KStream joins
- added inner KStream-KTable join
- fixed left KStream-KTable join
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #1777 from mjsax/kafka-4001-joins
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62c0972e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62c0972e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62c0972e
Branch: refs/heads/trunk
Commit: 62c0972efc525cc0677bd3fd470bd9fbbd70b004
Parents: 24067e4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Oct 20 13:06:25 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 20 13:06:25 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 44 +-
.../streams/kstream/internals/KStreamImpl.java | 272 +++++-------
.../kstream/internals/KStreamKStreamJoin.java | 27 +-
.../kstream/internals/KStreamKTableJoin.java | 75 ++++
.../internals/KStreamKTableLeftJoin.java | 66 ---
.../internals/KStreamWindowAggregate.java | 2 +-
.../streams/kstream/internals/KTableImpl.java | 103 ++---
.../kstream/internals/KTableKTableJoin.java | 21 +-
.../kstream/internals/KTableKTableLeftJoin.java | 19 +-
.../internals/KTableKTableOuterJoin.java | 16 +-
.../internals/KTableKTableRightJoin.java | 24 +-
.../internals/ProcessorContextImpl.java | 4 -
.../integration/JoinIntegrationTest.java | 433 +++++++++++++++++++
.../KTableKTableJoinIntegrationTest.java | 74 ++--
.../internals/KStreamKStreamLeftJoinTest.java | 98 +++--
.../internals/KStreamKTableJoinTest.java | 146 +++++++
.../internals/KStreamWindowAggregateTest.java | 22 +-
.../kstream/internals/KTableKTableJoinTest.java | 93 ++--
.../internals/KTableKTableLeftJoinTest.java | 18 +-
.../internals/KTableKTableOuterJoinTest.java | 12 +-
20 files changed, 1075 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 032efb5..4483e9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -533,6 +533,39 @@ public interface KStream<K, V> {
JoinWindows windows);
/**
+ * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
+ * If a record key or value is {@code null} it will not be included in the resulting {@link KStream}
+ *
+ * @param table the instance of {@link KTable} joined with this stream
+ * @param joiner the instance of {@link ValueJoiner}
+ * @param <V1> the value type of the table
+ * @param <V2> the value type of the new stream
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key
+ */
+ <V1, V2> KStream<K, V2> join(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
+
+ /**
+ * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
+ * If a record key or value is {@code null} it will not be included in the resulting {@link KStream}
+ *
+ * @param table the instance of {@link KTable} joined with this stream
+ * @param valueJoiner the instance of {@link ValueJoiner}
+ * @param keySerde key serdes for materializing this stream.
+ * If not specified the default serdes defined in the configs will be used
+ * @param valSerde value serdes for materializing this stream,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <V1> the value type of the table
+ * @param <V2> the value type of the new stream
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key
+ */
+ <V1, V2> KStream<K, V2> join(KTable<K, V1> table,
+ ValueJoiner<V, V1, V2> valueJoiner,
+ Serde<K> keySerde,
+ Serde<V> valSerde);
+
+ /**
* Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
* If a record key is null it will not included in the resulting {@link KStream}
*
@@ -566,6 +599,7 @@ public interface KStream<K, V> {
ValueJoiner<V, V1, V2> valueJoiner,
Serde<K> keySerde,
Serde<V> valSerde);
+
/**
* Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and
* default serializers and deserializers. If a record key is null it will not included in
@@ -592,8 +626,8 @@ public interface KStream<K, V> {
* @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
*/
<K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
- Serde<K1> keySerde,
- Serde<V> valSerde);
+ Serde<K1> keySerde,
+ Serde<V> valSerde);
/**
* Group the records with the same key into a {@link KGroupedStream} while preserving the
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bb77e96..b67fca5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
+
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
@@ -63,6 +64,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+ public static final String JOIN_NAME = "KSTREAM-JOIN-";
+
public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
private static final String MAP_NAME = "KSTREAM-MAP-";
@@ -345,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
-
+
if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
@@ -386,78 +389,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1, R> KStream<K, R> join(
- KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValueSerde,
- Serde<V1> otherValueSerde) {
+ final KStream<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final JoinWindows windows,
+ final Serde<K> keySerde,
+ final Serde<V> thisValueSerde,
+ final Serde<V1> otherValueSerde) {
- return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
+ return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false));
}
@Override
public <V1, R> KStream<K, R> join(
- KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows) {
+ final KStream<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final JoinWindows windows) {
- return join(other, joiner, windows, null, null, null, false);
+ return join(other, joiner, windows, null, null, null);
}
@Override
public <V1, R> KStream<K, R> outerJoin(
- KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValueSerde,
- Serde<V1> otherValueSerde) {
+ final KStream<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final JoinWindows windows,
+ final Serde<K> keySerde,
+ final Serde<V> thisValueSerde,
+ final Serde<V1> otherValueSerde) {
- return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
+ return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true));
}
@Override
public <V1, R> KStream<K, R> outerJoin(
- KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows) {
+ final KStream<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final JoinWindows windows) {
- return join(other, joiner, windows, null, null, null, true);
+ return outerJoin(other, joiner, windows, null, null, null);
}
- @SuppressWarnings("unchecked")
- private <V1, R> KStream<K, R> join(
- KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValueSerde,
- Serde<V1> otherValueSerde,
- boolean outer) {
-
- return doJoin(other,
- joiner,
- windows,
- keySerde,
- thisValueSerde,
- otherValueSerde,
- new DefaultJoin(outer));
- }
-
- private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValueSerde,
- Serde<V1> otherValueSerde,
- KStreamImplJoin join) {
+ private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final JoinWindows windows,
+ final Serde<K> keySerde,
+ final Serde<V> thisValueSerde,
+ final Serde<V1> otherValueSerde,
+ final KStreamImplJoin join) {
Objects.requireNonNull(other, "other KStream can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(windows, "windows can't be null");
KStreamImpl<K, V> joinThis = this;
- KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
+ KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
if (joinThis.repartitionRequired) {
joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
@@ -531,20 +515,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@SuppressWarnings("unchecked")
@Override
public <V1, R> KStream<K, R> leftJoin(
- KStream<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValSerde,
- Serde<V1> otherValueSerde) {
+ final KStream<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final JoinWindows windows,
+ final Serde<K> keySerde,
+ final Serde<V> thisValSerde,
+ final Serde<V1> otherValueSerde) {
return doJoin(other,
- joiner,
- windows,
- keySerde,
- thisValSerde,
- otherValueSerde,
- new LeftJoin());
+ joiner,
+ windows,
+ keySerde,
+ thisValSerde,
+ otherValueSerde,
+ new KStreamImplJoin(true, false));
}
@Override
@@ -558,50 +542,69 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@SuppressWarnings("unchecked")
@Override
- public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
- return leftJoin(other, joiner, null, null);
+ public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+ return join(other, joiner, null, null);
}
- public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other,
- ValueJoiner<V, V1, R> joiner,
- Serde<K> keySerde,
- Serde<V> valueSerde) {
- Objects.requireNonNull(other, "other KTable can't be null");
- Objects.requireNonNull(joiner, "joiner can't be null");
-
+ @Override
+ public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
if (repartitionRequired) {
- KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
- valueSerde, null);
- return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
+ final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde,
+ valueSerde, null);
+ return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
} else {
- return doStreamTableLeftJoin(other, joiner);
+ return doStreamTableJoin(other, joiner, false);
}
-
}
- private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> other,
- final ValueJoiner<V, V1, R> joiner) {
- Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+ private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final boolean leftJoin) {
+ Objects.requireNonNull(other, "other KTable can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+
+ final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
- String name = topology.newName(LEFTJOIN_NAME);
+ final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
- topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
- topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames());
+ topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl<K, ?, V1>) other, joiner, leftJoin), this.name);
+ topology.connectProcessorAndStateStores(name, other.getStoreName());
topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes, false);
}
@Override
+ public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+ return leftJoin(other, joiner, null, null);
+ }
+
+ public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
+ final ValueJoiner<V, V1, R> joiner,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ if (repartitionRequired) {
+ final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
+ valueSerde, null);
+ return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
+ } else {
+ return doStreamTableJoin(other, joiner, true);
+ }
+ }
+
+ @Override
public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) {
return groupBy(selector, null, null);
}
@Override
public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
- Serde<K1> keySerde,
- Serde<V> valSerde) {
+ Serde<K1> keySerde,
+ Serde<V> valSerde) {
Objects.requireNonNull(selector, "selector can't be null");
String selectName = internalSelectKey(selector);
@@ -641,26 +644,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
.build();
}
- private interface KStreamImplJoin {
+ private class KStreamImplJoin {
- <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
- KStream<K1, V2> other,
- ValueJoiner<V1, V2, R> joiner,
- JoinWindows windows,
- Serde<K1> keySerde,
- Serde<V1> lhsValueSerde,
- Serde<V2> otherValueSerde);
- }
-
- private class DefaultJoin implements KStreamImplJoin {
-
- private final boolean outer;
+ private final boolean leftOuter;
+ private final boolean rightOuter;
- DefaultJoin(final boolean outer) {
- this.outer = outer;
+ KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) {
+ this.leftOuter = leftOuter;
+ this.rightOuter = rightOuter;
}
- @Override
public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
KStream<K1, V2> other,
ValueJoiner<V1, V2, R> joiner,
@@ -670,12 +663,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serde<V2> otherValueSerde) {
String thisWindowStreamName = topology.newName(WINDOWED_NAME);
String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
- String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+ String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+ String joinOtherName = leftOuter ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
StateStoreSupplier thisWindow =
- createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
+ createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
StateStoreSupplier otherWindow =
createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
@@ -688,16 +681,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.before + windows.after + 1,
windows.maintainMs());
- KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
- windows.before,
- windows.after,
- joiner,
- outer);
- KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
- windows.after,
- windows.before,
- reverseJoiner(joiner),
- outer);
+ final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
+ windows.before,
+ windows.after,
+ joiner,
+ leftOuter);
+ final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
+ windows.after,
+ windows.before,
+ reverseJoiner(joiner),
+ rightOuter);
KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
@@ -716,39 +709,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
}
-
- private class LeftJoin implements KStreamImplJoin {
-
- @Override
- public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
- KStream<K1, V2> other,
- ValueJoiner<V1, V2, R> joiner,
- JoinWindows windows,
- Serde<K1> keySerde,
- Serde<V1> lhsValueSerde,
- Serde<V2> otherValueSerde) {
- String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = topology.newName(LEFTJOIN_NAME);
-
- StateStoreSupplier otherWindow =
- createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
-
- KStreamJoinWindow<K1, V1>
- otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
- KStreamKStreamJoin<K1, R, V1, V2>
- joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
-
-
-
- topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
- topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
- topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
-
- Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
- allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
- return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false);
- }
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index edde009..41547b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,7 +52,6 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private WindowStore<K, V2> otherWindow;
- @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
@@ -62,14 +61,21 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@Override
- public void process(K key, V1 value) {
- if (key == null)
+ public void process(final K key, final V1 value) {
+ // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+ //
+ // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+ // an empty message (i.e., there is nothing to be joined) -- this contrasts with SQL NULL semantics
+ // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+ // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+ if (key == null || value == null) {
return;
+ }
- boolean needOuterJoin = KStreamKStreamJoin.this.outer;
+ boolean needOuterJoin = outer;
- long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
- long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+ final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
+ final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
while (iter.hasNext()) {
@@ -77,8 +83,9 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
context().forward(key, joiner.apply(value, iter.next().value));
}
- if (needOuterJoin)
+ if (needOuterJoin) {
context().forward(key, joiner.apply(value, null));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
new file mode 100644
index 0000000..1027b96
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
+
+ private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
+ private final ValueJoiner<V1, V2, R> joiner;
+ private final boolean leftJoin;
+
+ KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<V1, V2, R> joiner, final boolean leftJoin) {
+ valueGetterSupplier = table.valueGetterSupplier();
+ this.joiner = joiner;
+ this.leftJoin = leftJoin;
+ }
+
+ @Override
+ public Processor<K, V1> get() {
+ return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin);
+ }
+
+ private class KStreamKTableJoinProcessor extends AbstractProcessor<K, V1> {
+
+ private final KTableValueGetter<K, V2> valueGetter;
+ private final boolean leftJoin;
+
+ KStreamKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter, final boolean leftJoin) {
+ this.valueGetter = valueGetter;
+ this.leftJoin = leftJoin;
+ }
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ valueGetter.init(context);
+ }
+
+ @Override
+ public void process(final K key, final V1 value) {
+ // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+ //
+ // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+ // an empty message (i.e., there is nothing to be joined) -- this contrasts with SQL NULL semantics
+ // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+ // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+ if (key != null && value != null) {
+ final V2 value2 = valueGetter.get(key);
+ if (leftJoin || value2 != null) {
+ context().forward(key, joiner.apply(value, value2));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
deleted file mode 100644
index 92b9b59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
-
- private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
- private final ValueJoiner<V1, V2, R> joiner;
-
- KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
- this.valueGetterSupplier = table.valueGetterSupplier();
- this.joiner = joiner;
- }
-
- @Override
- public Processor<K, V1> get() {
- return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get());
- }
-
- private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
-
- private final KTableValueGetter<K, V2> valueGetter;
-
- public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
- this.valueGetter = valueGetter;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- valueGetter.init(context);
- }
-
- @Override
- public void process(K key, V1 value) {
- // if the key is null, we do not need proceed joining
- // the record with the table
- if (key != null) {
- context().forward(key, joiner.apply(value, valueGetter.get(key)));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 718e52b..55b0916 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6423cff..683dc00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -44,6 +44,7 @@ import java.util.Set;
/**
* The implementation class of {@link KTable}.
+ *
* @param <K> the key type
* @param <S> the source's (parent's) value type
* @param <V> the value type
@@ -283,77 +284,55 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return toStream().selectKey(mapper);
}
- @SuppressWarnings("unchecked")
@Override
- public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
- Objects.requireNonNull(other, "other can't be null");
- Objects.requireNonNull(joiner, "joiner can't be null");
-
- Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
- String joinThisName = topology.newName(JOINTHIS_NAME);
- String joinOtherName = topology.newName(JOINOTHER_NAME);
- String joinMergeName = topology.newName(MERGE_NAME);
-
- KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
- KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
- KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
- new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
- new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
- );
-
- topology.addProcessor(joinThisName, joinThis, this.name);
- topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
- topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
- topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
-
- return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
+ public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+ return doJoin(other, joiner, false, false);
}
- @SuppressWarnings("unchecked")
@Override
- public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
- Objects.requireNonNull(other, "other can't be null");
- Objects.requireNonNull(joiner, "joiner can't be null");
-
- Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
- String joinThisName = topology.newName(OUTERTHIS_NAME);
- String joinOtherName = topology.newName(OUTEROTHER_NAME);
- String joinMergeName = topology.newName(MERGE_NAME);
-
- KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
- KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
- KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
- new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
- new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
- );
-
- topology.addProcessor(joinThisName, joinThis, this.name);
- topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
- topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
- topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
+ public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+ return doJoin(other, joiner, true, true);
+ }
- return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
+ @Override
+ public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+ return doJoin(other, joiner, true, false);
}
@SuppressWarnings("unchecked")
- @Override
- public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<V, V1, R> joiner, final boolean leftOuter, final boolean rightOuter) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
- String joinThisName = topology.newName(LEFTTHIS_NAME);
- String joinOtherName = topology.newName(LEFTOTHER_NAME);
- String joinMergeName = topology.newName(MERGE_NAME);
+ final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+ if (leftOuter) {
+ enableSendingOldValues();
+ }
+ if (rightOuter) {
+ ((KTableImpl) other).enableSendingOldValues();
+ }
+
+ final String joinThisName = topology.newName(JOINTHIS_NAME);
+ final String joinOtherName = topology.newName(JOINOTHER_NAME);
+ final String joinMergeName = topology.newName(MERGE_NAME);
+
+ final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
+ final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
+
+ if (!leftOuter) { // inner
+ joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+ joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+ } else if (!rightOuter) { // left
+ joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+ joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+ } else { // outer
+ joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+ joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+ }
- KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
- KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
- KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
- new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
+ final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
+ new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, storeName),
new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
);
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index cbd626d..49f6715 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -69,23 +69,26 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
* @throws StreamsException if key is null
*/
@Override
- public void process(K key, Change<V1> change) {
+ public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable join operator should not be null.");
R newValue = null;
R oldValue = null;
- V2 value2 = null;
- if (change.newValue != null || change.oldValue != null)
- value2 = valueGetter.get(key);
+ final V2 value2 = valueGetter.get(key);
+ if (value2 == null) {
+ return;
+ }
- if (change.newValue != null && value2 != null)
+ if (change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
+ }
- if (sendOldValues && change.oldValue != null && value2 != null)
+ if (sendOldValues && change.oldValue != null) {
oldValue = joiner.apply(change.oldValue, value2);
+ }
context().forward(key, new Change<>(newValue, oldValue));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 4bee38c..5f5cad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -70,27 +70,28 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
* @throws StreamsException if key is null
*/
@Override
- public void process(K key, Change<V1> change) {
+ public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable left-join operator should not be null.");
R newValue = null;
R oldValue = null;
- V2 value2 = null;
- if (change.newValue != null || change.oldValue != null)
- value2 = valueGetter.get(key);
+ final V2 value2 = valueGetter.get(key);
+ if (value2 == null && change.newValue == null && change.oldValue == null) {
+ return;
+ }
- if (change.newValue != null)
+ if (change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
+ }
if (sendOldValues && change.oldValue != null)
oldValue = joiner.apply(change.oldValue, value2);
context().forward(key, new Change<>(newValue, oldValue));
}
-
}
private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index ad7dbde..2bfd8a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -69,21 +69,25 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
* @throws StreamsException if key is null
*/
@Override
- public void process(K key, Change<V1> change) {
+ public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable outer-join operator should not be null.");
R newValue = null;
R oldValue = null;
- V2 value2 = valueGetter.get(key);
- if (change.newValue != null || value2 != null)
+ final V2 value2 = valueGetter.get(key);
+ if (value2 == null && change.newValue == null && change.oldValue == null) {
+ return;
+ }
+
+ if (value2 != null || change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
+ }
- if (sendOldValues) {
- if (change.oldValue != null || value2 != null)
- oldValue = joiner.apply(change.oldValue, value2);
+ if (sendOldValues && (value2 != null || change.oldValue != null)) {
+ oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 80aadaa..8aeadcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -70,19 +70,23 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
* @throws StreamsException if key is null
*/
@Override
- public void process(K key, Change<V1> change) {
+ public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable right-join operator should not be null.");
- R newValue = null;
+ final R newValue;
R oldValue = null;
- V2 value2 = valueGetter.get(key);
- if (value2 != null) {
- newValue = joiner.apply(change.newValue, value2);
- if (sendOldValues)
- oldValue = joiner.apply(change.oldValue, value2);
+ final V2 value2 = valueGetter.get(key);
+ if (value2 == null) {
+ return;
+ }
+
+ newValue = joiner.apply(change.newValue, value2);
+
+ if (sendOldValues) {
+ oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 195e5a4..11ca30e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -48,7 +48,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
private RecordContext recordContext;
private ProcessorNode currentNode;
- @SuppressWarnings("unchecked")
public ProcessorContextImpl(TaskId id,
StreamTask task,
StreamsConfig config,
@@ -194,7 +193,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
return recordContext.timestamp();
}
- @SuppressWarnings("unchecked")
@Override
public <K, V> void forward(K key, V value) {
ProcessorNode previousNode = currentNode;
@@ -208,7 +206,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
}
}
- @SuppressWarnings("unchecked")
@Override
public <K, V> void forward(K key, V value, int childIndex) {
ProcessorNode previousNode = currentNode;
@@ -221,7 +218,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
}
}
- @SuppressWarnings("unchecked")
@Override
public <K, V> void forward(K key, V value, String childName) {
for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
new file mode 100644
index 0000000..0f70588
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+public class JoinIntegrationTest {
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+ private static ZkUtils zkUtils = null;
+
+ private static final String APP_ID = "join-integration-test";
+ private static final String INPUT_TOPIC_1 = "inputTopicLeft";
+ private static final String INPUT_TOPIC_2 = "inputTopicRight";
+ private static final String OUTPUT_TOPIC = "outputTopic";
+
+ private final static Properties PRODUCER_CONFIG = new Properties();
+ private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+ private final static Properties STREAMS_CONFIG = new Properties();
+
+ private KStreamBuilder builder;
+ private KStream<Long, String> leftStream;
+ private KStream<Long, String> rightStream;
+ private KTable<Long, String> leftTable;
+ private KTable<Long, String> rightTable;
+
+ // Fixed input sequence replayed by every test: 15 records spread over the two input topics,
+ // including records with null values. All records share one key (see Input).
+ // runTest() walks this list and the test's expected-result list in lockstep, so each test must
+ // supply exactly one expected entry per record listed here, in the same order.
+ private final List<Input<String>> input = Arrays.asList(
+ new Input<>(INPUT_TOPIC_1, (String) null),
+ new Input<>(INPUT_TOPIC_2, (String) null),
+ new Input<>(INPUT_TOPIC_1, "A"),
+ new Input<>(INPUT_TOPIC_2, "a"),
+ new Input<>(INPUT_TOPIC_1, "B"),
+ new Input<>(INPUT_TOPIC_2, "b"),
+ new Input<>(INPUT_TOPIC_1, (String) null),
+ new Input<>(INPUT_TOPIC_2, (String) null),
+ new Input<>(INPUT_TOPIC_1, "C"),
+ new Input<>(INPUT_TOPIC_2, "c"),
+ new Input<>(INPUT_TOPIC_2, (String) null),
+ new Input<>(INPUT_TOPIC_1, (String) null),
+ new Input<>(INPUT_TOPIC_2, (String) null),
+ new Input<>(INPUT_TOPIC_2, "d"),
+ new Input<>(INPUT_TOPIC_1, "D")
+ );
+
+ // Joiner shared by all tests: concatenates both sides with "-". Because plain string concatenation
+ // is used, a null side shows up as the literal text "null" (e.g. "A-null", "null-b").
+ private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(final String value1, final String value2) {
+ return value1 + "-" + value2;
+ }
+ };
+
+ private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();
+
+ /**
+ * Fills the shared producer, result-consumer and Streams configurations with the embedded cluster's
+ * connection settings and opens the ZkUtils handle used by the topic-deletion check.
+ * Runs once before all tests of this class.
+ */
+ @BeforeClass
+ public static void setupConfigsAndUtils() throws Exception {
+ // producer that feeds the input topics: Long keys, String values
+ PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+ PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+ PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+ PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ // consumer configuration used by checkResult() to read the join output
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ // Streams configuration; each test adds its own APPLICATION_ID_CONFIG before running
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+ STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ // NOTE(review): cache size 0 presumably makes every update visible downstream immediately,
+ // which the per-record expectations rely on -- confirm against the Streams configuration docs
+ STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+ // ZooKeeper handle queried by TopicsGotDeletedCondition; closed again in release()
+ zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled());
+ }
+
+ /**
+ * Closes the ZkUtils handle after all tests have run; does nothing if it was never opened.
+ */
+ @AfterClass
+ public static void release() {
+ if (zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+
+ /**
+ * Creates the two input topics and the output topic and prepares a fresh builder for the next test.
+ * Both tables are read from the input topics; the streams used by the stream joins are derived
+ * from those tables via {@code toStream()}.
+ */
+ @Before
+ public void prepareTopology() throws Exception {
+ CLUSTER.createTopic(INPUT_TOPIC_1);
+ CLUSTER.createTopic(INPUT_TOPIC_2);
+ CLUSTER.createTopic(OUTPUT_TOPIC);
+
+ builder = new KStreamBuilder();
+ leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
+ rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
+ leftStream = leftTable.toStream();
+ rightStream = rightTable.toStream();
+ }
+
+ /**
+ * Requests deletion of all three topics and waits (up to 120 seconds) until the deletion
+ * condition reports that none of them is listed any more, so the next test starts from empty topics.
+ */
+ @After
+ public void cleanup() throws Exception {
+ CLUSTER.deleteTopic(INPUT_TOPIC_1);
+ CLUSTER.deleteTopic(INPUT_TOPIC_2);
+ CLUSTER.deleteTopic(OUTPUT_TOPIC);
+
+ TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds.");
+ }
+
+ /**
+  * Waits until at least {@code expectedResult.size()} values are available on {@code outputTopic}
+  * and asserts that the received values equal {@code expectedResult}.
+  *
+  * @param outputTopic    topic the join result is written to
+  * @param expectedResult values expected for the current input record; {@code null} skips the check
+  * @throws Exception if reading from the topic fails
+  */
+ private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
+     // a null expectation means "nothing to verify for this input record"
+     if (expectedResult == null) {
+         return;
+     }
+
+     // Bounded wait instead of Long.MAX_VALUE: if a join emits too few records the test must fail
+     // after a finite time rather than block the whole build.
+     // NOTE(review): 30 seconds is assumed to be generous for a single record on an embedded cluster.
+     final long maxWaitMs = 30 * 1000L;
+     final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), maxWaitMs);
+     assertThat(result, is(expectedResult));
+ }
+
+ /**
+  * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+  * If an input tuple does not trigger any result, {@code expectedResult} should contain a {@code null} entry.
+  *
+  * @param expectedResult one entry per element of {@code input}, in the same order
+  * @throws Exception if producing the input or reading the output fails
+  */
+ private void runTest(final List<List<String>> expectedResult) throws Exception {
+     // Real assertion rather than the `assert` keyword: the keyword is skipped when the JVM runs
+     // without -ea, and a too-short list would then only surface later as NoSuchElementException.
+     assertThat(expectedResult.size(), is(input.size()));
+
+     IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+     final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG);
+     try {
+         streams.start();
+
+         // every record gets its own, strictly increasing timestamp
+         long ts = System.currentTimeMillis();
+
+         final Iterator<List<String>> resultIterator = expectedResult.iterator();
+         for (final Input<String> singleInput : input) {
+             // produce one record, then verify its effect before producing the next one
+             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts);
+             checkResult(OUTPUT_TOPIC, resultIterator.next());
+         }
+     } finally {
+         // always stop the Streams instance, even when an expectation fails
+         streams.close();
+     }
+ }
+
+ /**
+ * Inner KStream-KStream join ({@code leftStream.join(rightStream, ...)}) using {@code JoinWindows.of(10000)}.
+ * {@code expectedResult} holds, per input record, the values awaited on the output topic after that
+ * record was produced; a {@code null} entry means no check is made for that record.
+ */
+ @Test
+ public void testInnerKStreamKStream() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Collections.singletonList("A-a"),
+ Collections.singletonList("B-a"),
+ Arrays.asList("A-b", "B-b"),
+ null,
+ null,
+ Arrays.asList("C-a", "C-b"),
+ Arrays.asList("A-c", "B-c", "C-c"),
+ null,
+ null,
+ null,
+ Arrays.asList("A-d", "B-d", "C-d"),
+ Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ );
+
+ leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Left KStream-KStream join ({@code leftStream.leftJoin(rightStream, ...)}) using {@code JoinWindows.of(10000)}.
+ * Differs from the inner-join expectation only in the third entry ("A-null").
+ * A {@code null} entry in {@code expectedResult} means no check is made for that input record.
+ */
+ @Test
+ public void testLeftKStreamKStream() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList("A-null"),
+ Collections.singletonList("A-a"),
+ Collections.singletonList("B-a"),
+ Arrays.asList("A-b", "B-b"),
+ null,
+ null,
+ Arrays.asList("C-a", "C-b"),
+ Arrays.asList("A-c", "B-c", "C-c"),
+ null,
+ null,
+ null,
+ Arrays.asList("A-d", "B-d", "C-d"),
+ Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ );
+
+ leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Outer KStream-KStream join ({@code leftStream.outerJoin(rightStream, ...)}) using {@code JoinWindows.of(10000)}.
+ * For this input sequence the expected values are the same as in the left-join test.
+ * A {@code null} entry in {@code expectedResult} means no check is made for that input record.
+ */
+ @Test
+ public void testOuterKStreamKStream() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList("A-null"),
+ Collections.singletonList("A-a"),
+ Collections.singletonList("B-a"),
+ Arrays.asList("A-b", "B-b"),
+ null,
+ null,
+ Arrays.asList("C-a", "C-b"),
+ Arrays.asList("A-c", "B-c", "C-c"),
+ null,
+ null,
+ null,
+ Arrays.asList("A-d", "B-d", "C-d"),
+ Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ );
+
+ leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Inner KStream-KTable join ({@code leftStream.join(rightTable, valueJoiner)}).
+ * Only the entries for the left-topic records "B" and "D" expect output ("B-a", "D-d");
+ * every {@code null} entry in {@code expectedResult} means no check is made for that input record.
+ */
+ @Test
+ public void testInnerKStreamKTable() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList("B-a"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList("D-d")
+ );
+
+ leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Left KStream-KTable join ({@code leftStream.leftJoin(rightTable, valueJoiner)}).
+ * Compared with the inner stream-table test, the left-topic records "A" and "C" additionally
+ * expect "A-null" and "C-null". A {@code null} entry in {@code expectedResult} means no check is made.
+ */
+ @Test
+ public void testLeftKStreamKTable() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList("A-null"),
+ null,
+ Collections.singletonList("B-a"),
+ null,
+ null,
+ null,
+ Collections.singletonList("C-null"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList("D-d")
+ );
+
+ leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Inner KTable-KTable join ({@code leftTable.join(rightTable, valueJoiner)}).
+ * A plain {@code null} entry in {@code expectedResult} means no check is made for that input record,
+ * whereas {@code Collections.singletonList((String) null)} expects exactly one output record whose value is null.
+ */
+ @Test
+ public void testInnerKTableKTable() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Collections.singletonList("A-a"),
+ Collections.singletonList("B-a"),
+ Collections.singletonList("B-b"),
+ Collections.singletonList((String) null),
+ null,
+ null,
+ Collections.singletonList("C-c"),
+ Collections.singletonList((String) null),
+ null,
+ null,
+ null,
+ Collections.singletonList("D-d")
+ );
+
+ leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Left KTable-KTable join ({@code leftTable.leftJoin(rightTable, valueJoiner)}).
+ * A plain {@code null} entry in {@code expectedResult} means no check is made for that input record,
+ * whereas {@code Collections.singletonList((String) null)} expects exactly one output record whose value is null.
+ */
+ @Test
+ public void testLeftKTableKTable() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList("A-null"),
+ Collections.singletonList("A-a"),
+ Collections.singletonList("B-a"),
+ Collections.singletonList("B-b"),
+ Collections.singletonList((String) null),
+ null,
+ Collections.singletonList("C-null"),
+ Collections.singletonList("C-c"),
+ Collections.singletonList("C-null"),
+ Collections.singletonList((String) null),
+ null,
+ null,
+ Collections.singletonList("D-d")
+ );
+
+ leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+ * Outer KTable-KTable join ({@code leftTable.outerJoin(rightTable, valueJoiner)}).
+ * In addition to the left-join expectations, results with a missing left side ("null-b", "null-d") are expected.
+ * A plain {@code null} entry in {@code expectedResult} means no check is made for that input record,
+ * whereas {@code Collections.singletonList((String) null)} expects exactly one output record whose value is null.
+ */
+ @Test
+ public void testOuterKTableKTable() throws Exception {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable");
+
+ final List<List<String>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList("A-null"),
+ Collections.singletonList("A-a"),
+ Collections.singletonList("B-a"),
+ Collections.singletonList("B-b"),
+ Collections.singletonList("null-b"),
+ Collections.singletonList((String) null),
+ Collections.singletonList("C-null"),
+ Collections.singletonList("C-c"),
+ Collections.singletonList("C-null"),
+ Collections.singletonList((String) null),
+ null,
+ Collections.singletonList("null-d"),
+ Collections.singletonList("D-d")
+ );
+
+ leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+ runTest(expectedResult);
+ }
+
+ /**
+  * Condition that holds once none of the three test topics is listed by ZooKeeper any more.
+  * Declared static because it only reads static state and needs no reference to the test instance.
+  */
+ private static final class TopicsGotDeletedCondition implements TestCondition {
+     @Override
+     public boolean conditionMet() {
+         // snapshot of all topic names currently known to ZooKeeper
+         final Set<String> allTopics = new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+         return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC);
+     }
+ }
+
+ /**
+  * One input record: the topic to produce to and the key-value pair to send.
+  * Every record uses the same key, so all test records refer to a single join key.
+  * Immutable; declared static because it needs no reference to the enclosing test instance.
+  *
+  * @param <V> the value type of the record
+  */
+ private static final class Input<V> {
+     // single key shared by all input records
+     private static final long ANY_UNIQUE_KEY = 0L;
+
+     final String topic;
+     final KeyValue<Long, V> record;
+
+     Input(final String topic, final V value) {
+         this.topic = topic;
+         record = KeyValue.pair(ANY_UNIQUE_KEY, value);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 85e2cf7..2cd3859 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -71,49 +71,49 @@ public class KTableKTableJoinIntegrationTest {
public static Object[] parameters() {
return new Object[][]{
{JoinType.INNER, JoinType.INNER, Arrays.asList(
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("c", null),
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", null))
- },
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+// new KeyValue<>("c", null),
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+ new KeyValue<>("b", "B1-B2-B3")//,
+// new KeyValue<>("c", null)
+ )},
{JoinType.INNER, JoinType.LEFT, Arrays.asList(
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("c", null),
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", null)
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+// new KeyValue<>("c", null),
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+ new KeyValue<>("b", "B1-B2-B3")//,
+// new KeyValue<>("c", null)
)},
{JoinType.INNER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
- new KeyValue<>("a", "null-A3"),
- new KeyValue<>("b", "null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C3")
+// new KeyValue<>("a", "null-A3"),
+// new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")//,
+// new KeyValue<>("c", "null-C3")
)},
{JoinType.LEFT, JoinType.INNER, Arrays.asList(
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("c", null),
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", null)
+ new KeyValue<>("b", "B1-B2-B3")//,
+// new KeyValue<>("c", null)
)},
{JoinType.LEFT, JoinType.LEFT, Arrays.asList(
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("c", null),
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", null)
+ new KeyValue<>("b", "B1-B2-B3")//,
+// new KeyValue<>("c", null)
)},
{JoinType.LEFT, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
@@ -121,22 +121,22 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C3")
+ new KeyValue<>("b", "B1-B2-B3")//,
+// new KeyValue<>("c", "null-C3")
)},
{JoinType.OUTER, JoinType.INNER, Arrays.asList(
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("c", null),
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")
)},
{JoinType.OUTER, JoinType.LEFT, Arrays.asList(
- new KeyValue<>("a", null),
- new KeyValue<>("b", null),
- new KeyValue<>("c", null),
+// new KeyValue<>("a", null),
+// new KeyValue<>("b", null),
+// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),