You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/05 13:52:28 UTC
flink git commit: [FLINK-8479] Add TimeBoundedStreamJoinOperator.
Repository: flink
Updated Branches:
refs/heads/master e236680f1 -> ce345e394
[FLINK-8479] Add TimeBoundedStreamJoinOperator.
Adds an implementation of an operator for interval joins.
The timestamp of a joined pair is, for now, the max timestamp
of the elements in the pair. In addition, each buffered element
carries a "has been joined" flag in state, in preparation for
supporting outer joins in the future.
This closes #5342.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce345e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce345e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce345e39
Branch: refs/heads/master
Commit: ce345e394dfd176fe1433a5fcb0f46ad122787ca
Parents: e236680
Author: Florian Schmidt <fl...@icloud.com>
Authored: Thu Jan 18 15:47:14 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 5 15:51:43 2018 +0200
----------------------------------------------------------------------
.../functions/co/TimeBoundedJoinFunction.java | 87 ++
.../co/TimeBoundedStreamJoinOperator.java | 513 ++++++++++
.../co/TimeBoundedStreamJoinOperatorTest.java | 941 +++++++++++++++++++
.../flink/streaming/util/TestHarnessUtil.java | 20 +
4 files changed, 1561 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
new file mode 100644
index 0000000..cd745ca
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A function that processes two joined elements and produces zero or more output elements.
+ *
+ * <p>This function will get called for every joined pair of elements of the two joined streams.
+ * The timestamp of the joined pair as well as the timestamp of the left element and the right
+ * element can be accessed through the {@link Context}.
+ *
+ * @param <IN1> Type of the first input
+ * @param <IN2> Type of the second input
+ * @param <OUT> Type of the output
+ */
+@PublicEvolving
+public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+    private static final long serialVersionUID = -2444626938039012398L;
+
+    /**
+     * This method is called for each joined pair of elements. It can output zero or more elements
+     * through the provided {@link Collector} and has access to the timestamps of the joined elements
+     * and the result through the {@link Context}.
+     *
+     * @param left The left element of the joined pair.
+     * @param right The right element of the joined pair.
+     * @param ctx A context that allows querying the timestamps of the left, right and
+     *            joined pair. In addition, this context allows to emit elements on a side output.
+     * @param out The collector to emit resulting elements to.
+     * @throws Exception This function may throw exceptions which cause the streaming program to
+     *                   fail and go in recovery mode.
+     */
+    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
+
+    /**
+     * The context that is available during an invocation of
+     * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
+     * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
+     * allows to emit elements on a side output.
+     */
+    public abstract class Context {
+
+        /**
+         * Returns the timestamp of the left element of the joined pair.
+         *
+         * @return The timestamp of the left element of a joined pair
+         */
+        public abstract long getLeftTimestamp();
+
+        /**
+         * Returns the timestamp of the right element of the joined pair.
+         *
+         * @return The timestamp of the right element of a joined pair
+         */
+        public abstract long getRightTimestamp();
+
+        /**
+         * Returns the timestamp assigned to the joined pair.
+         *
+         * @return The timestamp of the joined pair.
+         */
+        public abstract long getTimestamp();
+
+        /**
+         * Emits a record to the side output identified by the {@link OutputTag}.
+         *
+         * @param outputTag The output tag that identifies the side output to emit to
+         * @param value The record to emit
+         * @param <X> The type of the records in the side output
+         */
+        public abstract <X> void output(OutputTag<X> outputTag, X value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
new file mode 100644
index 0000000..26ad26b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
+ *
+ * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where T2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}.
+ *
+ * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
+ * We then check the right buffer to see whether there are any elements that can be joined. If
+ * there are, they are joined and passed to the aforementioned function. The same happens the
+ * other way around when receiving an element on the right side.
+ *
+ * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
+ * the elements. The same timestamp is exposed through
+ * {@link TimeBoundedJoinFunction.Context#getTimestamp()} and attached to side-output records.
+ *
+ * <p>Elements whose timestamp is behind the current watermark are considered late and are
+ * dropped without being buffered or joined.
+ *
+ * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered
+ * per element. This timer indicates when an element is not considered for joining anymore and can
+ * be removed from the state.
+ *
+ * @param <K> The type of the key based on which we join elements.
+ * @param <T1> The type of the elements in the left stream.
+ * @param <T2> The type of the elements in the right stream.
+ * @param <OUT> The output type created by the user-defined function.
+ */
+@Internal
+public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT>
+        extends AbstractUdfStreamOperator<OUT, TimeBoundedJoinFunction<T1, T2, OUT>>
+        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
+
+    private static final long serialVersionUID = -5380774605111543454L;
+
+    private static final Logger logger = LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class);
+
+    private static final String LEFT_BUFFER = "LEFT_BUFFER";
+    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
+    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
+    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
+
+    /** The lower bound, normalized in the constructor so that it is always inclusive. */
+    private final long lowerBound;
+
+    /** The upper bound, normalized in the constructor so that it is always inclusive. */
+    private final long upperBound;
+
+    private final TypeSerializer<T1> leftTypeSerializer;
+    private final TypeSerializer<T2> rightTypeSerializer;
+
+    /** Buffered left elements, grouped into buckets keyed by element timestamp. */
+    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
+
+    /** Buffered right elements, grouped into buckets keyed by element timestamp. */
+    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
+
+    private transient TimestampedCollector<OUT> collector;
+    private transient ContextImpl context;
+
+    private transient InternalTimerService<String> internalTimerService;
+
+    /**
+     * Creates a new TimeBoundedStreamJoinOperator.
+     *
+     * @param lowerBound          The lower bound for evaluating if elements should be joined
+     * @param upperBound          The upper bound for evaluating if elements should be joined
+     * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
+     *                            the lower bound
+     * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
+     *                            the upper bound
+     * @param leftTypeSerializer  Serializer for the elements of the left input, used for the left buffer
+     * @param rightTypeSerializer Serializer for the elements of the right input, used for the right buffer
+     * @param udf                 A user-defined {@link TimeBoundedJoinFunction} that gets called
+     *                            whenever two elements of T1 and T2 are joined
+     * @throws IllegalArgumentException If {@code lowerBound > upperBound}.
+     * @throws NullPointerException If {@code udf} or one of the serializers is {@code null}.
+     */
+    public TimeBoundedStreamJoinOperator(
+            long lowerBound,
+            long upperBound,
+            boolean lowerBoundInclusive,
+            boolean upperBoundInclusive,
+            TypeSerializer<T1> leftTypeSerializer,
+            TypeSerializer<T2> rightTypeSerializer,
+            TimeBoundedJoinFunction<T1, T2, OUT> udf) {
+
+        super(Preconditions.checkNotNull(udf));
+
+        Preconditions.checkArgument(lowerBound <= upperBound,
+            "lowerBound <= upperBound must be fulfilled");
+
+        // Shift exclusive bounds by one so that every later range check can treat both
+        // bounds as inclusive.
+        this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
+        this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
+
+        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
+        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        collector = new TimestampedCollector<>(output);
+        context = new ContextImpl(userFunction);
+        internalTimerService =
+            getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+            LEFT_BUFFER,
+            LongSerializer.INSTANCE,
+            new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
+        ));
+
+        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+            RIGHT_BUFFER,
+            LongSerializer.INSTANCE,
+            new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
+        ));
+    }
+
+    /**
+     * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
+     * arrives at the left stream, it will get added to the left buffer. Possible join candidates
+     * for that element will be looked up from the right buffer and if the pair lies within the
+     * user defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
+     *
+     * @param record An incoming record to be joined
+     * @throws Exception Can throw an Exception during state access
+     */
+    @Override
+    public void processElement1(StreamRecord<T1> record) throws Exception {
+        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
+    }
+
+    /**
+     * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
+     * arrives at the right stream, it will get added to the right buffer. Possible join candidates
+     * for that element will be looked up from the left buffer and if the pair lies within the user
+     * defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
+     *
+     * @param record An incoming record to be joined
+     * @throws Exception Can throw an exception during state access
+     */
+    @Override
+    public void processElement2(StreamRecord<T2> record) throws Exception {
+        // Seen from the right side, the window is mirrored: negate and swap the bounds.
+        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
+    }
+
+    /**
+     * Shared implementation of {@link #processElement1(StreamRecord)} and
+     * {@link #processElement2(StreamRecord)}. It buffers the incoming element and joins it with
+     * every buffered element of the other side whose timestamp lies within
+     * [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound]. It then registers
+     * a cleanup timer for the newly buffered element.
+     *
+     * @param record The incoming record.
+     * @param ourBuffer The buffer of the input the record arrived on.
+     * @param otherBuffer The buffer of the opposite input, scanned for join candidates.
+     * @param relativeLowerBound Inclusive lower bound, relative to the record's timestamp.
+     * @param relativeUpperBound Inclusive upper bound, relative to the record's timestamp.
+     * @param isLeft Whether the record arrived on the left input.
+     * @throws Exception A {@link FlinkException} if the record carries no timestamp, or any
+     *                   exception raised by state access or the user function.
+     */
+    @SuppressWarnings("unchecked")
+    private <OUR, OTHER> void processElement(
+            StreamRecord<OUR> record,
+            MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
+            MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+            long relativeLowerBound,
+            long relativeUpperBound,
+            boolean isLeft) throws Exception {
+
+        final OUR ourValue = record.getValue();
+        final long ourTimestamp = record.getTimestamp();
+
+        if (ourTimestamp == Long.MIN_VALUE) {
+            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
+                "interval stream joins need to have meaningful timestamps.");
+        }
+
+        if (isLate(ourTimestamp)) {
+            return;
+        }
+
+        addToBuffer(ourBuffer, ourValue, ourTimestamp);
+
+        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
+            final long timestamp = bucket.getKey();
+
+            if (timestamp < ourTimestamp + relativeLowerBound ||
+                    timestamp > ourTimestamp + relativeUpperBound) {
+                continue;
+            }
+
+            for (BufferEntry<OTHER> entry: bucket.getValue()) {
+                // The casts are safe: isLeft tells which of OUR/OTHER is T1 and which is T2.
+                if (isLeft) {
+                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
+                } else {
+                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
+                }
+            }
+        }
+
+        // The element can no longer be joined once the watermark passes the last timestamp of
+        // the other side it may still match. onEventTime() inverts this computation to find the
+        // bucket to remove.
+        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
+        if (isLeft) {
+            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
+        } else {
+            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
+        }
+    }
+
+    /** Returns whether an element with the given timestamp is behind the current watermark. */
+    private boolean isLate(long timestamp) {
+        long currentWatermark = internalTimerService.currentWatermark();
+        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
+    }
+
+    /**
+     * Passes a joined pair to the user function. The pair's timestamp, which is the max of both
+     * element timestamps, is set on the collector and recorded in the context.
+     */
+    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
+        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+
+        collector.setAbsoluteTimestamp(resultTimestamp);
+        context.leftTimestamp = leftTimestamp;
+        context.rightTimestamp = rightTimestamp;
+        context.resultTimestamp = resultTimestamp;
+
+        userFunction.processElement(left, right, context, collector);
+    }
+
+    /** Appends {@code value} to the bucket for {@code timestamp}, creating the bucket if needed. */
+    private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
+        if (elemsInBucket == null) {
+            elemsInBucket = new ArrayList<>();
+        }
+        elemsInBucket.add(new BufferEntry<>(value, false));
+        buffer.put(timestamp, elemsInBucket);
+    }
+
+    /**
+     * Removes the buffer bucket whose cleanup timer fired. The bucket timestamp is recovered by
+     * inverting the cleanup-time computation in
+     * {@link #processElement(StreamRecord, MapState, MapState, long, long, boolean)}.
+     */
+    @Override
+    public void onEventTime(InternalTimer<K, String> timer) throws Exception {
+
+        long timerTimestamp = timer.getTimestamp();
+        String namespace = timer.getNamespace();
+
+        logger.trace("onEventTime @ {}", timerTimestamp);
+
+        switch (namespace) {
+            case CLEANUP_NAMESPACE_LEFT: {
+                // Left timers were registered at ts + upperBound if upperBound > 0, else at ts.
+                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
+                logger.trace("Removing from left buffer @ {}", timestamp);
+                leftBuffer.remove(timestamp);
+                break;
+            }
+            case CLEANUP_NAMESPACE_RIGHT: {
+                // Right timers were registered at ts - lowerBound if lowerBound < 0, else at ts.
+                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
+                logger.trace("Removing from right buffer @ {}", timestamp);
+                rightBuffer.remove(timestamp);
+                break;
+            }
+            default:
+                throw new RuntimeException("Invalid namespace " + namespace);
+        }
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
+        // do nothing.
+    }
+
+    /**
+     * The context that is available during an invocation of
+     * {@link TimeBoundedJoinFunction#processElement(Object, Object, TimeBoundedJoinFunction.Context, Collector)}.
+     *
+     * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
+     * the joined pair. In addition, this context allows to emit elements on a side output.
+     */
+    private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, OUT>.Context {
+
+        /** Timestamp of the joined pair, i.e. the max of the left and right timestamps. */
+        private long resultTimestamp = Long.MIN_VALUE;
+
+        private long leftTimestamp = Long.MIN_VALUE;
+
+        private long rightTimestamp = Long.MIN_VALUE;
+
+        private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) {
+            func.super();
+        }
+
+        @Override
+        public long getLeftTimestamp() {
+            return leftTimestamp;
+        }
+
+        @Override
+        public long getRightTimestamp() {
+            return rightTimestamp;
+        }
+
+        @Override
+        public long getTimestamp() {
+            return resultTimestamp;
+        }
+
+        @Override
+        public <X> void output(OutputTag<X> outputTag, X value) {
+            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
+            output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
+        }
+    }
+
+    /**
+     * A container for elements put in the left/right buffer.
+     * This will contain the element itself along with a flag indicating
+     * if it has been joined or not.
+     */
+    private static class BufferEntry<T> {
+
+        private final T element;
+        private final boolean hasBeenJoined;
+
+        BufferEntry(T element, boolean hasBeenJoined) {
+            this.element = element;
+            this.hasBeenJoined = hasBeenJoined;
+        }
+    }
+
+    /**
+     * A {@link TypeSerializer serializer} for the {@link BufferEntry}. The serialized form is the
+     * {@code hasBeenJoined} flag followed by the element serialized with the element serializer.
+     */
+    private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+
+        private static final long serialVersionUID = -20197698803836236L;
+
+        private final TypeSerializer<T> elementSerializer;
+
+        private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+            this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
+        }
+
+        @Override
+        public boolean isImmutableType() {
+            // The wrapped user element may be mutable, so entries must be copied, not shared.
+            return false;
+        }
+
+        @Override
+        public TypeSerializer<BufferEntry<T>> duplicate() {
+            return new BufferEntrySerializer<>(elementSerializer.duplicate());
+        }
+
+        @Override
+        public BufferEntry<T> createInstance() {
+            return null;
+        }
+
+        @Override
+        public BufferEntry<T> copy(BufferEntry<T> from) {
+            // Deep-copy the element so the copy does not alias a possibly mutable user object.
+            return new BufferEntry<>(elementSerializer.copy(from.element), from.hasBeenJoined);
+        }
+
+        @Override
+        public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
+            return copy(from);
+        }
+
+        @Override
+        public int getLength() {
+            return -1;
+        }
+
+        @Override
+        public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
+            target.writeBoolean(record.hasBeenJoined);
+            elementSerializer.serialize(record.element, target);
+        }
+
+        @Override
+        public BufferEntry<T> deserialize(DataInputView source) throws IOException {
+            boolean hasBeenJoined = source.readBoolean();
+            T element = elementSerializer.deserialize(source);
+            return new BufferEntry<>(element, hasBeenJoined);
+        }
+
+        @Override
+        public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
+            return deserialize(source);
+        }
+
+        @Override
+        public void copy(DataInputView source, DataOutputView target) throws IOException {
+            target.writeBoolean(source.readBoolean());
+            elementSerializer.copy(source, target);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
+            return Objects.equals(elementSerializer, that.elementSerializer);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(elementSerializer);
+        }
+
+        @Override
+        public boolean canEqual(Object obj) {
+            return obj != null && obj.getClass().equals(BufferEntrySerializer.class);
+        }
+
+        @Override
+        public TypeSerializerConfigSnapshot snapshotConfiguration() {
+            return new BufferSerializerConfigSnapshot<>(elementSerializer);
+        }
+
+        @Override
+        public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+            if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
+                Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
+                    ((BufferSerializerConfigSnapshot<?>) configSnapshot).getSingleNestedSerializerAndConfig();
+
+                CompatibilityResult<T> compatResult =
+                    CompatibilityUtil.resolveCompatibilityResult(
+                        previousSerializerAndConfig.f0,
+                        UnloadableDummyTypeSerializer.class,
+                        previousSerializerAndConfig.f1,
+                        elementSerializer);
+
+                if (!compatResult.isRequiresMigration()) {
+                    return CompatibilityResult.compatible();
+                } else if (compatResult.getConvertDeserializer() != null) {
+                    return CompatibilityResult.requiresMigration(
+                        new BufferEntrySerializer<>(
+                            new TypeDeserializerAdapter<>(
+                                compatResult.getConvertDeserializer())));
+                }
+            }
+            return CompatibilityResult.requiresMigration();
+        }
+    }
+
+    /**
+     * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+     */
+    public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+        private static final int VERSION = 1;
+
+        public BufferSerializerConfigSnapshot() {
+        }
+
+        public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
+            super(userTypeSerializer);
+        }
+
+        @Override
+        public int getVersion() {
+            return VERSION;
+        }
+    }
+
+    @VisibleForTesting
+    MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
+        return leftBuffer;
+    }
+
+    @VisibleForTesting
+    MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
+        return rightBuffer;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
new file mode 100644
index 0000000..75543e7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests for {@link TimeBoundedStreamJoinOperator}.
+ * Those tests cover correctness and cleaning of state
+ */
+@RunWith(Parameterized.class)
+public class TimeBoundedStreamJoinOperatorTest {
+
+    /** Parameter injected by the {@link Parameterized} runner; the values come from {@link #data()}. */
+    private final boolean lhsFasterThanRhs;
+
+    /**
+     * Supplies the parameter sets for this test class: one run with {@code true} and one
+     * with {@code false}.
+     *
+     * @return a collection of single-element parameter arrays
+     */
+    @Parameters(name = "lhs faster than rhs: {0}")
+    public static Collection<Object[]> data() {
+        final Object[] lhsFaster = {true};
+        final Object[] rhsFaster = {false};
+        return Arrays.asList(lhsFaster, rhsFaster);
+    }
+
+    /**
+     * Creates the test instance for a single parameter set.
+     *
+     * @param lhsFasterThanRhs the parameter value produced by {@link #data()}
+     */
+    public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
+        this.lhsFasterThanRhs = lhsFasterThanRhs;
+    }
+
+    /**
+     * Negating and swapping the bounds (together with their inclusiveness) must mirror the join:
+     * each pair expected in the first run is expected with its sides swapped in the second run.
+     */
+    @Test
+    public void testImplementationMirrorsCorrectly() throws Exception {
+        final long lower = 1;
+        final long upper = 3;
+        final boolean lowerInclusive = true;
+        final boolean upperInclusive = false;
+
+        // lhs + 1 <= rhs < lhs + 3
+        setupHarness(lower, lowerInclusive, upper, upperInclusive)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(1, 2),
+                streamRecordOf(1, 3),
+                streamRecordOf(2, 3),
+                streamRecordOf(2, 4),
+                streamRecordOf(3, 4))
+            .noLateRecords()
+            .close();
+
+        // Mirrored window: lhs - 3 < rhs <= lhs - 1
+        final long mirroredLower = -upper;
+        final long mirroredUpper = -lower;
+
+        setupHarness(mirroredLower, upperInclusive, mirroredUpper, lowerInclusive)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(2, 1),
+                streamRecordOf(3, 1),
+                streamRecordOf(3, 2),
+                streamRecordOf(4, 2),
+                streamRecordOf(4, 3))
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds [-2, -1], both inclusive: a pair (lhs, rhs) joins iff lhs - 2 <= rhs <= lhs - 1.
+     */
+    @Test // lhs - 2 <= rhs <= lhs - 1
+    public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
+
+        setupHarness(-2, true, -1, true)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(2, 1),
+                streamRecordOf(3, 1),
+                streamRecordOf(3, 2),
+                streamRecordOf(4, 2),
+                streamRecordOf(4, 3)
+            )
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds [-1, 1], both inclusive: a pair (lhs, rhs) joins iff lhs - 1 <= rhs <= lhs + 1.
+     */
+    @Test // lhs - 1 <= rhs <= lhs + 1
+    public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
+
+        setupHarness(-1, true, 1, true)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(1, 1),
+                streamRecordOf(1, 2),
+                streamRecordOf(2, 1),
+                streamRecordOf(2, 2),
+                streamRecordOf(2, 3),
+                streamRecordOf(3, 2),
+                streamRecordOf(3, 3),
+                streamRecordOf(3, 4),
+                streamRecordOf(4, 3),
+                streamRecordOf(4, 4)
+            )
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds [1, 2], both inclusive: a pair (lhs, rhs) joins iff lhs + 1 <= rhs <= lhs + 2.
+     */
+    @Test
+    public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
+        final long lowerBound = 1;
+        final long upperBound = 2;
+        final boolean inclusive = true;
+
+        setupHarness(lowerBound, inclusive, upperBound, inclusive)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(1, 2),
+                streamRecordOf(1, 3),
+                streamRecordOf(2, 3),
+                streamRecordOf(2, 4),
+                streamRecordOf(3, 4))
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds (-3, -1), both exclusive: a pair (lhs, rhs) joins iff lhs - 3 < rhs < lhs - 1,
+     * that is, exactly when rhs == lhs - 2.
+     */
+    @Test
+    public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
+        final long lowerBound = -3;
+        final long upperBound = -1;
+        final boolean inclusive = false;
+
+        setupHarness(lowerBound, inclusive, upperBound, inclusive)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(3, 1),
+                streamRecordOf(4, 2))
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds (-1, 1), both exclusive: a pair (lhs, rhs) joins iff lhs - 1 < rhs < lhs + 1,
+     * that is, exactly when rhs == lhs.
+     */
+    @Test
+    public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
+        final long lowerBound = -1;
+        final long upperBound = 1;
+        final boolean inclusive = false;
+
+        setupHarness(lowerBound, inclusive, upperBound, inclusive)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(1, 1),
+                streamRecordOf(2, 2),
+                streamRecordOf(3, 3),
+                streamRecordOf(4, 4))
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds (1, 3), both exclusive: a pair (lhs, rhs) joins iff lhs + 1 < rhs < lhs + 3,
+     * that is, exactly when rhs == lhs + 2.
+     */
+    @Test
+    public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
+        final long lowerBound = 1;
+        final long upperBound = 3;
+        final boolean inclusive = false;
+
+        setupHarness(lowerBound, inclusive, upperBound, inclusive)
+            .processElementsAndWatermarks(1, 4)
+            .andExpect(
+                streamRecordOf(1, 3),
+                streamRecordOf(2, 4))
+            .noLateRecords()
+            .close();
+    }
+
+    /**
+     * Bounds [-1, 0], both inclusive. A left element at ts only joins right elements in
+     * [ts - 1, ts], so it can be dropped once the watermark reaches ts. A right element at ts
+     * joins left elements in [ts, ts + 1], so it is kept until the watermark reaches ts + 1.
+     */
+    @Test
+    public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
+
+        setupHarness(-1, true, 0, true)
+            .processElement1(1)
+            .processElement1(2)
+            .processElement1(3)
+            .processElement1(4)
+            .processElement1(5)
+
+            .processElement2(1)
+            .processElement2(2)
+            .processElement2(3)
+            .processElement2(4)
+            .processElement2(5) // fill both buffers with values
+
+            .processWatermark1(1)
+            .processWatermark2(1) // advance the common watermark to 1 and check what was cleaned up
+
+            .assertLeftBufferContainsOnly(2, 3, 4, 5)
+            .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+            .processWatermark1(4) // advance the common watermark to 4 and check what was cleaned up
+            .processWatermark2(4)
+
+            .assertLeftBufferContainsOnly(5)
+            .assertRightBufferContainsOnly(4, 5)
+
+            .processWatermark1(6) // advance the common watermark to 6 and check that both buffers are empty
+            .processWatermark2(6)
+
+            .assertLeftBufferEmpty()
+            .assertRightBufferEmpty()
+
+            .close();
+    }
+
+ @Test
+ public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
+ setupHarness(-2, false, 1, false)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(2, 3, 4, 5)
+ .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(5)
+ .assertRightBufferContainsOnly(4, 5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
+ setupHarness(0, true, 1, true)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+ .assertRightBufferContainsOnly(2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(4, 5)
+ .assertRightBufferContainsOnly(5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
+ setupHarness(-1, false, 2, false)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+ .assertRightBufferContainsOnly(2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(4, 5)
+ .assertRightBufferContainsOnly(5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testRestoreFromSnapshot() throws Exception {
+
+ // config
+ int lowerBound = -1;
+ boolean lowerBoundInclusive = true;
+ int upperBound = 1;
+ boolean upperBoundInclusive = true;
+
+ // create first test harness
+ OperatorSubtaskState handles;
+ List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
+
+ try (TestHarness testHarness = createTestHarness(
+ lowerBound,
+ lowerBoundInclusive,
+ upperBound,
+ upperBoundInclusive
+ )) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ // process elements with first test harness
+ testHarness.processElement1(createStreamRecord(1, "lhs"));
+ testHarness.processWatermark1(new Watermark(1));
+
+ testHarness.processElement2(createStreamRecord(1, "rhs"));
+ testHarness.processWatermark2(new Watermark(1));
+
+ testHarness.processElement1(createStreamRecord(2, "lhs"));
+ testHarness.processWatermark1(new Watermark(2));
+
+ testHarness.processElement2(createStreamRecord(2, "rhs"));
+ testHarness.processWatermark2(new Watermark(2));
+
+ testHarness.processElement1(createStreamRecord(3, "lhs"));
+ testHarness.processWatermark1(new Watermark(3));
+
+ testHarness.processElement2(createStreamRecord(3, "rhs"));
+ testHarness.processWatermark2(new Watermark(3));
+
+ // snapshot the operator state; the harness is closed exactly once, by try-with-resources
+ handles = testHarness.snapshot(0, 0);
+
+ // validate the output produced by the first harness while it is still open
+ expectedOutput = Lists.newArrayList(
+ streamRecordOf(1, 1),
+ streamRecordOf(1, 2),
+ streamRecordOf(2, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(2, 3),
+ streamRecordOf(3, 2),
+ streamRecordOf(3, 3)
+ );
+
+ TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
+ assertOutput(expectedOutput, testHarness.getOutput());
+ }
+
+ // create new test harness and restore it from the snapshot
+ try (TestHarness newTestHarness = createTestHarness(
+ lowerBound,
+ lowerBoundInclusive,
+ upperBound,
+ upperBoundInclusive
+ )) {
+
+ newTestHarness.setup();
+ newTestHarness.initializeState(handles);
+ newTestHarness.open();
+
+ // process elements; they must join with the restored buffered elements
+ newTestHarness.processElement1(createStreamRecord(4, "lhs"));
+ newTestHarness.processWatermark1(new Watermark(4));
+
+ newTestHarness.processElement2(createStreamRecord(4, "rhs"));
+ newTestHarness.processWatermark2(new Watermark(4));
+
+ // assert expected output
+ expectedOutput = Lists.newArrayList(
+ streamRecordOf(3, 4),
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4)
+ );
+
+ TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
+ assertOutput(expectedOutput, newTestHarness.getOutput());
+ }
+ }
+
+ @Test
+ public void testContextCorrectLeftTimestamp() throws Exception {
+
+ // user function that only checks the left timestamp exposed through the context
+ TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> checkLeftTimestamp =
+ new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
+ }
+ };
+
+ // bounds [-1, +1], both inclusive
+ TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+ new TimeBoundedStreamJoinOperator<>(
+ -1, 1, true, true,
+ TestElem.serializer(), TestElem.serializer(),
+ checkLeftTimestamp);
+
+ try (TestHarness testHarness = new TestHarness(
+ op, (elem) -> elem.key, (elem) -> elem.key, TypeInformation.of(String.class))) {
+ testHarness.setup();
+ testHarness.open();
+ processElementsAndWatermarks(testHarness);
+ }
+ }
+
+ @Test
+ public void testReturnsCorrectTimestamp() throws Exception {
+ // user function that checks the timestamp exposed through ctx.getTimestamp()
+ // NOTE(review): streamRecordOf builds expected output records with max(left, right) as timestamp,
+ // yet this asserts ctx.getTimestamp() == left.ts, and bounds [-1, +1] produce pairs such as (1, 2);
+ // confirm which timestamp getTimestamp() is meant to expose.
+ TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> checkTimestamp =
+ new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ Assert.assertEquals(left.ts, ctx.getTimestamp());
+ }
+ };
+
+ TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+ new TimeBoundedStreamJoinOperator<>(
+ -1, 1, true, true,
+ TestElem.serializer(), TestElem.serializer(),
+ checkTimestamp);
+
+ try (TestHarness testHarness = new TestHarness(
+ op, (elem) -> elem.key, (elem) -> elem.key, TypeInformation.of(String.class))) {
+ testHarness.setup();
+ testHarness.open();
+ processElementsAndWatermarks(testHarness);
+ }
+ }
+
+ @Test
+ public void testContextCorrectRightTimestamp() throws Exception {
+
+ // user function that only checks the right timestamp exposed through the context
+ TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> checkRightTimestamp =
+ new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ Assert.assertEquals(right.ts, ctx.getRightTimestamp());
+ }
+ };
+
+ // bounds [-1, +1], both inclusive
+ TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+ new TimeBoundedStreamJoinOperator<>(
+ -1, 1, true, true,
+ TestElem.serializer(), TestElem.serializer(),
+ checkRightTimestamp);
+
+ try (TestHarness testHarness = new TestHarness(
+ op, (elem) -> elem.key, (elem) -> elem.key, TypeInformation.of(String.class))) {
+ testHarness.setup();
+ testHarness.open();
+ processElementsAndWatermarks(testHarness);
+ }
+ }
+
+ @Test(expected = FlinkException.class)
+ public void testFailsWithNoTimestampsLeft() throws Exception {
+ // try-with-resources closes the harness even though processElement1 is expected to throw
+ try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+ newTestHarness.setup();
+ newTestHarness.open();
+
+ // note that the StreamRecord has no timestamp in constructor
+ newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
+ }
+ }
+
+ @Test(expected = FlinkException.class)
+ public void testFailsWithNoTimestampsRight() throws Exception {
+ // a record built without a timestamp must be rejected by the right input
+ StreamRecord<TestElem> recordWithoutTimestamp = new StreamRecord<>(new TestElem(0, "rhs"));
+
+ try (TestHarness harness = createTestHarness(0L, true, 0L, true)) {
+ harness.setup();
+ harness.open();
+ harness.processElement2(recordWithoutTimestamp);
+ }
+ }
+
+ @Test
+ public void testDiscardsLateData() throws Exception {
+ setupHarness(-1, true, 1, true)
+ .processElement1(1)
+ .processElement2(1)
+ .processElement1(2)
+ .processElement2(2)
+ .processElement1(3)
+ .processElement2(3)
+ .processWatermark1(3)
+ .processWatermark2(3)
+ .processElement1(1) // this element is late and should not be joined again
+ .processElement1(4)
+ .processElement2(4)
+ .processElement1(5)
+ .processElement2(5)
+ .andExpect(
+ streamRecordOf(1, 1),
+ streamRecordOf(1, 2),
+
+ streamRecordOf(2, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(2, 3),
+
+ streamRecordOf(3, 2),
+ streamRecordOf(3, 3),
+ streamRecordOf(3, 4),
+
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4),
+ streamRecordOf(4, 5),
+
+ streamRecordOf(5, 4),
+ streamRecordOf(5, 5)
+ )
+ .noLateRecords()
+ .close();
+ }
+
+ /** Fails unless the given map state has no keys for the current key/namespace. */
+ private void assertEmpty(MapState<Long, ?> state) throws Exception {
+ boolean hasAnyKey = state.keys().iterator().hasNext();
+ Assert.assertTrue("state not empty", !hasAnyKey);
+ }
+
+ /**
+ * Fails unless the map state's keys are exactly the given timestamps.
+ *
+ * @param state the buffer state to inspect
+ * @param ts the timestamps expected as keys, and no others
+ */
+ private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
+ // Copy the keys once so the messages show the actual contents: MapState#keys() only promises an
+ // Iterable, whose toString() need not list its elements.
+ List<Long> actualKeys = Lists.newArrayList(state.keys());
+ String expected = Arrays.toString(ts);
+
+ String missingMessage = "Keys not found in state. \n Expected: " + expected + "\n Actual: " + actualKeys;
+ for (long t : ts) {
+ Assert.assertTrue(missingMessage, state.contains(t));
+ }
+
+ String sizeMessage = "Too many objects in state. \n Expected: " + expected + "\n Actual: " + actualKeys;
+ Assert.assertEquals(sizeMessage, ts.length, actualKeys.size());
+ }
+
+ /**
+ * Asserts that the stream records in {@code actualOutput} equal {@code expectedOutput} as a multiset,
+ * ignoring order. Non-record elements such as watermarks are ignored.
+ */
+ private void assertOutput(
+ Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
+ Queue<Object> actualOutput) {
+
+ List<Object> actualRecords = actualOutput.stream()
+ .filter(elem -> elem instanceof StreamRecord)
+ .collect(Collectors.toList());
+
+ int expectedSize = Iterables.size(expectedOutput);
+
+ Assert.assertEquals(
+ "Expected and actual size of stream records different",
+ expectedSize,
+ actualRecords.size()
+ );
+
+ // remove each match so that duplicated expected records need duplicated actual records
+ for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+ Assert.assertTrue(
+ "Expected record not found in output: " + record + "\n Actual: " + actualOutput,
+ actualRecords.remove(record));
+ }
+ }
+
+ /** Creates an un-initialised harness around a pass-through join with the given bounds. */
+ private TestHarness createTestHarness(long lowerBound,
+ boolean lowerBoundInclusive,
+ long upperBound,
+ boolean upperBoundInclusive) throws Exception {
+
+ // the constructor takes both bounds first, then both inclusiveness flags
+ TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> joinOperator =
+ new TimeBoundedStreamJoinOperator<>(
+ lowerBound, upperBound,
+ lowerBoundInclusive, upperBoundInclusive,
+ TestElem.serializer(), TestElem.serializer(),
+ new PassthroughFunction());
+
+ // both inputs are keyed by TestElem#key
+ KeySelector<TestElem, String> leftKeySelector = (elem) -> elem.key;
+ KeySelector<TestElem, String> rightKeySelector = (elem) -> elem.key;
+
+ return new TestHarness(joinOperator, leftKeySelector, rightKeySelector, TypeInformation.of(String.class));
+ }
+
+ /** Creates a pass-through join with the given bounds and wraps its harness in a fluent {@link JoinTestBuilder}. */
+ private JoinTestBuilder setupHarness(long lowerBound,
+ boolean lowerBoundInclusive,
+ long upperBound,
+ boolean upperBoundInclusive) throws Exception {
+
+ // the constructor takes both bounds first, then both inclusiveness flags
+ TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> joinOperator =
+ new TimeBoundedStreamJoinOperator<>(
+ lowerBound, upperBound,
+ lowerBoundInclusive, upperBoundInclusive,
+ TestElem.serializer(), TestElem.serializer(),
+ new PassthroughFunction());
+
+ // both inputs are keyed by TestElem#key
+ KeySelector<TestElem, String> leftKeySelector = (elem) -> elem.key;
+ KeySelector<TestElem, String> rightKeySelector = (elem) -> elem.key;
+
+ TestHarness harness =
+ new TestHarness(joinOperator, leftKeySelector, rightKeySelector, TypeInformation.of(String.class));
+
+ // the builder's constructor initialises the harness
+ return new JoinTestBuilder(harness, joinOperator);
+ }
+
+ /**
+ * Fluent wrapper around a {@link TestHarness} and the join operator it drives, so tests can chain
+ * input, assertions and shutdown. Non-static because it uses the enclosing test's assertion helpers
+ * and the {@code lhsFasterThanRhs} field.
+ */
+ private class JoinTestBuilder {
+
+ private final TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
+ private final TestHarness testHarness;
+
+ public JoinTestBuilder(
+ TestHarness t,
+ TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
+ ) throws Exception {
+
+ this.testHarness = t;
+ this.operator = operator;
+ // harness lifecycle: setup() must run before open()
+ t.setup();
+ t.open();
+ }
+
+ /** Returns the wrapped test harness. */
+ public TestHarness get() {
+ return testHarness;
+ }
+
+ /** Sends a left ("lhs") element with timestamp {@code ts}. */
+ public JoinTestBuilder processElement1(int ts) throws Exception {
+ testHarness.processElement1(createStreamRecord(ts, "lhs"));
+ return this;
+ }
+
+ /** Sends a right ("rhs") element with timestamp {@code ts}. */
+ public JoinTestBuilder processElement2(int ts) throws Exception {
+ testHarness.processElement2(createStreamRecord(ts, "rhs"));
+ return this;
+ }
+
+ /** Sends a watermark with timestamp {@code ts} on the left input. */
+ public JoinTestBuilder processWatermark1(int ts) throws Exception {
+ testHarness.processWatermark1(new Watermark(ts));
+ return this;
+ }
+
+ /** Sends a watermark with timestamp {@code ts} on the right input. */
+ public JoinTestBuilder processWatermark2(int ts) throws Exception {
+ testHarness.processWatermark2(new Watermark(ts));
+ return this;
+ }
+
+ /**
+ * For every timestamp in {@code from..to} (inclusive), sends an element followed by a watermark on
+ * one input, then does the same on the other. The left input goes first when {@code lhsFasterThanRhs}
+ * is set; otherwise the right input goes first.
+ */
+ public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
+ if (lhsFasterThanRhs) {
+ feedInput(true, from, to);
+ feedInput(false, from, to);
+ } else {
+ feedInput(false, from, to);
+ feedInput(true, from, to);
+ }
+
+ return this;
+ }
+
+ /** Sends element and watermark pairs for {@code from..to} (inclusive) to the left or right input. */
+ private void feedInput(boolean left, int from, int to) throws Exception {
+ for (int ts = from; ts <= to; ts++) {
+ if (left) {
+ processElement1(ts);
+ processWatermark1(ts);
+ } else {
+ processElement2(ts);
+ processWatermark2(ts);
+ }
+ }
+ }
+
+ /** Asserts that the emitted stream records equal {@code elems}, ignoring order. */
+ @SafeVarargs
+ public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
+ assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
+ return this;
+ }
+
+ /** Asserts that the left buffer's keys are exactly {@code timestamps}. */
+ public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
+
+ try {
+ assertContainsOnly(operator.getLeftBuffer(), timestamps);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ /** Asserts that the right buffer's keys are exactly {@code timestamps}. */
+ public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
+
+ try {
+ assertContainsOnly(operator.getRightBuffer(), timestamps);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ /** Asserts that the left buffer holds no keys. */
+ public JoinTestBuilder assertLeftBufferEmpty() {
+ try {
+ assertEmpty(operator.getLeftBuffer());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ /** Asserts that the right buffer holds no keys. */
+ public JoinTestBuilder assertRightBufferEmpty() {
+ try {
+ assertEmpty(operator.getRightBuffer());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ /** Asserts that no emitted record is at or behind a previously emitted watermark. */
+ public JoinTestBuilder noLateRecords() {
+ TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
+ return this;
+ }
+
+ /** Closes the wrapped harness. */
+ public void close() throws Exception {
+ testHarness.close();
+ }
+ }
+
+ private static class PassthroughFunction extends TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ out.collect(Tuple2.of(left, right));
+ }
+ }
+
+ /** Builds the joined record expected for a left element at {@code lhsTs} and a right element at {@code rhsTs}. */
+ private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
+ long lhsTs,
+ long rhsTs
+ ) {
+ Tuple2<TestElem, TestElem> pair = Tuple2.of(new TestElem(lhsTs, "lhs"), new TestElem(rhsTs, "rhs"));
+
+ // the joined record carries the later of the two input timestamps
+ long resultTs = lhsTs >= rhsTs ? lhsTs : rhsTs;
+ return new StreamRecord<>(pair, resultTs);
+ }
+
+ /** Test element tagged with its source input ("lhs"/"rhs") and timestamp; every instance has key "key". */
+ private static class TestElem {
+ String key;
+ long ts;
+ String source;
+
+ public TestElem(long ts, String source) {
+ // constant key: all elements are joined under the same key
+ this.key = "key";
+ this.ts = ts;
+ this.source = source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+
+ TestElem other = (TestElem) o;
+ return ts == other.ts
+ && (key == null ? other.key == null : key.equals(other.key))
+ && (source == null ? other.source == null : source.equals(other.source));
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = (key == null) ? 0 : key.hashCode();
+ hash = 31 * hash + Long.hashCode(ts);
+ hash = 31 * hash + ((source == null) ? 0 : source.hashCode());
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return source + ":" + ts;
+ }
+
+ /** Returns a serializer for {@code TestElem} derived from its type information. */
+ public static TypeSerializer<TestElem> serializer() {
+ TypeInformation<TestElem> typeInfo = TypeInformation.of(new TypeHint<TestElem>() {
+ });
+ return typeInfo.createSerializer(new ExecutionConfig());
+ }
+ }
+
+ /** Wraps a new element in a record whose timestamp equals the element's own timestamp. */
+ private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
+ return new StreamRecord<>(new TestElem(ts, source), ts);
+ }
+
+ /**
+ * For timestamps 1..4, sends an element and then a watermark on one input, then does the same on
+ * the other input. The left input goes first when {@code lhsFasterThanRhs} is set.
+ */
+ private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
+ String[] sideOrder = lhsFasterThanRhs ? new String[] {"lhs", "rhs"} : new String[] {"rhs", "lhs"};
+
+ for (String side : sideOrder) {
+ boolean isLeft = "lhs".equals(side);
+ for (int ts = 1; ts <= 4; ts++) {
+ if (isLeft) {
+ testHarness.processElement1(createStreamRecord(ts, "lhs"));
+ testHarness.processWatermark1(new Watermark(ts));
+ } else {
+ testHarness.processElement2(createStreamRecord(ts, "rhs"));
+ testHarness.processWatermark2(new Watermark(ts));
+ }
+ }
+ }
+ }
+
+ /**
+ * Custom test harness to avoid endless generics in all of the test code.
+ */
+ private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+ TestHarness(
+ TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
+ KeySelector<TestElem, String> keySelector1,
+ KeySelector<TestElem, String> keySelector2,
+ TypeInformation<String> keyType) throws Exception {
+ super(operator, keySelector1, keySelector2, keyType);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index a1d732b..54c7e3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -105,4 +105,24 @@ public class TestHarnessUtil {
Assert.assertArrayEquals(message, sortedExpected, sortedActual);
}
+
+ /**
+ * Verifies that no StreamRecord has a timestamp equal to or earlier than the highest watermark
+ * that precedes it in {@code elements}; the check follows the iteration order of the elements.
+ *
+ * @param elements An iterable containing StreamRecords and watermarks
+ */
+ public static void assertNoLateRecords(Iterable<Object> elements) {
+ // highest watermark seen so far; max() keeps a lower, out-of-order watermark from relaxing the check
+ long highestWatermark = Long.MIN_VALUE;
+
+ for (Object elem : elements) {
+ if (elem instanceof Watermark) {
+ highestWatermark = Math.max(highestWatermark, ((Watermark) elem).getTimestamp());
+ } else if (elem instanceof StreamRecord) {
+ long recordTimestamp = ((StreamRecord<?>) elem).getTimestamp();
+ Assert.assertTrue("Late data was emitted after join", highestWatermark < recordTimestamp);
+ }
+ }
+ }
}