You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/12 19:34:41 UTC
[2/3] flink git commit: [FLINK-8480][DataStream] Add APIs for
Interval Joins.
[FLINK-8480][DataStream] Add APIs for Interval Joins.
This adds the Java and Scala API for performing an IntervalJoin.
In Java, this will look like:
Example:
```java
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
.process(new IntervalJoinFunction() {...});
```
This closes #5482.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42ada8ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42ada8ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42ada8ad
Branch: refs/heads/master
Commit: 42ada8ad9ca28f94d0a0355658330198bbc2b577
Parents: f45b7f7
Author: Florian Schmidt <fl...@icloud.com>
Authored: Mon Jul 9 12:02:24 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 12 21:03:26 2018 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/index.md | 15 +
.../streaming/api/datastream/KeyedStream.java | 184 ++++
.../UnsupportedTimeCharacteristicException.java | 35 +
.../api/functions/co/ProcessJoinFunction.java | 87 ++
.../functions/co/TimeBoundedJoinFunction.java | 87 --
.../api/operators/co/IntervalJoinOperator.java | 513 ++++++++++
.../co/TimeBoundedStreamJoinOperator.java | 513 ----------
.../operators/co/IntervalJoinOperatorTest.java | 941 +++++++++++++++++++
.../co/TimeBoundedStreamJoinOperatorTest.java | 941 -------------------
.../flink/streaming/api/scala/KeyedStream.scala | 106 ++-
.../api/scala/IntervalJoinITCase.scala | 130 +++
.../streaming/runtime/IntervalJoinITCase.java | 451 +++++++++
12 files changed, 2461 insertions(+), 1542 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/docs/dev/stream/operators/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/index.md b/docs/dev/stream/operators/index.md
index 1dbdef4..422dbbf 100644
--- a/docs/dev/stream/operators/index.md
+++ b/docs/dev/stream/operators/index.md
@@ -310,6 +310,21 @@ dataStream.join(otherStream)
</td>
</tr>
<tr>
+ <td><strong>Interval Join</strong><br>KeyedStream,KeyedStream → DataStream</td>
+ <td>
+ <p>Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound</p>
+ {% highlight java %}
+// this will join the two streams so that
+// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
+keyedStream.intervalJoin(otherKeyedStream)
+ .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
+    .upperBoundExclusive() // optional
+    .lowerBoundExclusive() // optional
+ .process(new IntervalJoinFunction() {...});
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
<td><strong>Window CoGroup</strong><br>DataStream,DataStream → DataStream</td>
<td>
<p>Cogroups two data streams on a given key and a common window.</p>
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index a948ae2..32a5c96 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -51,6 +52,7 @@ import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -76,6 +78,8 @@ import java.util.List;
import java.util.Stack;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link KeyedStream} represents a {@link DataStream} on which operator state is
* partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
@@ -396,6 +400,186 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
// ------------------------------------------------------------------------
+ // Joining
+ // ------------------------------------------------------------------------
+
+ /**
+ * Join elements of this {@link KeyedStream} with elements of another {@link KeyedStream} over
+ * a time interval that can be specified with {@link IntervalJoin#between(Time, Time)}.
+ *
+ * @param otherStream The other keyed stream to join this keyed stream with
+ * @param <T1> Type parameter of elements in the other stream
+ * @return An instance of {@link IntervalJoin} with this keyed stream and the other keyed stream
+ */
+ @PublicEvolving
+ public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
+ return new IntervalJoin<>(this, otherStream);
+ }
+
+ /**
+ * Perform a join over a time interval.
+	 * @param <T1> The type parameter of the elements in the first stream
+	 * @param <T2> The type parameter of the elements in the second stream
+ */
+ @PublicEvolving
+ public static class IntervalJoin<T1, T2, KEY> {
+
+ private final KeyedStream<T1, KEY> streamOne;
+ private final KeyedStream<T2, KEY> streamTwo;
+
+ IntervalJoin(
+ KeyedStream<T1, KEY> streamOne,
+ KeyedStream<T2, KEY> streamTwo
+ ) {
+ this.streamOne = checkNotNull(streamOne);
+ this.streamTwo = checkNotNull(streamTwo);
+ }
+
+ /**
+ * Specifies the time boundaries over which the join operation works, so that
+ * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+ * By default both the lower and the upper bound are inclusive. This can be configured
+ * with {@link IntervalJoined#lowerBoundExclusive()} and
+ * {@link IntervalJoined#upperBoundExclusive()}
+ *
+ * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+ * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+ */
+ @PublicEvolving
+ public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+ TimeCharacteristic timeCharacteristic =
+ streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+ if (timeCharacteristic != TimeCharacteristic.EventTime) {
+ throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
+ }
+
+ checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
+ checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
+
+ return new IntervalJoined<>(
+ streamOne,
+ streamTwo,
+ lowerBound.toMilliseconds(),
+ upperBound.toMilliseconds(),
+ true,
+ true
+ );
+ }
+ }
+
+ /**
+ * IntervalJoined is a container for two streams that have keys for both sides as well as
+ * the time boundaries over which elements should be joined.
+ *
+ * @param <IN1> Input type of elements from the first stream
+ * @param <IN2> Input type of elements from the second stream
+ * @param <KEY> The type of the key
+ */
+ @PublicEvolving
+ public static class IntervalJoined<IN1, IN2, KEY> {
+
+ private static final String INTERVAL_JOIN_FUNC_NAME = "IntervalJoin";
+
+ private final KeyedStream<IN1, KEY> left;
+ private final KeyedStream<IN2, KEY> right;
+
+ private final long lowerBound;
+ private final long upperBound;
+
+ private final KeySelector<IN1, KEY> keySelector1;
+ private final KeySelector<IN2, KEY> keySelector2;
+
+ private boolean lowerBoundInclusive;
+ private boolean upperBoundInclusive;
+
+ public IntervalJoined(
+ KeyedStream<IN1, KEY> left,
+ KeyedStream<IN2, KEY> right,
+ long lowerBound,
+ long upperBound,
+ boolean lowerBoundInclusive,
+ boolean upperBoundInclusive) {
+
+ this.left = checkNotNull(left);
+ this.right = checkNotNull(right);
+
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+
+ this.lowerBoundInclusive = lowerBoundInclusive;
+ this.upperBoundInclusive = upperBoundInclusive;
+
+ this.keySelector1 = left.getKeySelector();
+ this.keySelector2 = right.getKeySelector();
+ }
+
+ /**
+ * Set the upper bound to be exclusive.
+ */
+ @PublicEvolving
+ public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
+ this.upperBoundInclusive = false;
+ return this;
+ }
+
+ /**
+ * Set the lower bound to be exclusive.
+ */
+ @PublicEvolving
+ public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
+ this.lowerBoundInclusive = false;
+ return this;
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed for each joined pair
+ * of elements.
+ * @param udf The user-defined function
+ * @param <OUT> The output type
+ * @return Returns a DataStream
+ */
+ @PublicEvolving
+ public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> udf) {
+
+ ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(udf);
+
+ TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
+ cleanedUdf,
+ ProcessJoinFunction.class, // ProcessJoinFunction<IN1, IN2, OUT>
+ 0, // 0 1 2
+ 1,
+ 2,
+ new int[]{0}, // lambda input 1 type arg indices
+ new int[]{1}, // lambda input 1 type arg indices
+ TypeExtractor.NO_INDEX, // output arg indices
+ left.getType(), // input 1 type information
+ right.getType(), // input 2 type information
+ INTERVAL_JOIN_FUNC_NAME ,
+ false
+ );
+
+ IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
+ new IntervalJoinOperator<>(
+ lowerBound,
+ upperBound,
+ lowerBoundInclusive,
+ upperBoundInclusive,
+ left.getType().createSerializer(left.getExecutionConfig()),
+ right.getType().createSerializer(right.getExecutionConfig()),
+ cleanedUdf
+ );
+
+ return left
+ .connect(right)
+ .keyBy(keySelector1, keySelector2)
+ .transform(INTERVAL_JOIN_FUNC_NAME , resultType, operator);
+
+ }
+ }
+
+ // ------------------------------------------------------------------------
// Windowing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
new file mode 100644
index 0000000..cb2570a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * An exception that indicates that a time characteristic was used that is not supported in the
+ * current operation.
+ */
+@PublicEvolving
+public class UnsupportedTimeCharacteristicException extends FlinkRuntimeException {
+
+ private static final long serialVersionUID = -8109094930338075819L;
+
+ public UnsupportedTimeCharacteristicException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
new file mode 100644
index 0000000..2c39abc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A function that processes two joined elements and produces a single output one.
+ *
+ * <p>This function will get called for every joined pair of elements of the two joined streams.
+ * The timestamp of the joined pair as well as the timestamp of the left element and the right
+ * element can be accessed through the {@link Context}.
+ *
+ * @param <IN1> Type of the first input
+ * @param <IN2> Type of the second input
+ * @param <OUT> Type of the output
+ */
+@PublicEvolving
+public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = -2444626938039012398L;
+
+ /**
+ * This method is called for each joined pair of elements. It can output zero or more elements
+ * through the provided {@link Collector} and has access to the timestamps of the joined elements
+ * and the result through the {@link Context}.
+ *
+ * @param left The left element of the joined pair.
+ * @param right The right element of the joined pair.
+ * @param ctx A context that allows querying the timestamps of the left, right and
+ * joined pair. In addition, this context allows to emit elements on a side output.
+ * @param out The collector to emit resulting elements to.
+ * @throws Exception This function may throw exceptions which cause the streaming program to
+ * fail and go in recovery mode.
+ */
+ public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
+
+ /**
+ * The context that is available during an invocation of
+ * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
+ * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
+ * allows to emit elements on a side output.
+ */
+ public abstract class Context {
+
+ /**
+ * @return The timestamp of the left element of a joined pair
+ */
+ public abstract long getLeftTimestamp();
+
+ /**
+ * @return The timestamp of the right element of a joined pair
+ */
+ public abstract long getRightTimestamp();
+
+ /**
+ * @return The timestamp of the joined pair.
+ */
+ public abstract long getTimestamp();
+
+ /**
+ * Emits a record to the side output identified by the {@link OutputTag}.
+ * @param outputTag The output tag that identifies the side output to emit to
+ * @param value The record to emit
+ */
+ public abstract <X> void output(OutputTag<X> outputTag, X value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
deleted file mode 100644
index cd745ca..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * A function that processes two joined elements and produces a single output one.
- *
- * <p>This function will get called for every joined pair of elements the joined two streams.
- * The timestamp of the joined pair as well as the timestamp of the left element and the right
- * element can be accessed through the {@link Context}.
- *
- * @param <IN1> Type of the first input
- * @param <IN2> Type of the second input
- * @param <OUT> Type of the output
- */
-@PublicEvolving
-public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
-
- private static final long serialVersionUID = -2444626938039012398L;
-
- /**
- * This method is called for each joined pair of elements. It can output zero or more elements
- * through the provided {@link Collector} and has access to the timestamps of the joined elements
- * and the result through the {@link Context}.
- *
- * @param left The left element of the joined pair.
- * @param right The right element of the joined pair.
- * @param ctx A context that allows querying the timestamps of the left, right and
- * joined pair. In addition, this context allows to emit elements on a side output.
- * @param out The collector to emit resulting elements to.
- * @throws Exception This function may throw exceptions which cause the streaming program to
- * fail and go in recovery mode.
- */
- public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
-
- /**
- * The context that is available during an invocation of
- * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
- * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
- * allows to emit elements on a side output.
- */
- public abstract class Context {
-
- /**
- * @return The timestamp of the left element of a joined pair
- */
- public abstract long getLeftTimestamp();
-
- /**
- * @return The timestamp of the right element of a joined pair
- */
- public abstract long getRightTimestamp();
-
- /**
- * @return The timestamp of the joined pair.
- */
- public abstract long getTimestamp();
-
- /**
- * Emits a record to the side output identified by the {@link OutputTag}.
- * @param outputTag The output tag that identifies the side output to emit to
- * @param value The record to emit
- */
- public abstract <X> void output(OutputTag<X> outputTag, X value);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
new file mode 100644
index 0000000..0c449e6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
+ *
+ * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link ProcessJoinFunction}.
+ *
+ * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
+ * We then check the right buffer to see whether there are any elements that can be joined. If
+ * there are, they are joined and passed to the aforementioned function. The same happens the
+ * other way around when receiving an element on the right side.
+ *
+ * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
+ * the elements.
+ *
+ * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered
+ * per element. This timer indicates when an element is not considered for joining anymore and can
+ * be removed from the state.
+ *
+ * @param <K> The type of the key based on which we join elements.
+ * @param <T1> The type of the elements in the left stream.
+ * @param <T2> The type of the elements in the right stream.
+ * @param <OUT> The output type created by the user-defined function.
+ */
+@Internal
+public class IntervalJoinOperator<K, T1, T2, OUT>
+ extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
+ implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
+
+ private static final long serialVersionUID = -5380774605111543454L;
+
+ private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);
+
+ private static final String LEFT_BUFFER = "LEFT_BUFFER";
+ private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+ private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
+ private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
+ private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
+
+ private final long lowerBound;
+ private final long upperBound;
+
+ private final TypeSerializer<T1> leftTypeSerializer;
+ private final TypeSerializer<T2> rightTypeSerializer;
+
+ private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
+ private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
+
+ private transient TimestampedCollector<OUT> collector;
+ private transient ContextImpl context;
+
+ private transient InternalTimerService<String> internalTimerService;
+
+ /**
+ * Creates a new IntervalJoinOperator.
+ *
+ * @param lowerBound The lower bound for evaluating if elements should be joined
+ * @param upperBound The upper bound for evaluating if elements should be joined
+ * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
+ * the lower bound
+ * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
+ * the upper bound
+ * @param udf A user-defined {@link ProcessJoinFunction} that gets called
+ * whenever two elements of T1 and T2 are joined
+ */
+ public IntervalJoinOperator(
+ long lowerBound,
+ long upperBound,
+ boolean lowerBoundInclusive,
+ boolean upperBoundInclusive,
+ TypeSerializer<T1> leftTypeSerializer,
+ TypeSerializer<T2> rightTypeSerializer,
+ ProcessJoinFunction<T1, T2, OUT> udf) {
+
+ super(Preconditions.checkNotNull(udf));
+
+ Preconditions.checkArgument(lowerBound <= upperBound,
+ "lowerBound <= upperBound must be fulfilled");
+
+ // Move buffer by +1 / -1 depending on inclusiveness in order not needing
+ // to check for inclusiveness later on
+ this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
+ this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
+
+ this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
+ this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+ context = new ContextImpl(userFunction);
+ internalTimerService =
+ getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+ LEFT_BUFFER,
+ LongSerializer.INSTANCE,
+ new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
+ ));
+
+ this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+ RIGHT_BUFFER,
+ LongSerializer.INSTANCE,
+ new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
+ ));
+ }
+
+ /**
+ * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
+ * arrives at the left stream, it will get added to the left buffer. Possible join candidates
+ * for that element will be looked up from the right buffer and if the pair lies within the
+ * user defined boundaries, it gets passed to the {@link ProcessJoinFunction}.
+ *
+ * @param record An incoming record to be joined
+ * @throws Exception Can throw an Exception during state access
+ */
+ @Override
+ public void processElement1(StreamRecord<T1> record) throws Exception {
+ processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
+ }
+
+ /**
+ * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
+ * arrives at the right stream, it will get added to the right buffer. Possible join candidates
+ * for that element will be looked up from the left buffer and if the pair lies within the user
+ * defined boundaries, it gets passed to the {@link ProcessJoinFunction}.
+ *
+ * @param record An incoming record to be joined
+ * @throws Exception Can throw an exception during state access
+ */
+ @Override
+ public void processElement2(StreamRecord<T2> record) throws Exception {
+ processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <OUR, OTHER> void processElement(
+ StreamRecord<OUR> record,
+ MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
+ MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+ long relativeLowerBound,
+ long relativeUpperBound,
+ boolean isLeft) throws Exception {
+
+ final OUR ourValue = record.getValue();
+ final long ourTimestamp = record.getTimestamp();
+
+ if (ourTimestamp == Long.MIN_VALUE) {
+ throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
+ "interval stream joins need to have meaningful timestamps.");
+ }
+
+ if (isLate(ourTimestamp)) {
+ return;
+ }
+
+ addToBuffer(ourBuffer, ourValue, ourTimestamp);
+
+ for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
+ final long timestamp = bucket.getKey();
+
+ if (timestamp < ourTimestamp + relativeLowerBound ||
+ timestamp > ourTimestamp + relativeUpperBound) {
+ continue;
+ }
+
+ for (BufferEntry<OTHER> entry: bucket.getValue()) {
+ if (isLeft) {
+ collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
+ } else {
+ collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
+ }
+ }
+ }
+
+ long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
+ if (isLeft) {
+ internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
+ } else {
+ internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
+ }
+ }
+
+ private boolean isLate(long timestamp) {
+ long currentWatermark = internalTimerService.currentWatermark();
+ return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
+ }
+
+ private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
+ long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+ collector.setAbsoluteTimestamp(resultTimestamp);
+ context.leftTimestamp = leftTimestamp;
+ context.rightTimestamp = rightTimestamp;
+ userFunction.processElement(left, right, context, collector);
+ }
+
+ private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+ List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
+ if (elemsInBucket == null) {
+ elemsInBucket = new ArrayList<>();
+ }
+ elemsInBucket.add(new BufferEntry<>(value, false));
+ buffer.put(timestamp, elemsInBucket);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, String> timer) throws Exception {
+
+ long timerTimestamp = timer.getTimestamp();
+ String namespace = timer.getNamespace();
+
+ logger.trace("onEventTime @ {}", timerTimestamp);
+
+ switch (namespace) {
+ case CLEANUP_NAMESPACE_LEFT: {
+ long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
+ logger.trace("Removing from left buffer @ {}", timestamp);
+ leftBuffer.remove(timestamp);
+ break;
+ }
+ case CLEANUP_NAMESPACE_RIGHT: {
+ long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
+ logger.trace("Removing from right buffer @ {}", timestamp);
+ rightBuffer.remove(timestamp);
+ break;
+ }
+ default:
+ throw new RuntimeException("Invalid namespace " + namespace);
+ }
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
+ // do nothing.
+ }
+
+ /**
+ * The context that is available during an invocation of
+ * {@link ProcessJoinFunction#processElement(Object, Object, ProcessJoinFunction.Context, Collector)}.
+ *
+ * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
+ * the joined pair. In addition, this context allows to emit elements on a side output.
+ */
+ private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
+
+ private long leftTimestamp = Long.MIN_VALUE;
+
+ private long rightTimestamp = Long.MIN_VALUE;
+
+ private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) {
+ func.super();
+ }
+
+ @Override
+ public long getLeftTimestamp() {
+ return leftTimestamp;
+ }
+
+ @Override
+ public long getRightTimestamp() {
+ return rightTimestamp;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return leftTimestamp;
+ }
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
+ output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
+ }
+ }
+
+ /**
+ * A container for elements put in the left/right buffer.
+ * This will contain the element itself along with a flag indicating
+ * if it has been joined or not.
+ */
+ private static class BufferEntry<T> {
+
+ private final T element;
+ private final boolean hasBeenJoined;
+
+ BufferEntry(T element, boolean hasBeenJoined) {
+ this.element = element;
+ this.hasBeenJoined = hasBeenJoined;
+ }
+ }
+
+ /**
+ * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
+ */
+ private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+
+ private static final long serialVersionUID = -20197698803836236L;
+
+ private final TypeSerializer<T> elementSerializer;
+
+ private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+ this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public TypeSerializer<BufferEntry<T>> duplicate() {
+ return new BufferEntrySerializer<>(elementSerializer.duplicate());
+ }
+
+ @Override
+ public BufferEntry<T> createInstance() {
+ return null;
+ }
+
+ @Override
+ public BufferEntry<T> copy(BufferEntry<T> from) {
+ return new BufferEntry<>(from.element, from.hasBeenJoined);
+ }
+
+ @Override
+ public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
+ target.writeBoolean(record.hasBeenJoined);
+ elementSerializer.serialize(record.element, target);
+ }
+
+ @Override
+ public BufferEntry<T> deserialize(DataInputView source) throws IOException {
+ boolean hasBeenJoined = source.readBoolean();
+ T element = elementSerializer.deserialize(source);
+ return new BufferEntry<>(element, hasBeenJoined);
+ }
+
+ @Override
+ public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeBoolean(source.readBoolean());
+ elementSerializer.copy(source, target);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
+ return Objects.equals(elementSerializer, that.elementSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(elementSerializer);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(BufferEntrySerializer.class);
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new BufferSerializerConfigSnapshot<>(elementSerializer);
+ }
+
+ @Override
+ public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
+ Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
+ ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+
+ CompatibilityResult<T> compatResult =
+ CompatibilityUtil.resolveCompatibilityResult(
+ previousSerializerAndConfig.f0,
+ UnloadableDummyTypeSerializer.class,
+ previousSerializerAndConfig.f1,
+ elementSerializer);
+
+ if (!compatResult.isRequiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (compatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new BufferEntrySerializer<>(
+ new TypeDeserializerAdapter<>(
+ compatResult.getConvertDeserializer())));
+ }
+ }
+ return CompatibilityResult.requiresMigration();
+ }
+ }
+
+ /**
+ * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+ */
+ public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+ private static final int VERSION = 1;
+
+ public BufferSerializerConfigSnapshot() {
+ }
+
+ public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
+ super(userTypeSerializer);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+ }
+
+ @VisibleForTesting
+ MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
+ return leftBuffer;
+ }
+
+ @VisibleForTesting
+ MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
+ return rightBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
deleted file mode 100644
index 26ad26b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
- *
- * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
- * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
- * upper bound can be configured to be either inclusive or exclusive.
- *
- * <p>As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}.
- *
- * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
- * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
- * We then check the right buffer to see whether there are any elements that can be joined. If
- * there are, they are joined and passed to the aforementioned function. The same happens the
- * other way around when receiving an element on the right side.
- *
- * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
- * the elements.
- *
- * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered
- * per element. This timer indicates when an element is not considered for joining anymore and can
- * be removed from the state.
- *
- * @param <K> The type of the key based on which we join elements.
- * @param <T1> The type of the elements in the left stream.
- * @param <T2> The type of the elements in the right stream.
- * @param <OUT> The output type created by the user-defined function.
- */
-@Internal
-public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT>
- extends AbstractUdfStreamOperator<OUT, TimeBoundedJoinFunction<T1, T2, OUT>>
- implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
-
- private static final long serialVersionUID = -5380774605111543454L;
-
- private static final Logger logger = LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class);
-
- private static final String LEFT_BUFFER = "LEFT_BUFFER";
- private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
- private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
- private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
- private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
-
- private final long lowerBound;
- private final long upperBound;
-
- private final TypeSerializer<T1> leftTypeSerializer;
- private final TypeSerializer<T2> rightTypeSerializer;
-
- private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
- private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
-
- private transient TimestampedCollector<OUT> collector;
- private transient ContextImpl context;
-
- private transient InternalTimerService<String> internalTimerService;
-
- /**
- * Creates a new TimeBoundedStreamJoinOperator.
- *
- * @param lowerBound The lower bound for evaluating if elements should be joined
- * @param upperBound The upper bound for evaluating if elements should be joined
- * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
- * the lower bound
- * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
- * the upper bound
- * @param udf A user-defined {@link TimeBoundedJoinFunction} that gets called
- * whenever two elements of T1 and T2 are joined
- */
- public TimeBoundedStreamJoinOperator(
- long lowerBound,
- long upperBound,
- boolean lowerBoundInclusive,
- boolean upperBoundInclusive,
- TypeSerializer<T1> leftTypeSerializer,
- TypeSerializer<T2> rightTypeSerializer,
- TimeBoundedJoinFunction<T1, T2, OUT> udf) {
-
- super(Preconditions.checkNotNull(udf));
-
- Preconditions.checkArgument(lowerBound <= upperBound,
- "lowerBound <= upperBound must be fulfilled");
-
- // Move buffer by +1 / -1 depending on inclusiveness in order not needing
- // to check for inclusiveness later on
- this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
- this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
-
- this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
- this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<>(output);
- context = new ContextImpl(userFunction);
- internalTimerService =
- getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
-
- this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
- LEFT_BUFFER,
- LongSerializer.INSTANCE,
- new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
- ));
-
- this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
- RIGHT_BUFFER,
- LongSerializer.INSTANCE,
- new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
- ));
- }
-
- /**
- * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
- * arrives at the left stream, it will get added to the left buffer. Possible join candidates
- * for that element will be looked up from the right buffer and if the pair lies within the
- * user defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
- *
- * @param record An incoming record to be joined
- * @throws Exception Can throw an Exception during state access
- */
- @Override
- public void processElement1(StreamRecord<T1> record) throws Exception {
- processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
- }
-
- /**
- * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
- * arrives at the right stream, it will get added to the right buffer. Possible join candidates
- * for that element will be looked up from the left buffer and if the pair lies within the user
- * defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
- *
- * @param record An incoming record to be joined
- * @throws Exception Can throw an exception during state access
- */
- @Override
- public void processElement2(StreamRecord<T2> record) throws Exception {
- processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
- }
-
- @SuppressWarnings("unchecked")
- private <OUR, OTHER> void processElement(
- StreamRecord<OUR> record,
- MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
- MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
- long relativeLowerBound,
- long relativeUpperBound,
- boolean isLeft) throws Exception {
-
- final OUR ourValue = record.getValue();
- final long ourTimestamp = record.getTimestamp();
-
- if (ourTimestamp == Long.MIN_VALUE) {
- throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
- "interval stream joins need to have timestamps meaningful timestamps.");
- }
-
- if (isLate(ourTimestamp)) {
- return;
- }
-
- addToBuffer(ourBuffer, ourValue, ourTimestamp);
-
- for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
- final long timestamp = bucket.getKey();
-
- if (timestamp < ourTimestamp + relativeLowerBound ||
- timestamp > ourTimestamp + relativeUpperBound) {
- continue;
- }
-
- for (BufferEntry<OTHER> entry: bucket.getValue()) {
- if (isLeft) {
- collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
- } else {
- collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
- }
- }
- }
-
- long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
- if (isLeft) {
- internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
- } else {
- internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
- }
- }
-
- private boolean isLate(long timestamp) {
- long currentWatermark = internalTimerService.currentWatermark();
- return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
- }
-
- private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
- long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
- collector.setAbsoluteTimestamp(resultTimestamp);
- context.leftTimestamp = leftTimestamp;
- context.rightTimestamp = rightTimestamp;
- userFunction.processElement(left, right, context, collector);
- }
-
- private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
- List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
- if (elemsInBucket == null) {
- elemsInBucket = new ArrayList<>();
- }
- elemsInBucket.add(new BufferEntry<>(value, false));
- buffer.put(timestamp, elemsInBucket);
- }
-
- @Override
- public void onEventTime(InternalTimer<K, String> timer) throws Exception {
-
- long timerTimestamp = timer.getTimestamp();
- String namespace = timer.getNamespace();
-
- logger.trace("onEventTime @ {}", timerTimestamp);
-
- switch (namespace) {
- case CLEANUP_NAMESPACE_LEFT: {
- long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
- logger.trace("Removing from left buffer @ {}", timestamp);
- leftBuffer.remove(timestamp);
- break;
- }
- case CLEANUP_NAMESPACE_RIGHT: {
- long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
- logger.trace("Removing from right buffer @ {}", timestamp);
- rightBuffer.remove(timestamp);
- break;
- }
- default:
- throw new RuntimeException("Invalid namespace " + namespace);
- }
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
- // do nothing.
- }
-
- /**
- * The context that is available during an invocation of
- * {@link TimeBoundedJoinFunction#processElement(Object, Object, TimeBoundedJoinFunction.Context, Collector)}.
- *
- * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
- * the joined pair. In addition, this context allows to emit elements on a side output.
- */
- private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, OUT>.Context {
-
- private long leftTimestamp = Long.MIN_VALUE;
-
- private long rightTimestamp = Long.MIN_VALUE;
-
- private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) {
- func.super();
- }
-
- @Override
- public long getLeftTimestamp() {
- return leftTimestamp;
- }
-
- @Override
- public long getRightTimestamp() {
- return rightTimestamp;
- }
-
- @Override
- public long getTimestamp() {
- return leftTimestamp;
- }
-
- @Override
- public <X> void output(OutputTag<X> outputTag, X value) {
- Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
- output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
- }
- }
-
- /**
- * A container for elements put in the left/write buffer.
- * This will contain the element itself along with a flag indicating
- * if it has been joined or not.
- */
- private static class BufferEntry<T> {
-
- private final T element;
- private final boolean hasBeenJoined;
-
- BufferEntry(T element, boolean hasBeenJoined) {
- this.element = element;
- this.hasBeenJoined = hasBeenJoined;
- }
- }
-
- /**
- * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
- */
- private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
-
- private static final long serialVersionUID = -20197698803836236L;
-
- private final TypeSerializer<T> elementSerializer;
-
- private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
- this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
- }
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public TypeSerializer<BufferEntry<T>> duplicate() {
- return new BufferEntrySerializer<>(elementSerializer.duplicate());
- }
-
- @Override
- public BufferEntry<T> createInstance() {
- return null;
- }
-
- @Override
- public BufferEntry<T> copy(BufferEntry<T> from) {
- return new BufferEntry<>(from.element, from.hasBeenJoined);
- }
-
- @Override
- public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
- target.writeBoolean(record.hasBeenJoined);
- elementSerializer.serialize(record.element, target);
- }
-
- @Override
- public BufferEntry<T> deserialize(DataInputView source) throws IOException {
- boolean hasBeenJoined = source.readBoolean();
- T element = elementSerializer.deserialize(source);
- return new BufferEntry<>(element, hasBeenJoined);
- }
-
- @Override
- public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeBoolean(source.readBoolean());
- elementSerializer.copy(source, target);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
- return Objects.equals(elementSerializer, that.elementSerializer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(elementSerializer);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj.getClass().equals(BufferEntrySerializer.class);
- }
-
- @Override
- public TypeSerializerConfigSnapshot snapshotConfiguration() {
- return new BufferSerializerConfigSnapshot<>(elementSerializer);
- }
-
- @Override
- public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
- Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
- ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
- CompatibilityResult<T> compatResult =
- CompatibilityUtil.resolveCompatibilityResult(
- previousSerializerAndConfig.f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializerAndConfig.f1,
- elementSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new BufferEntrySerializer<>(
- new TypeDeserializerAdapter<>(
- compatResult.getConvertDeserializer())));
- }
- }
- return CompatibilityResult.requiresMigration();
- }
- }
-
- /**
- * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
- */
- public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
-
- private static final int VERSION = 1;
-
- public BufferSerializerConfigSnapshot() {
- }
-
- public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
- super(userTypeSerializer);
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
- }
-
- @VisibleForTesting
- MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
- return leftBuffer;
- }
-
- @VisibleForTesting
- MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
- return rightBuffer;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
new file mode 100644
index 0000000..ee3f4d8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests for {@link IntervalJoinOperator}.
+ * Those tests cover correctness and cleaning of state
+ */
+@RunWith(Parameterized.class)
+public class IntervalJoinOperatorTest {
+
+	// True when the left-hand input is fed (elements + watermarks) completely
+	// before the right-hand input; the parameterization runs every test both ways.
+	private final boolean lhsFasterThanRhs;
+
+	@Parameters(name = "lhs faster than rhs: {0}")
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][]{
+			{true}, {false}
+		});
+	}
+
+	public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) {
+		this.lhsFasterThanRhs = lhsFasterThanRhs;
+	}
+
+	/**
+	 * Verifies the join is symmetric: a join with bounds [lower, upper) must
+	 * produce exactly the mirrored pairs of a join with bounds (-upper, -lower]
+	 * (negated bounds, swapped, inclusiveness flipped).
+	 */
+	@Test
+	public void testImplementationMirrorsCorrectly() throws Exception {
+
+		long lowerBound = 1;
+		long upperBound = 3;
+
+		boolean lowerBoundInclusive = true;
+		boolean upperBoundInclusive = false;
+
+		setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 2),
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 3),
+				streamRecordOf(2, 4),
+				streamRecordOf(3, 4))
+			.noLateRecords()
+			.close();
+
+		// Mirrored bounds: each expected pair above appears here with lhs/rhs swapped.
+		setupHarness(-1 * upperBound, upperBoundInclusive, -1 * lowerBound, lowerBoundInclusive)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(2, 1),
+				streamRecordOf(3, 1),
+				streamRecordOf(3, 2),
+				streamRecordOf(4, 2),
+				streamRecordOf(4, 3))
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 2 <= rhs <= lhs - 1 (original comment's "rhs + 2" was a typo)
+	public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
+
+		setupHarness(-2, true, -1, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(2, 1),
+				streamRecordOf(3, 1),
+				streamRecordOf(3, 2),
+				streamRecordOf(4, 2),
+				streamRecordOf(4, 3)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 1 <= rhs <= lhs + 1 (original comment's "rhs + 1" was a typo)
+	public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
+
+		setupHarness(-1, true, 1, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(3, 4),
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs + 1 <= rhs <= lhs + 2
+	public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
+
+		setupHarness(1, true, 2, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 2),
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 3),
+				streamRecordOf(2, 4),
+				streamRecordOf(3, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 3 < rhs < lhs - 1, i.e. only rhs == lhs - 2 joins
+	public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
+
+		setupHarness(-3, false, -1, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(3, 1),
+				streamRecordOf(4, 2)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 1 < rhs < lhs + 1, i.e. only rhs == lhs joins
+	public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
+
+		setupHarness(-1, false, 1, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(4, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs + 1 < rhs < lhs + 3, i.e. only rhs == lhs + 2 joins
+	public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
+
+		setupHarness(1, false, 3, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	/**
+	 * Verifies that with bounds [-1, 0] the left/right buffers are pruned as the
+	 * common (min of both inputs) watermark advances.
+	 */
+	@Test
+	public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
+
+		setupHarness(-1, true, 0, true)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(2, 3, 4, 5)
+			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(5)
+			.assertRightBufferContainsOnly(4, 5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	/**
+	 * Same cleanup expectations as the [-1, 0] inclusive case above: bounds
+	 * (-2, 1) exclusive are effectively [-1, 0] inclusive for these integer
+	 * timestamps, so the buffers prune identically.
+	 */
+	@Test
+	public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
+		setupHarness(-2, false, 1, false)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(2, 3, 4, 5)
+			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(5)
+			.assertRightBufferContainsOnly(4, 5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	/**
+	 * Verifies buffer cleanup with bounds [0, 1]: the left buffer must be kept
+	 * longer than the right one, mirroring the negative-bounds cleanup test.
+	 */
+	@Test
+	public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
+		setupHarness(0, true, 1, true)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+			.assertRightBufferContainsOnly(2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(4, 5)
+			.assertRightBufferContainsOnly(5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	/**
+	 * NOTE(review): despite the method name, the bounds are (-1, 2) exclusive,
+	 * which is effectively [0, 1] inclusive — hence the expectations match the
+	 * [0, 1] inclusive cleanup test above. Consider renaming in a follow-up.
+	 */
+	@Test
+	public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
+		setupHarness(-1, false, 2, false)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+			.assertRightBufferContainsOnly(2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(4, 5)
+			.assertRightBufferContainsOnly(5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	/**
+	 * Snapshots an operator mid-stream, restores a fresh operator from the
+	 * snapshot, and verifies that buffered elements from before the snapshot
+	 * (e.g. ts=3) still join with elements processed after the restore.
+	 */
+	@Test
+	public void testRestoreFromSnapshot() throws Exception {
+
+		// config
+		int lowerBound = -1;
+		boolean lowerBoundInclusive = true;
+		int upperBound = 1;
+		boolean upperBoundInclusive = true;
+
+		// create first test harness
+		OperatorSubtaskState handles;
+		List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
+
+		try (TestHarness testHarness = createTestHarness(
+			lowerBound,
+			lowerBoundInclusive,
+			upperBound,
+			upperBoundInclusive
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			// process elements with first test harness
+			testHarness.processElement1(createStreamRecord(1, "lhs"));
+			testHarness.processWatermark1(new Watermark(1));
+
+			testHarness.processElement2(createStreamRecord(1, "rhs"));
+			testHarness.processWatermark2(new Watermark(1));
+
+			testHarness.processElement1(createStreamRecord(2, "lhs"));
+			testHarness.processWatermark1(new Watermark(2));
+
+			testHarness.processElement2(createStreamRecord(2, "rhs"));
+			testHarness.processWatermark2(new Watermark(2));
+
+			testHarness.processElement1(createStreamRecord(3, "lhs"));
+			testHarness.processWatermark1(new Watermark(3));
+
+			testHarness.processElement2(createStreamRecord(3, "rhs"));
+			testHarness.processWatermark2(new Watermark(3));
+
+			// snapshot and validate output
+			handles = testHarness.snapshot(0, 0);
+			testHarness.close();
+
+			expectedOutput = Lists.newArrayList(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3)
+			);
+
+			TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
+			assertOutput(expectedOutput, testHarness.getOutput());
+		}
+
+		try (TestHarness newTestHarness = createTestHarness(
+			lowerBound,
+			lowerBoundInclusive,
+			upperBound,
+			upperBoundInclusive
+		)) {
+			// create new test harness from snapshot
+
+			newTestHarness.setup();
+			newTestHarness.initializeState(handles);
+			newTestHarness.open();
+
+			// process elements
+			newTestHarness.processElement1(createStreamRecord(4, "lhs"));
+			newTestHarness.processWatermark1(new Watermark(4));
+
+			newTestHarness.processElement2(createStreamRecord(4, "rhs"));
+			newTestHarness.processWatermark2(new Watermark(4));
+
+			// assert expected output: (3, 4) proves pre-snapshot state survived the restore
+			expectedOutput = Lists.newArrayList(
+				streamRecordOf(3, 4),
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4)
+			);
+
+			TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
+			assertOutput(expectedOutput, newTestHarness.getOutput());
+		}
+	}
+
+	/**
+	 * Verifies that {@code ctx.getLeftTimestamp()} inside the join function
+	 * equals the left element's timestamp for every joined pair.
+	 */
+	@Test
+	public void testContextCorrectLeftTimestamp() throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	/**
+	 * Verifies the timestamp exposed via {@code ctx.getTimestamp()}.
+	 *
+	 * <p>NOTE(review): this asserts {@code left.ts == ctx.getTimestamp()} for
+	 * every joined pair, while {@link #streamRecordOf} expects emitted records
+	 * to carry max(left.ts, right.ts) — confirm against the operator whether
+	 * getTimestamp() is really the left timestamp for pairs where right is later.
+	 */
+	@Test
+	public void testReturnsCorrectTimestamp() throws Exception {
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(left.ts, ctx.getTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	/**
+	 * Verifies that {@code ctx.getRightTimestamp()} inside the join function
+	 * equals the right element's timestamp for every joined pair.
+	 */
+	@Test
+	public void testContextCorrectRightTimestamp() throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(right.ts, ctx.getRightTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	/**
+	 * An element without a timestamp on the left input must fail the operator.
+	 * Uses try-with-resources so the harness is closed even though the
+	 * processElement1 call throws — consistent with the "Right" variant below
+	 * (the original leaked the harness).
+	 */
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsLeft() throws Exception {
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+			newTestHarness.setup();
+			newTestHarness.open();
+
+			// note that the StreamRecord has no timestamp in constructor
+			newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
+		}
+	}
+
+	/**
+	 * An element without a timestamp on the right input must fail the operator.
+	 */
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsRight() throws Exception {
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+			newTestHarness.setup();
+			newTestHarness.open();
+
+			// note that the StreamRecord has no timestamp in constructor
+			newTestHarness.processElement2(new StreamRecord<>(new TestElem(0, "rhs")));
+		}
+	}
+
+	/**
+	 * An element arriving behind the current watermark (ts=1 after watermark 3)
+	 * must be dropped: no joins involving the late duplicate appear in the output.
+	 */
+	@Test
+	public void testDiscardsLateData() throws Exception {
+		setupHarness(-1, true, 1, true)
+			.processElement1(1)
+			.processElement2(1)
+			.processElement1(2)
+			.processElement2(2)
+			.processElement1(3)
+			.processElement2(3)
+			.processWatermark1(3)
+			.processWatermark2(3)
+			.processElement1(1) // this element is late and should not be joined again
+			.processElement1(4)
+			.processElement2(4)
+			.processElement1(5)
+			.processElement2(5)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(3, 4),
+
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4),
+				streamRecordOf(4, 5),
+
+				streamRecordOf(5, 4),
+				streamRecordOf(5, 5)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	/** Asserts that the given keyed state holds no entries at all. */
+	private void assertEmpty(MapState<Long, ?> state) throws Exception {
+		Assert.assertTrue("state not empty", Iterables.isEmpty(state.keys()));
+	}
+
+	/**
+	 * Asserts that the state contains exactly the given timestamps as keys:
+	 * each expected key must be present, and no extra keys may exist.
+	 */
+	private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
+		for (long t : ts) {
+			String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual: " + state.keys();
+			Assert.assertTrue(message, state.contains(t));
+		}
+
+		// Size check catches keys that are present but were not expected.
+		String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual: " + state.keys();
+		Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
+	}
+
+	/**
+	 * Asserts that the harness output contains exactly the expected stream
+	 * records (order-insensitive): same count of StreamRecords, and every
+	 * expected record present. Watermarks in the output queue are ignored.
+	 */
+	private void assertOutput(
+		Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
+		Queue<Object> actualOutput) {
+
+		// count() instead of collect(toList()).size() — no intermediate list needed.
+		long actualSize = actualOutput.stream()
+			.filter(elem -> elem instanceof StreamRecord)
+			.count();
+
+		int expectedSize = Iterables.size(expectedOutput);
+
+		Assert.assertEquals(
+			"Expected and actual size of stream records different",
+			expectedSize,
+			(int) actualSize
+		);
+
+		for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+			Assert.assertTrue(actualOutput.contains(record));
+		}
+	}
+
+	/**
+	 * Builds a keyed two-input harness around a fresh {@link IntervalJoinOperator}
+	 * with the given bounds; all test elements share the same key.
+	 */
+	private TestHarness createTestHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			new IntervalJoinOperator<>(
+				lowerBound,
+				upperBound,
+				lowerBoundInclusive,
+				upperBoundInclusive,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new PassthroughFunction()
+			);
+
+		return new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+	}
+
+	/**
+	 * Like {@link #createTestHarness} but wraps harness and operator in a
+	 * {@link JoinTestBuilder}, keeping the operator reference so the internal
+	 * buffers can be asserted on.
+	 */
+	private JoinTestBuilder setupHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			new IntervalJoinOperator<>(
+				lowerBound,
+				upperBound,
+				lowerBoundInclusive,
+				upperBoundInclusive,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new PassthroughFunction()
+			);
+
+		TestHarness t = new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+
+		return new JoinTestBuilder(t, operator);
+	}
+
+	/**
+	 * Fluent test helper: feeds elements/watermarks into a {@link TestHarness}
+	 * and asserts on the output as well as on the operator's internal buffers.
+	 */
+	private class JoinTestBuilder {
+
+		private IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
+		private TestHarness testHarness;
+
+		public JoinTestBuilder(
+			TestHarness t,
+			IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
+		) throws Exception {
+
+			this.testHarness = t;
+			this.operator = operator;
+			// setup() must precede open(), consistent with every other harness
+			// usage in this test (the original called open() before setup()).
+			t.setup();
+			t.open();
+		}
+
+		public TestHarness get() {
+			return testHarness;
+		}
+
+		// Feeds one left element whose record timestamp equals ts.
+		public JoinTestBuilder processElement1(int ts) throws Exception {
+			testHarness.processElement1(createStreamRecord(ts, "lhs"));
+			return this;
+		}
+
+		// Feeds one right element whose record timestamp equals ts.
+		public JoinTestBuilder processElement2(int ts) throws Exception {
+			testHarness.processElement2(createStreamRecord(ts, "rhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark1(int ts) throws Exception {
+			testHarness.processWatermark1(new Watermark(ts));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark2(int ts) throws Exception {
+			testHarness.processWatermark2(new Watermark(ts));
+			return this;
+		}
+
+		/**
+		 * Feeds elements with timestamps [from, to] plus trailing watermarks into
+		 * both inputs; the faster side (per lhsFasterThanRhs) is fed entirely first.
+		 */
+		public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
+			if (lhsFasterThanRhs) {
+				// add to lhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement1(createStreamRecord(i, "lhs"));
+					testHarness.processWatermark1(new Watermark(i));
+				}
+
+				// add to rhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement2(createStreamRecord(i, "rhs"));
+					testHarness.processWatermark2(new Watermark(i));
+				}
+			} else {
+				// add to rhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement2(createStreamRecord(i, "rhs"));
+					testHarness.processWatermark2(new Watermark(i));
+				}
+
+				// add to lhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement1(createStreamRecord(i, "lhs"));
+					testHarness.processWatermark1(new Watermark(i));
+				}
+			}
+
+			return this;
+		}
+
+		@SafeVarargs
+		public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
+			assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getLeftBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getRightBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferEmpty() {
+			try {
+				assertEmpty(operator.getLeftBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferEmpty() {
+			try {
+				assertEmpty(operator.getRightBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		// Asserts that no record in the output carried a timestamp behind the watermark.
+		public JoinTestBuilder noLateRecords() {
+			TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
+			return this;
+		}
+
+		public void close() throws Exception {
+			testHarness.close();
+		}
+	}
+
+	/** Join function that simply emits every joined pair as a {@link Tuple2}. */
+	private static class PassthroughFunction extends ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		@Override
+		public void processElement(
+			TestElem left,
+			TestElem right,
+			Context ctx,
+			Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+			out.collect(Tuple2.of(left, right));
+		}
+	}
+
+	/**
+	 * Builds the expected output record for a joined (lhs, rhs) pair; the
+	 * record timestamp is the max of the two element timestamps.
+	 */
+	private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
+		long lhsTs,
+		long rhsTs
+	) {
+		TestElem lhs = new TestElem(lhsTs, "lhs");
+		TestElem rhs = new TestElem(rhsTs, "rhs");
+
+		long ts = Math.max(lhsTs, rhsTs);
+		return new StreamRecord<>(Tuple2.of(lhs, rhs), ts);
+	}
+
+	/**
+	 * Test element with a fixed key ("key") so all records land on one key group,
+	 * a timestamp, and a source tag ("lhs"/"rhs"). Equality covers all three fields.
+	 */
+	private static class TestElem {
+		String key;
+		long ts;
+		String source;
+
+		public TestElem(long ts, String source) {
+			this.key = "key";
+			this.ts = ts;
+			this.source = source;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			TestElem testElem = (TestElem) o;
+			// Objects.equals replaces the hand-rolled null-safe comparisons.
+			return ts == testElem.ts
+				&& Objects.equals(key, testElem.key)
+				&& Objects.equals(source, testElem.source);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(key, ts, source);
+		}
+
+		@Override
+		public String toString() {
+			return this.source + ":" + this.ts;
+		}
+
+		public static TypeSerializer<TestElem> serializer() {
+			return TypeInformation.of(new TypeHint<TestElem>() {
+			}).createSerializer(new ExecutionConfig());
+		}
+	}
+
+	/** Wraps a new {@link TestElem} in a {@link StreamRecord} stamped with the same ts. */
+	private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
+		return new StreamRecord<>(new TestElem(ts, source), ts);
+	}
+
+	/**
+	 * Feeds elements with timestamps 1..4 plus trailing watermarks into both
+	 * inputs; the faster side (per lhsFasterThanRhs) is fed completely first.
+	 * The duplicated per-side loops of the original are extracted into helpers.
+	 */
+	private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
+		if (lhsFasterThanRhs) {
+			feedLhs(testHarness);
+			feedRhs(testHarness);
+		} else {
+			feedRhs(testHarness);
+			feedLhs(testHarness);
+		}
+	}
+
+	// Feeds elements 1..4 with trailing watermarks into the left input.
+	private static void feedLhs(TestHarness testHarness) throws Exception {
+		for (int i = 1; i <= 4; i++) {
+			testHarness.processElement1(createStreamRecord(i, "lhs"));
+			testHarness.processWatermark1(new Watermark(i));
+		}
+	}
+
+	// Feeds elements 1..4 with trailing watermarks into the right input.
+	private static void feedRhs(TestHarness testHarness) throws Exception {
+		for (int i = 1; i <= 4; i++) {
+			testHarness.processElement2(createStreamRecord(i, "rhs"));
+			testHarness.processWatermark2(new Watermark(i));
+		}
+	}
+
+	/**
+	 * Custom test harness to avoid endless generics in all of the test code.
+	 * Fixes the key type to String and the element types to {@link TestElem}.
+	 */
+	private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		TestHarness(
+			TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
+			KeySelector<TestElem, String> keySelector1,
+			KeySelector<TestElem, String> keySelector2,
+			TypeInformation<String> keyType) throws Exception {
+			super(operator, keySelector1, keySelector2, keyType);
+		}
+	}