You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/06 23:34:46 UTC
[2/2] kafka git commit: KAFKA-3016: phase-2. stream join
implementations
KAFKA-3016: phase-2. stream join implementations
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #737 from ymatsuda/windowed_join2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5aad4999
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5aad4999
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5aad4999
Branch: refs/heads/trunk
Commit: 5aad4999d1a1d35d61365ff57a9b79a6af3e70d2
Parents: a788c65
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Jan 6 14:34:40 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jan 6 14:34:40 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/kstream/JoinWindowSpec.java | 91 ++++
.../apache/kafka/streams/kstream/KStream.java | 97 +++-
.../streams/kstream/SlidingWindowSupplier.java | 266 ----------
.../apache/kafka/streams/kstream/Window.java | 36 --
.../kafka/streams/kstream/WindowSupplier.java | 25 -
.../streams/kstream/internals/KStreamImpl.java | 147 +++++-
.../streams/kstream/internals/KStreamJoin.java | 84 ---
.../kstream/internals/KStreamJoinWindow.java | 58 +++
.../kstream/internals/KStreamKStreamJoin.java | 73 +++
.../kstream/internals/KStreamWindow.java | 68 ---
.../kstream/internals/KStreamWindowedImpl.java | 67 ---
.../state/RocksDBWindowStoreSupplier.java | 2 +-
.../kstream/internals/KStreamImplTest.java | 68 +--
.../kstream/internals/KStreamJoinTest.java | 195 -------
.../internals/KStreamKStreamJoinTest.java | 505 +++++++++++++++++++
.../internals/KStreamKStreamLeftJoinTest.java | 289 +++++++++++
.../kstream/internals/KStreamWindowedTest.java | 91 ----
.../apache/kafka/test/UnlimitedWindowDef.java | 104 ----
18 files changed, 1266 insertions(+), 1000 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
new file mode 100644
index 0000000..8f0f839
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * This class is used to specify the behaviour of windowed joins.
+ */
+public class JoinWindowSpec {
+
+ public final String name;
+ public final long before;
+ public final long after;
+ public final long retention;
+ public final int segments;
+
+ private JoinWindowSpec(String name, long before, long after, long retention, int segments) {
+ this.name = name;
+ this.after = after;
+ this.before = before;
+ this.retention = retention;
+ this.segments = segments;
+ }
+
+ public static JoinWindowSpec of(String name) {
+ return new JoinWindowSpec(name, 0L, 0L, 0L, 3);
+ }
+
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within
+ * timeDifference.
+ *
+ * @param timeDifference
+ * @return
+ */
+ public JoinWindowSpec within(long timeDifference) {
+ return new JoinWindowSpec(name, timeDifference, timeDifference, retention, segments);
+ }
+
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within
+ * timeDifference, and if the timestamp of a record from the secondary stream
+ * is earlier than or equal to the timestamp of a record from the first stream.
+ *
+ * @param timeDifference
+ * @return
+ */
+ public JoinWindowSpec before(long timeDifference) {
+ return new JoinWindowSpec(name, timeDifference, 0L, retention, segments);
+ }
+
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within
+ * timeDifference, and if the timestamp of a record from the secondary stream
+ * is later than or equal to the timestamp of a record from the first stream.
+ *
+ * @param timeDifference
+ * @return
+ */
+ public JoinWindowSpec after(long timeDifference) {
+ return new JoinWindowSpec(name, 0L, timeDifference, retention, segments);
+ }
+
+ /**
+ * Specifies the retention period of windows
+ * @param retentionPeriod
+ * @return
+ */
+ public JoinWindowSpec retentionPeriod(long retentionPeriod) {
+ return new JoinWindowSpec(name, before, after, retentionPeriod, segments);
+ }
+
+ public JoinWindowSpec segments(int segments) {
+ return new JoinWindowSpec(name, before, after, retention, segments);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index d3931ef..29115c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -84,14 +84,6 @@ public interface KStream<K, V> {
<V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
/**
- * Creates a new windowed stream using a specified window instance.
- *
- * @param windowDef the instance of Window
- * @return the windowed stream
- */
- KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef);
-
- /**
* Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
* supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
* a corresponding stream for the first predicate is evaluated true.
@@ -173,6 +165,95 @@ public interface KStream<K, V> {
void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
/**
+ * Combines values of this stream with another KStream using Windowed Inner Join.
+ *
+ * @param otherStream the instance of KStream joined with this stream
+ * @param joiner ValueJoiner
+ * @param joinWindowSpec the specification of the join window
+ * @param keySerializer key serializer,
+ * if not specified the default serializer defined in the configs will be used
+ * @param thisValueSerializer value serializer for this stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param otherValueSerializer value serializer for other stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param keyDeserializer key deserializer,
+ * if not specified the default serializer defined in the configs will be used
+ * @param thisValueDeserializer value deserializer for this stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param otherValueDeserializer value deserializer for other stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param <V1> the value type of the other stream
+ * @param <V2> the value type of the new stream
+ */
+ <V1, V2> KStream<K, V2> join(
+ KStream<K, V1> otherStream,
+ ValueJoiner<V, V1, V2> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerializer,
+ Serializer<V> thisValueSerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> thisValueDeserializer,
+ Deserializer<V1> otherValueDeserializer);
+
+ /**
+ * Combines values of this stream with another KStream using Windowed Outer Join.
+ *
+ * @param otherStream the instance of KStream joined with this stream
+ * @param joiner ValueJoiner
+ * @param joinWindowSpec the specification of the join window
+ * @param keySerializer key serializer,
+ * if not specified the default serializer defined in the configs will be used
+ * @param thisValueSerializer value serializer for this stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param otherValueSerializer value serializer for other stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param keyDeserializer key deserializer,
+ * if not specified the default serializer defined in the configs will be used
+ * @param thisValueDeserializer value deserializer for this stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param otherValueDeserializer value deserializer for other stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param <V1> the value type of the other stream
+ * @param <V2> the value type of the new stream
+ */
+ <V1, V2> KStream<K, V2> outerJoin(
+ KStream<K, V1> otherStream,
+ ValueJoiner<V, V1, V2> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerializer,
+ Serializer<V> thisValueSerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> thisValueDeserializer,
+ Deserializer<V1> otherValueDeserializer);
+
+ /**
+ * Combines values of this stream with another KStream using Windowed Left Join.
+ *
+ * @param otherStream the instance of KStream joined with this stream
+ * @param joiner ValueJoiner
+ * @param keySerializer key serializer,
+ * if not specified the default serializer defined in the configs will be used
+ * @param otherValueSerializer value serializer for other stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param keyDeserializer key deserializer,
+ * if not specified the default serializer defined in the configs will be used
+ * @param otherValueDeserializer value deserializer for other stream,
+ * if not specified the default serializer defined in the configs will be used
+ * @param <V1> the value type of the other stream
+ * @param <V2> the value type of the new stream
+ */
+ <V1, V2> KStream<K, V2> leftJoin(
+ KStream<K, V1> otherStream,
+ ValueJoiner<V, V1, V2> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerializer,
+ Serializer<V1> otherValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V1> otherValueDeserializer);
+
+ /**
* Combines values of this stream with KTable using Left Join.
*
* @param ktable the instance of KTable joined with this stream
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
deleted file mode 100644
index 80e548f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.internals.FilteredIterator;
-import org.apache.kafka.streams.kstream.internals.WindowSupport;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
- private final String name;
- private final long duration;
- private final int maxCount;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
-
- public SlidingWindowSupplier(
- String name,
- long duration,
- int maxCount,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- Deserializer<K> keyDeseriaizer,
- Deserializer<V> valueDeserializer) {
- this.name = name;
- this.duration = duration;
- this.maxCount = maxCount;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.keyDeserializer = keyDeseriaizer;
- this.valueDeserializer = valueDeserializer;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Window<K, V> get() {
- return new SlidingWindow();
- }
-
- public class SlidingWindow extends WindowSupport implements Window<K, V> {
- private final Object lock = new Object();
- private ProcessorContext context;
- private int partition;
- private int slotNum; // used as a key for Kafka log compaction
- private LinkedList<K> list = new LinkedList<K>();
- private HashMap<K, ValueList<V>> map = new HashMap<>();
-
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
- this.partition = context.id().partition;
- SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
- context.register(this, true, restoreFunc);
-
- for (ValueList<V> valueList : map.values()) {
- valueList.clearDirtyValues();
- }
- this.slotNum = restoreFunc.slotNum;
- }
-
- @Override
- public Iterator<V> findAfter(K key, final long timestamp) {
- return find(key, timestamp, timestamp + duration);
- }
-
- @Override
- public Iterator<V> findBefore(K key, final long timestamp) {
- return find(key, timestamp - duration, timestamp);
- }
-
- @Override
- public Iterator<V> find(K key, final long timestamp) {
- return find(key, timestamp - duration, timestamp + duration);
- }
-
- /*
- * finds items in the window between startTime and endTime (both inclusive)
- */
- private Iterator<V> find(K key, final long startTime, final long endTime) {
- final ValueList<V> values = map.get(key);
-
- if (values == null) {
- return Collections.emptyIterator();
- } else {
- return new FilteredIterator<V, Value<V>>(values.iterator()) {
- @Override
- protected V filter(Value<V> item) {
- if (startTime <= item.timestamp && item.timestamp <= endTime)
- return item.value;
- else
- return null;
- }
- };
- }
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- synchronized (lock) {
- slotNum++;
-
- list.offerLast(key);
-
- ValueList<V> values = map.get(key);
- if (values == null) {
- values = new ValueList<>();
- map.put(key, values);
- }
-
- values.add(slotNum, value, timestamp);
- }
- evictExcess();
- evictExpired(timestamp - duration);
- }
-
- private void evictExcess() {
- while (list.size() > maxCount) {
- K oldestKey = list.pollFirst();
-
- ValueList<V> values = map.get(oldestKey);
- values.removeFirst();
-
- if (values.isEmpty()) map.remove(oldestKey);
- }
- }
-
- private void evictExpired(long cutoffTime) {
- while (true) {
- K oldestKey = list.peekFirst();
-
- ValueList<V> values = map.get(oldestKey);
- Stamped<V> oldestValue = values.first();
-
- if (oldestValue.timestamp < cutoffTime) {
- list.pollFirst();
- values.removeFirst();
-
- if (values.isEmpty()) map.remove(oldestKey);
- } else {
- break;
- }
- }
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public void flush() {
- IntegerSerializer intSerializer = new IntegerSerializer();
- ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-
- for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
- ValueList<V> values = entry.getValue();
- if (values.hasDirtyValues()) {
- K key = entry.getKey();
-
- byte[] keyBytes = keySerializer.serialize(name, key);
-
- Iterator<Value<V>> iterator = values.dirtyValueIterator();
- while (iterator.hasNext()) {
- Value<V> dirtyValue = iterator.next();
- byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
- byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
-
- byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
-
- int offset = 0;
- offset += putLong(combined, offset, dirtyValue.timestamp);
- offset += puts(combined, offset, keyBytes);
- offset += puts(combined, offset, valBytes);
-
- if (offset != combined.length)
- throw new IllegalStateException("serialized length does not match");
-
- collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer);
- }
- values.clearDirtyValues();
- }
- }
- }
-
- @Override
- public void close() {
- // TODO
- }
-
- @Override
- public boolean persistent() {
- // TODO: should not be persistent, right?
- return false;
- }
-
- private class SlidingWindowRegistryCallback implements StateRestoreCallback {
-
- final IntegerDeserializer intDeserializer;
- int slotNum = 0;
-
- SlidingWindowRegistryCallback() {
- intDeserializer = new IntegerDeserializer();
- }
-
- @Override
- public void restore(byte[] slot, byte[] bytes) {
-
- slotNum = intDeserializer.deserialize("", slot);
-
- int offset = 0;
- // timestamp
- long timestamp = getLong(bytes, offset);
- offset += 8;
- // key
- int length = getInt(bytes, offset);
- offset += 4;
- K key = deserialize(bytes, offset, length, name, keyDeserializer);
- offset += length;
- // value
- length = getInt(bytes, offset);
- offset += 4;
- V value = deserialize(bytes, offset, length, name, valueDeserializer);
-
- put(key, value, timestamp);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
deleted file mode 100644
index a1456f6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.Iterator;
-
-public interface Window<K, V> extends StateStore {
-
- void init(ProcessorContext context);
-
- Iterator<V> find(K key, long timestamp);
-
- Iterator<V> findAfter(K key, long timestamp);
-
- Iterator<V> findBefore(K key, long timestamp);
-
- void put(K key, V value, long timestamp);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
deleted file mode 100644
index 46a2b9e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface WindowSupplier<K, V> {
-
- String name();
-
- Window<K, V> get();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 67a2d27..f47fe0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.JoinWindowSpec;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
@@ -26,12 +27,12 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.WindowSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
import java.lang.reflect.Array;
import java.util.HashSet;
@@ -67,6 +68,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+ public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
+
+ public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
+
public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
public static final String MERGE_NAME = "KSTREAM-MERGE-";
@@ -132,15 +137,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
- public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
- String name = topology.newName(WINDOWED_NAME);
-
- topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
-
- return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier);
- }
-
- @Override
@SuppressWarnings("unchecked")
public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
String branchName = topology.newName(BRANCH_NAME);
@@ -239,6 +235,135 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
topology.connectProcessorAndStateStores(name, stateStoreNames);
}
+ @Override
+ public <V1, R> KStream<K, R> join(
+ KStream<K, V1> other,
+ ValueJoiner<V, V1, R> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerialzier,
+ Serializer<V> thisValueSerialzier,
+ Serializer<V1> otherValueSerialzier,
+ Deserializer<K> keyDeserialier,
+ Deserializer<V> thisValueDeserialzier,
+ Deserializer<V1> otherValueDeserialzier) {
+
+ return join(other, joiner, joinWindowSpec,
+ keySerialzier, thisValueSerialzier, otherValueSerialzier,
+ keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
+ }
+
+ @Override
+ public <V1, R> KStream<K, R> outerJoin(
+ KStream<K, V1> other,
+ ValueJoiner<V, V1, R> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerialzier,
+ Serializer<V> thisValueSerialzier,
+ Serializer<V1> otherValueSerialzier,
+ Deserializer<K> keyDeserialier,
+ Deserializer<V> thisValueDeserialzier,
+ Deserializer<V1> otherValueDeserialzier) {
+
+ return join(other, joiner, joinWindowSpec,
+ keySerialzier, thisValueSerialzier, otherValueSerialzier,
+ keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <V1, R> KStream<K, R> join(
+ KStream<K, V1> other,
+ ValueJoiner<V, V1, R> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerialzier,
+ Serializer<V> thisValueSerialzier,
+ Serializer<V1> otherValueSerialzier,
+ Deserializer<K> keyDeserialier,
+ Deserializer<V> thisValueDeserialzier,
+ Deserializer<V1> otherValueDeserialzier,
+ boolean outer) {
+
+ Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+ RocksDBWindowStoreSupplier<K, V> thisWindow =
+ new RocksDBWindowStoreSupplier<>(
+ joinWindowSpec.name + "-1",
+ joinWindowSpec.before,
+ joinWindowSpec.after,
+ joinWindowSpec.retention,
+ joinWindowSpec.segments,
+ new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
+ null);
+
+ RocksDBWindowStoreSupplier<K, V1> otherWindow =
+ new RocksDBWindowStoreSupplier<>(
+ joinWindowSpec.name + "-2",
+ joinWindowSpec.after,
+ joinWindowSpec.before,
+ joinWindowSpec.retention,
+ joinWindowSpec.segments,
+ new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+ null);
+
+ KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
+ KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+
+ KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer);
+ KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer);
+ KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
+
+ String thisWindowStreamName = topology.newName(WINDOWED_NAME);
+ String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+ String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+ String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+ String joinMergeName = topology.newName(MERGE_NAME);
+
+ topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name);
+ topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
+ topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
+ topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
+ topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+ topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
+ topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
+
+ return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <V1, R> KStream<K, R> leftJoin(
+ KStream<K, V1> other,
+ ValueJoiner<V, V1, R> joiner,
+ JoinWindowSpec joinWindowSpec,
+ Serializer<K> keySerialzier,
+ Serializer<V1> otherValueSerialzier,
+ Deserializer<K> keyDeserialier,
+ Deserializer<V1> otherValueDeserialzier) {
+
+ Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+ RocksDBWindowStoreSupplier<K, V1> otherWindow =
+ new RocksDBWindowStoreSupplier<>(
+ joinWindowSpec.name,
+ joinWindowSpec.after,
+ joinWindowSpec.before,
+ joinWindowSpec.retention,
+ joinWindowSpec.segments,
+ new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+ null);
+
+ KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+ KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true);
+
+ String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+ String joinThisName = topology.newName(LEFTJOIN_NAME);
+
+ topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
+ topology.addProcessor(joinThisName, joinThis, this.name);
+ topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
+
+ return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
+ }
+
@SuppressWarnings("unchecked")
@Override
public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
deleted file mode 100644
index eefb8c9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Iterator;
-
-class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
-
- private static abstract class Finder<K, T> {
- abstract Iterator<T> find(K key, long timestamp);
- }
-
- private final String windowName;
- private final ValueJoiner<V1, V2, V> joiner;
-
- KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) {
- this.windowName = windowName;
- this.joiner = joiner;
- }
-
- @Override
- public Processor<K, V1> get() {
- return new KStreamJoinProcessor(windowName);
- }
-
- private class KStreamJoinProcessor extends AbstractProcessor<K, V1> {
-
- private final String windowName;
- protected Finder<K, V2> finder;
-
- public KStreamJoinProcessor(String windowName) {
- this.windowName = windowName;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
-
- final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);
-
- this.finder = new Finder<K, V2>() {
- @Override
- Iterator<V2> find(K key, long timestamp) {
- return window.find(key, timestamp);
- }
- };
- }
-
- @Override
- public void process(K key, V1 value) {
- long timestamp = context().timestamp();
- Iterator<V2> iter = finder.find(key, timestamp);
- if (iter != null) {
- while (iter.hasNext()) {
- context().forward(key, joiner.apply(value, iter.next()));
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
new file mode 100644
index 0000000..b122aa1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * Supplies a pass-through processor that additionally records every key/value it
+ * sees into a named {@link WindowStore}, making the stream's recent history
+ * available to a downstream stream-stream join processor.
+ */
+class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
+
+ // Name of the window store to write into; resolved from the context at init time.
+ private final String windowName;
+
+ KStreamJoinWindow(String windowName) {
+ this.windowName = windowName;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamJoinWindowProcessor();
+ }
+
+ private class KStreamJoinWindowProcessor extends AbstractProcessor<K, V> {
+
+ private WindowStore<K, V> window;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ // The store is registered with the topology elsewhere; here we only look it up.
+ window = (WindowStore<K, V>) context.getStateStore(windowName);
+ }
+
+ @Override
+ public void process(K key, V value) {
+ // Forward downstream first, then record into the window store.
+ // NOTE(review): the forward/put ordering may matter for join symmetry — confirm.
+ context().forward(key, value);
+ window.put(key, value);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
new file mode 100644
index 0000000..8a9bf6c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Iterator;
+
+/**
+ * Supplies the join-side processor of a windowed stream-stream join: each
+ * incoming record of the primary stream is matched against the other stream's
+ * records held in a {@link WindowStore}, and one joined result is forwarded per
+ * match. When {@code outer} is true and no match is found, a single result with
+ * a null other-side value is forwarded instead (left/outer-join semantics).
+ */
+class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
+
+ // Name of the window store populated from the other stream (see KStreamJoinWindow).
+ private final String otherWindowName;
+ private final ValueJoiner<V1, V2, R> joiner;
+ // true -> emit joiner.apply(value, null) when the window holds no match.
+ private final boolean outer;
+
+ KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) {
+ this.otherWindowName = otherWindowName;
+ this.joiner = joiner;
+ this.outer = outer;
+ }
+
+ @Override
+ public Processor<K, V1> get() {
+ return new KStreamKStreamJoinProcessor();
+ }
+
+ private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+
+ private WindowStore<K, V2> otherWindow;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+ }
+
+ @Override
+ public void process(K key, V1 value) {
+ // Start assuming no match; any match found clears the outer-join fallback.
+ boolean needOuterJoin = KStreamKStreamJoin.this.outer;
+
+ // Fetch the other stream's records for this key around the record's timestamp
+ // (the store's configured window bounds define the match range).
+ Iterator<V2> iter = otherWindow.fetch(key, context().timestamp());
+ while (iter.hasNext()) {
+ needOuterJoin = false;
+ context().forward(key, joiner.apply(value, iter.next()));
+ }
+
+ // No match and outer semantics requested: emit the value joined with null.
+ if (needOuterJoin)
+ context().forward(key, joiner.apply(value, null));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
deleted file mode 100644
index 2923936..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamWindow<K, V> implements ProcessorSupplier<K, V> {
-
- private final WindowSupplier<K, V> windowSupplier;
-
- KStreamWindow(WindowSupplier<K, V> windowSupplier) {
- this.windowSupplier = windowSupplier;
- }
-
- public WindowSupplier<K, V> window() {
- return windowSupplier;
- }
-
- @Override
- public Processor<K, V> get() {
- return new KStreamWindowProcessor();
- }
-
- private class KStreamWindowProcessor extends AbstractProcessor<K, V> {
-
- private Window<K, V> window;
-
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- this.window = windowSupplier.get();
- this.window.init(context);
- }
-
- @Override
- public void process(K key, V value) {
- synchronized (this) {
- window.put(key, value, context().timestamp());
- context().forward(key, value);
- }
- }
-
- @Override
- public void close() {
- window.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
deleted file mode 100644
index c71c11b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> {
-
- private final WindowSupplier<K, V> windowSupplier;
-
- public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
- super(topology, name, sourceNodes);
- this.windowSupplier = windowSupplier;
- }
-
- @Override
- public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) {
- String thisWindowName = this.windowSupplier.name();
- String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowSupplier.name();
- Set<String> thisSourceNodes = this.sourceNodes;
- Set<String> otherSourceNodes = ((KStreamWindowedImpl<K, V1>) other).sourceNodes;
-
- if (thisSourceNodes == null || otherSourceNodes == null)
- throw new KafkaException("not joinable");
-
- Set<String> allSourceNodes = new HashSet<>(sourceNodes);
- allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes);
-
- KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
- KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, reverseJoiner(valueJoiner));
- KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
-
- String joinThisName = topology.newName(JOINTHIS_NAME);
- String joinOtherName = topology.newName(JOINOTHER_NAME);
- String joinMergeName = topology.newName(MERGE_NAME);
-
- topology.addProcessor(joinThisName, joinThis, this.name);
- topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);
- topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.copartitionSources(allSourceNodes);
-
- return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
index 41c725d..73814ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
@@ -39,7 +39,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
private final Serdes serdes;
private final Time time;
- protected RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
+ public RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
this.name = name;
this.windowBefore = windowBefore;
this.windowAfter = windowAfter;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1e775b8..108bf3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,10 +22,8 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.UnlimitedWindowDef;
import org.junit.Test;
import java.util.Collections;
@@ -72,54 +70,38 @@ public class KStreamImplTest {
});
KStream<String, Integer>[] streams2 = stream2.branch(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ },
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return true;
+ }
}
- },
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return true;
- }
- }
);
KStream<String, Integer>[] streams3 = stream3.branch(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- },
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return true;
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ },
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return true;
+ }
}
- }
);
- KStream<String, Integer> stream4 = streams2[0].with(new UnlimitedWindowDef<String, Integer>("window"))
- .join(streams3[0].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer value1, Integer value2) {
- return value1 + value2;
- }
- });
-
- KStream<String, Integer> stream5 = streams2[1].with(new UnlimitedWindowDef<String, Integer>("window"))
- .join(streams3[1].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer value1, Integer value2) {
- return value1 + value2;
- }
- });
-
- stream4.to("topic-5");
+ streams2[0].to("topic-5");
- stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>());
+ streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>());
assertEquals(2 + // sources
2 + // stream1
@@ -127,8 +109,6 @@ public class KStreamImplTest {
1 + // stream3
1 + 2 + // streams2
1 + 2 + // streams3
- 2 + 3 + // stream4
- 2 + 3 + // stream5
1 + // to
2 + // through
1, // process
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
deleted file mode 100644
index 12bed17..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.UnlimitedWindowDef;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamJoinTest {
-
- private String topic1 = "topic1";
- private String topic2 = "topic2";
- private String dummyTopic = "dummyTopic";
-
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
-
- private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String value1, String value2) {
- return value1 + "+" + value2;
- }
- };
-
- private ValueMapper<String, String> valueMapper = new ValueMapper<String, String>() {
- @Override
- public String apply(String value) {
- return "#" + value;
- }
- };
-
- private ValueMapper<String, Iterable<String>> valueMapper2 = new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return (Iterable<String>) Utils.mkSet(value);
- }
- };
-
- private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
- new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
- @Override
- public KeyValue<Integer, String> apply(Integer key, String value) {
- return KeyValue.pair(key, value);
- }
- };
-
- KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>> keyValueMapper2 =
- new KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>>() {
- @Override
- public KeyValue<Integer, Iterable<String>> apply(Integer key, String value) {
- return KeyValue.pair(key, (Iterable<String>) Utils.mkSet(value));
- }
- };
-
-
- @Test
- public void testJoin() {
- KStreamBuilder builder = new KStreamBuilder();
-
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStream<Integer, String> dummyStream;
- KStreamWindowed<Integer, String> windowed1;
- KStreamWindowed<Integer, String> windowed2;
- MockProcessorSupplier<Integer, String> processor;
- String[] expected;
-
- processor = new MockProcessorSupplier<>();
- stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
- stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
- dummyStream = builder.from(keyDeserializer, valDeserializer, dummyTopic);
- windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
- windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
-
- windowed1.join(windowed2, joiner).process(processor);
-
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
- KStreamTestDriver driver = new KStreamTestDriver(builder);
- driver.setTime(0L);
-
- // push two items to the main stream. the other stream's window is empty
-
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
-
- assertEquals(0, processor.processed.size());
-
- // push two items to the other stream. the main stream's window has two items
-
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
-
- assertEquals(2, processor.processed.size());
-
- expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
-
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
- }
-
- processor.processed.clear();
-
- // push all items to the main stream. this should produce two items.
-
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
-
- assertEquals(2, processor.processed.size());
-
- expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
-
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
- }
-
- processor.processed.clear();
-
- // there will be previous two items + all items in the main stream's window, thus two are duplicates.
-
- // push all items to the other stream. this should produce 6 items
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
-
- assertEquals(6, processor.processed.size());
-
- expected = new String[]{"0:X0+Y0", "0:X0+Y0", "1:X1+Y1", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3"};
-
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
- }
- }
-
- @Test(expected = KafkaException.class)
- public void testNotJoinable() {
- KStreamBuilder builder = new KStreamBuilder();
-
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStreamWindowed<Integer, String> windowed1;
- KStreamWindowed<Integer, String> windowed2;
- MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
- stream1 = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
- stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
- windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
- windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
-
- windowed1.join(windowed2, joiner).process(processor);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
new file mode 100644
index 0000000..5a937af
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKStreamJoinTest {
+
+ private String topic1 = "topic1";
+ private String topic2 = "topic2";
+
+ private IntegerSerializer keySerializer = new IntegerSerializer();
+ private StringSerializer valSerializer = new StringSerializer();
+ private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+ private StringDeserializer valDeserializer = new StringDeserializer();
+
+ // Joins the two sides' values as "<left>+<right>" so every output record
+ // encodes exactly which pair of input records produced it.
+ private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(String value1, String value2) {
+ return value1 + "+" + value2;
+ }
+ };
+
+ // Inner join: a record is emitted only when the other stream's window already
+ // holds at least one record with the same key; one output per matching pair.
+ @Test
+ public void testJoin() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
+
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+ stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+ joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100),
+ keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+ joined.process(processor);
+
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+ // both input topics must fall into the same copartition group for the join to be valid
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+ driver.setTime(0L);
+
+ // push two items to the primary stream. the other window is empty; this should produce no items.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+ // push all four items to the primary stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+ // push all items to the other stream. this should produce six items
+ // (keys 0 and 1 appear twice in w1, so each YY0/YY1 matches twice).
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+ // push all four items to the primary stream. this should produce six items
+ // (keys 0 and 1 each match both a Y and a YY record in w2).
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+
+ // push two items to the other stream. this should produce six items
+ // (keys 0 and 1 each appear three times in w1: X, X, XX).
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ // Outer join: same pairing as the inner join, but a primary-stream record with no
+ // current match is still emitted, padded with null (e.g. "0:X0+null" below).
+ @Test
+ public void testOuterJoin() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
+
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+ stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+ joined = stream1.outerJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+ keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+ joined.process(processor);
+
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+ driver.setTime(0L);
+
+ // push two items to the primary stream. the other window is empty; this should produce two items (null-padded).
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+null", "1:X1+null");
+
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+ // push all four items to the primary stream. this should produce four items
+ // (keys 2 and 3 have no match yet, so they are null-padded).
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+
+ // push all items to the other stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+ // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+
+ // push two items to the other stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ // Verifies the join-window bound within(100): records join only while their
+ // timestamps differ by at most 100 time units, on either side of each other.
+ @Test
+ public void testWindowing() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+
+ long time = 0L;
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
+
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+ stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+ joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100),
+ keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+ joined.process(processor);
+
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+ driver.setTime(time);
+
+ // push two items to the primary stream. the other window is empty. this should produce no items.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+
+ // clear logically: jump far past the window so the earlier records can no longer join.
+ // item i is recorded at time 1000 + i so they expire one at a time below.
+ time = 1000L;
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.setTime(time + i);
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+ processor.checkAndClearResult();
+
+ // gradually expires items in w1
+ // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+
+ // at time 1100 all items are still within 100 of their record times (1000..1003)
+ time = 1000 + 100L;
+ driver.setTime(time);
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+ // each tick pushes one more item (oldest first) outside the 100-unit window
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("3:X3+YY3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ // go back to the time before expiration: at 899 every w1 item (times 1000..1003)
+ // is still more than 100 units in the future, so nothing joins yet
+
+ time = 1000L - 100L - 1L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ // each tick brings one more item (oldest first) within the 100-unit window
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+ // clear (logically); now repeat the same scenario with the roles of the streams swapped
+ time = 2000L;
+
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.setTime(time + i);
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ // gradually expires items in w2
+ // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+
+ time = 2000L + 100L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("3:XX3+Y3");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ // go back to the time before expiration
+
+ time = 2000L - 100L - 1L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult();
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
+
+ processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+}