You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/05 13:52:28 UTC

flink git commit: [FLINK-8479] Add TimeBoundedStreamJoinOperator.

Repository: flink
Updated Branches:
  refs/heads/master e236680f1 -> ce345e394


[FLINK-8479] Add TimeBoundedStreamJoinOperator.

Adds an implementation of an operator for interval joins.
The timestamp of a joined pair for now is the max timestamp
of the elements in the pair. In addition, it contains the
state for outer joins in the future.

This closes #5342.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce345e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce345e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce345e39

Branch: refs/heads/master
Commit: ce345e394dfd176fe1433a5fcb0f46ad122787ca
Parents: e236680
Author: Florian Schmidt <fl...@icloud.com>
Authored: Thu Jan 18 15:47:14 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 5 15:51:43 2018 +0200

----------------------------------------------------------------------
 .../functions/co/TimeBoundedJoinFunction.java   |  87 ++
 .../co/TimeBoundedStreamJoinOperator.java       | 513 ++++++++++
 .../co/TimeBoundedStreamJoinOperatorTest.java   | 941 +++++++++++++++++++
 .../flink/streaming/util/TestHarnessUtil.java   |  20 +
 4 files changed, 1561 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
new file mode 100644
index 0000000..cd745ca
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A function that processes two joined elements and produces a single output one.
+ *
+ * <p>This function will get called for every joined pair of elements of the two joined streams.
+ * The timestamp of the joined pair as well as the timestamp of the left element and the right
+ * element can be accessed through the {@link Context}.
+ *
+ * @param <IN1> Type of the first input
+ * @param <IN2> Type of the second input
+ * @param <OUT> Type of the output
+ */
+@PublicEvolving
+public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = -2444626938039012398L;
+
+	/**
+	 * This method is called for each joined pair of elements. It can output zero or more elements
+	 * through the provided {@link Collector} and has access to the timestamps of the joined elements
+	 * and the result through the {@link Context}.
+	 *
+	 * @param left         The left element of the joined pair.
+	 * @param right        The right element of the joined pair.
+	 * @param ctx          A context that allows querying the timestamps of the left, right and
+	 *                     joined pair. In addition, this context allows to emit elements on a side output.
+	 * @param out          The collector to emit resulting elements to.
+	 * @throws Exception   This function may throw exceptions which cause the streaming program to
+	 * 					   fail and go in recovery mode.
+	 */
+	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * The context that is available during an invocation of
+	 * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
+	 * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
+	 * allows to emit elements on a side output.
+	 */
+	public abstract class Context {
+
+		/**
+		 * @return The timestamp of the left element of a joined pair
+		 */
+		public abstract long getLeftTimestamp();
+
+		/**
+		 * @return The timestamp of the right element of a joined pair
+		 */
+		public abstract long getRightTimestamp();
+
+		/**
+		 * @return The timestamp of the joined pair.
+		 */
+		public abstract long getTimestamp();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 * @param outputTag The output tag that identifies the side output to emit to
+		 * @param value The record to emit
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
new file mode 100644
index 0000000..26ad26b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
+ *
+ * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where T2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}.
+ *
+ * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
+ * We then check the right buffer to see whether there are any elements that can be joined. If
+ * there are, they are joined and passed to the aforementioned function. The same happens the
+ * other way around when receiving an element on the right side.
+ *
+ * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
+ * the elements.
+ *
+ * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered
+ * per element. This timer indicates when an element is not considered for joining anymore and can
+ * be removed from the state.
+ *
+ * @param <K>	The type of the key based on which we join elements.
+ * @param <T1>	The type of the elements in the left stream.
+ * @param <T2>	The type of the elements in the right stream.
+ * @param <OUT>	The output type created by the user-defined function.
+ */
+@Internal
+public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT>
+		extends AbstractUdfStreamOperator<OUT, TimeBoundedJoinFunction<T1, T2, OUT>>
+		implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
+
+	private static final long serialVersionUID = -5380774605111543454L;
+
+	private static final Logger logger = LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class);
+
+	private static final String LEFT_BUFFER = "LEFT_BUFFER";
+	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+	private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
+	private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
+	private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
+
+	private final long lowerBound;
+	private final long upperBound;
+
+	private final TypeSerializer<T1> leftTypeSerializer;
+	private final TypeSerializer<T2> rightTypeSerializer;
+
+	private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
+	private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
+
+	private transient TimestampedCollector<OUT> collector;
+	private transient ContextImpl context;
+
+	private transient InternalTimerService<String> internalTimerService;
+
+	/**
+	 * Creates a new TimeBoundedStreamJoinOperator.
+	 *
+	 * <p>Exclusive bounds are normalized to inclusive ones at construction time (an exclusive
+	 * lower bound is stored as {@code lowerBound + 1}, an exclusive upper bound as
+	 * {@code upperBound - 1}), so the stored bounds are always treated as inclusive.
+	 *
+	 * @param lowerBound          The lower bound for evaluating if elements should be joined
+	 * @param upperBound          The upper bound for evaluating if elements should be joined
+	 * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
+	 *                            the lower bound
+	 * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
+	 *                            the upper bound
+	 * @param leftTypeSerializer  The serializer for elements of the left stream; used to build
+	 *                            the serializer of the left buffer state. Must not be null.
+	 * @param rightTypeSerializer The serializer for elements of the right stream; used to build
+	 *                            the serializer of the right buffer state. Must not be null.
+	 * @param udf                 A user-defined {@link TimeBoundedJoinFunction} that gets called
+	 *                            whenever two elements of T1 and T2 are joined. Must not be null.
+	 */
+	public TimeBoundedStreamJoinOperator(
+			long lowerBound,
+			long upperBound,
+			boolean lowerBoundInclusive,
+			boolean upperBoundInclusive,
+			TypeSerializer<T1> leftTypeSerializer,
+			TypeSerializer<T2> rightTypeSerializer,
+			TimeBoundedJoinFunction<T1, T2, OUT> udf) {
+
+		super(Preconditions.checkNotNull(udf));
+
+		// NOTE(review): this check runs on the raw bounds, before the inclusive/exclusive
+		// adjustment below. With exclusive bounds that are equal or adjacent, the stored
+		// lowerBound can end up greater than the stored upperBound (an empty interval) --
+		// confirm whether that is intended.
+		Preconditions.checkArgument(lowerBound <= upperBound,
+			"lowerBound <= upperBound must be fulfilled");
+
+		// Shift exclusive bounds by +1 / -1 so that both stored bounds are inclusive and
+		// no inclusiveness check is needed later on
+		this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
+		this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
+
+		this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
+		this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+		context = new ContextImpl(userFunction);
+		internalTimerService =
+			getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+
+		this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+			LEFT_BUFFER,
+			LongSerializer.INSTANCE,
+			new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
+		));
+
+		this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+			RIGHT_BUFFER,
+			LongSerializer.INSTANCE,
+			new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
+		));
+	}
+
+	/**
+	 * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
+	 * arrives at the left stream, it will get added to the left buffer. Possible join candidates
+	 * for that element will be looked up from the right buffer and if the pair lies within the
+	 * user defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
+	 *
+	 * @param record An incoming record to be joined
+	 * @throws Exception Can throw an Exception during state access
+	 */
+	@Override
+	public void processElement1(StreamRecord<T1> record) throws Exception {
+		processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
+	}
+
+	/**
+	 * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
+	 * arrives at the right stream, it will get added to the right buffer. Possible join candidates
+	 * for that element will be looked up from the left buffer and if the pair lies within the user
+	 * defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
+	 *
+	 * @param record An incoming record to be joined
+	 * @throws Exception Can throw an exception during state access
+	 */
+	@Override
+	public void processElement2(StreamRecord<T2> record) throws Exception {
+		processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
+	}
+
+	/**
+	 * Shared implementation for both inputs: buffers the incoming record, joins it against all
+	 * matching entries of the opposite buffer and registers a cleanup timer for it.
+	 *
+	 * <p>Records whose timestamp is behind the current watermark are considered late and are
+	 * dropped without being buffered or joined.
+	 *
+	 * @param record             The incoming record
+	 * @param ourBuffer          The buffer of the side the record arrived on
+	 * @param otherBuffer        The buffer of the opposite side, searched for join candidates
+	 * @param relativeLowerBound The (inclusive) lower bound, relative to the record's timestamp,
+	 *                           for timestamps in the other buffer
+	 * @param relativeUpperBound The (inclusive) upper bound, relative to the record's timestamp,
+	 *                           for timestamps in the other buffer
+	 * @param isLeft             Whether the record arrived on the left input
+	 * @param <OUR>              The element type of the side the record arrived on
+	 * @param <OTHER>            The element type of the opposite side
+	 * @throws FlinkException If the record carries no timestamp (i.e. {@code Long.MIN_VALUE})
+	 * @throws Exception      Can be thrown during state access or by the user function
+	 */
+	@SuppressWarnings("unchecked")
+	private <OUR, OTHER> void processElement(
+			StreamRecord<OUR> record,
+			MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
+			MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+			long relativeLowerBound,
+			long relativeUpperBound,
+			boolean isLeft) throws Exception {
+
+		final OUR ourValue = record.getValue();
+		final long ourTimestamp = record.getTimestamp();
+
+		if (ourTimestamp == Long.MIN_VALUE) {
+			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
+					"interval stream joins need to have meaningful timestamps.");
+		}
+
+		if (isLate(ourTimestamp)) {
+			return;
+		}
+
+		addToBuffer(ourBuffer, ourValue, ourTimestamp);
+
+		// Scan every bucket of the other side and keep only those inside the join interval
+		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
+			final long timestamp  = bucket.getKey();
+
+			if (timestamp < ourTimestamp + relativeLowerBound ||
+					timestamp > ourTimestamp + relativeUpperBound) {
+				continue;
+			}
+
+			// The casts are safe because isLeft fixes OUR/OTHER to T1/T2 or T2/T1 respectively
+			for (BufferEntry<OTHER> entry: bucket.getValue()) {
+				if (isLeft) {
+					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
+				} else {
+					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
+				}
+			}
+		}
+
+		// The record can still find partners up to ourTimestamp + relativeUpperBound; if that
+		// bound is not positive, it can be cleaned up once the watermark reaches its own timestamp
+		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
+		if (isLeft) {
+			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
+		} else {
+			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
+		}
+	}
+
+	private boolean isLate(long timestamp) {
+		long currentWatermark = internalTimerService.currentWatermark();
+		return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
+	}
+
+	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
+		long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+		collector.setAbsoluteTimestamp(resultTimestamp);
+		context.leftTimestamp = leftTimestamp;
+		context.rightTimestamp = rightTimestamp;
+		userFunction.processElement(left, right, context, collector);
+	}
+
+	private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+		List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
+		if (elemsInBucket == null) {
+			elemsInBucket = new ArrayList<>();
+		}
+		elemsInBucket.add(new BufferEntry<>(value, false));
+		buffer.put(timestamp, elemsInBucket);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, String> timer) throws Exception {
+
+		long timerTimestamp = timer.getTimestamp();
+		String namespace = timer.getNamespace();
+
+		logger.trace("onEventTime @ {}", timerTimestamp);
+
+		switch (namespace) {
+			case CLEANUP_NAMESPACE_LEFT: {
+				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
+				logger.trace("Removing from left buffer @ {}", timestamp);
+				leftBuffer.remove(timestamp);
+				break;
+			}
+			case CLEANUP_NAMESPACE_RIGHT: {
+				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
+				logger.trace("Removing from right buffer @ {}", timestamp);
+				rightBuffer.remove(timestamp);
+				break;
+			}
+			default:
+				throw new RuntimeException("Invalid namespace " + namespace);
+		}
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
+		// do nothing.
+	}
+
+	/**
+	 * The context that is available during an invocation of
+	 * {@link TimeBoundedJoinFunction#processElement(Object, Object, TimeBoundedJoinFunction.Context, Collector)}.
+	 *
+	 * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
+	 * the joined pair. In addition, this context allows to emit elements on a side output.
+	 *
+	 * <p>The left and right timestamps are set by the enclosing operator before each call to the
+	 * user function; until then they hold {@code Long.MIN_VALUE}.
+	 */
+	private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, OUT>.Context {
+
+		private long leftTimestamp = Long.MIN_VALUE;
+
+		private long rightTimestamp = Long.MIN_VALUE;
+
+		private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) {
+			func.super();
+		}
+
+		@Override
+		public long getLeftTimestamp() {
+			return leftTimestamp;
+		}
+
+		@Override
+		public long getRightTimestamp() {
+			return rightTimestamp;
+		}
+
+		/**
+		 * @return The timestamp of the joined pair, i.e. the max of the left and the right
+		 *         timestamp. This is the same value the operator assigns to records emitted
+		 *         through the main collector.
+		 */
+		@Override
+		public long getTimestamp() {
+			return Math.max(leftTimestamp, rightTimestamp);
+		}
+
+		/**
+		 * Emits a record to the given side output, stamped with the timestamp of the joined pair.
+		 *
+		 * @param outputTag The output tag that identifies the side output; must not be null
+		 * @param value     The record to emit
+		 */
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
+			output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
+		}
+	}
+
+	/**
+	 * A container for elements put in the left/right buffer.
+	 * This will contain the element itself along with a flag indicating
+	 * if it has been joined or not.
+	 */
+	private static class BufferEntry<T> {
+
+		// the buffered stream element
+		private final T element;
+		// whether the element has been joined; the operator currently always stores false here
+		private final boolean hasBeenJoined;
+
+		BufferEntry(T element, boolean hasBeenJoined) {
+			this.element = element;
+			this.hasBeenJoined = hasBeenJoined;
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
+	 */
+	private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+
+		private static final long serialVersionUID = -20197698803836236L;
+
+		private final TypeSerializer<T> elementSerializer;
+
+		private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+			this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<BufferEntry<T>> duplicate() {
+			return new BufferEntrySerializer<>(elementSerializer.duplicate());
+		}
+
+		@Override
+		public BufferEntry<T> createInstance() {
+			return null;
+		}
+
+		@Override
+		public BufferEntry<T> copy(BufferEntry<T> from) {
+			return new BufferEntry<>(from.element, from.hasBeenJoined);
+		}
+
+		@Override
+		public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
+			target.writeBoolean(record.hasBeenJoined);
+			elementSerializer.serialize(record.element, target);
+		}
+
+		@Override
+		public BufferEntry<T> deserialize(DataInputView source) throws IOException {
+			boolean hasBeenJoined = source.readBoolean();
+			T element = elementSerializer.deserialize(source);
+			return new BufferEntry<>(element, hasBeenJoined);
+		}
+
+		@Override
+		public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeBoolean(source.readBoolean());
+			elementSerializer.copy(source, target);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
+			return Objects.equals(elementSerializer, that.elementSerializer);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(elementSerializer);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(BufferEntrySerializer.class);
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return new BufferSerializerConfigSnapshot<>(elementSerializer);
+		}
+
+		@Override
+		public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
+				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
+						((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+
+				CompatibilityResult<T> compatResult =
+						CompatibilityUtil.resolveCompatibilityResult(
+								previousSerializerAndConfig.f0,
+								UnloadableDummyTypeSerializer.class,
+								previousSerializerAndConfig.f1,
+								elementSerializer);
+
+				if (!compatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else if (compatResult.getConvertDeserializer() != null) {
+					return CompatibilityResult.requiresMigration(
+							new BufferEntrySerializer<>(
+									new TypeDeserializerAdapter<>(
+											compatResult.getConvertDeserializer())));
+				}
+			}
+			return CompatibilityResult.requiresMigration();
+		}
+	}
+
+	/**
+	 * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+	 */
+	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		public BufferSerializerConfigSnapshot() {
+		}
+
+		public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
+			super(userTypeSerializer);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	@VisibleForTesting
+	MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
+		return leftBuffer;
+	}
+
+	@VisibleForTesting
+	MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
+		return rightBuffer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
new file mode 100644
index 0000000..75543e7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests for {@link TimeBoundedStreamJoinOperator}.
+ * These tests cover the correctness of the join results and the cleanup of buffered state.
+ */
+@RunWith(Parameterized.class)
+public class TimeBoundedStreamJoinOperatorTest {
+
+	private final boolean lhsFasterThanRhs;
+
+	@Parameters(name = "lhs faster than rhs: {0}")
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][]{
+			{true}, {false}
+		});
+	}
+
+	public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
+		this.lhsFasterThanRhs = lhsFasterThanRhs;
+	}
+
+	@Test
+	public void testImplementationMirrorsCorrectly() throws Exception {
+
+		long lowerBound = 1;
+		long upperBound = 3;
+
+		boolean lowerBoundInclusive = true;
+		boolean upperBoundInclusive = false;
+
+		setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 2),
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 3),
+				streamRecordOf(2, 4),
+				streamRecordOf(3, 4))
+			.noLateRecords()
+			.close();
+
+		setupHarness(-1 * upperBound, upperBoundInclusive, -1 * lowerBound, lowerBoundInclusive)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(2, 1),
+				streamRecordOf(3, 1),
+				streamRecordOf(3, 2),
+				streamRecordOf(4, 2),
+				streamRecordOf(4, 3))
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 2 <= rhs <= lhs - 1
+	public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
+
+		setupHarness(-2, true, -1, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(2, 1),
+				streamRecordOf(3, 1),
+				streamRecordOf(3, 2),
+				streamRecordOf(4, 2),
+				streamRecordOf(4, 3)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 1 <= rhs <= lhs + 1
+	public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
+
+		setupHarness(-1, true, 1, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(3, 4),
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs + 1 <= rhs <= lhs + 2
+	public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
+
+		setupHarness(1, true, 2, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 2),
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 3),
+				streamRecordOf(2, 4),
+				streamRecordOf(3, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
+
+		setupHarness(-3, false, -1, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(3, 1),
+				streamRecordOf(4, 2)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
+
+		setupHarness(-1, false, 1, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(4, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
+
+		setupHarness(1, false, 3, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
+
+		setupHarness(-1, true, 0, true)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(2, 3, 4, 5)
+			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(5)
+			.assertRightBufferContainsOnly(4, 5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
+		setupHarness(-2, false, 1, false)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(2, 3, 4, 5)
+			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(5)
+			.assertRightBufferContainsOnly(4, 5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
+		setupHarness(0, true, 1, true)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+			.assertRightBufferContainsOnly(2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(4, 5)
+			.assertRightBufferContainsOnly(5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
+		setupHarness(-1, false, 2, false)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+			.assertRightBufferContainsOnly(2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(4, 5)
+			.assertRightBufferContainsOnly(5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testRestoreFromSnapshot() throws Exception {
+
+		// config
+		int lowerBound = -1;
+		boolean lowerBoundInclusive = true;
+		int upperBound = 1;
+		boolean upperBoundInclusive = true;
+
+		// create first test harness
+		OperatorSubtaskState handles;
+		List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
+
+		try (TestHarness testHarness = createTestHarness(
+			lowerBound,
+			lowerBoundInclusive,
+			upperBound,
+			upperBoundInclusive
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			// process elements with first test harness
+			testHarness.processElement1(createStreamRecord(1, "lhs"));
+			testHarness.processWatermark1(new Watermark(1));
+
+			testHarness.processElement2(createStreamRecord(1, "rhs"));
+			testHarness.processWatermark2(new Watermark(1));
+
+			testHarness.processElement1(createStreamRecord(2, "lhs"));
+			testHarness.processWatermark1(new Watermark(2));
+
+			testHarness.processElement2(createStreamRecord(2, "rhs"));
+			testHarness.processWatermark2(new Watermark(2));
+
+			testHarness.processElement1(createStreamRecord(3, "lhs"));
+			testHarness.processWatermark1(new Watermark(3));
+
+			testHarness.processElement2(createStreamRecord(3, "rhs"));
+			testHarness.processWatermark2(new Watermark(3));
+
+			// snapshot and validate output
+			handles = testHarness.snapshot(0, 0);
+			testHarness.close();
+
+			expectedOutput = Lists.newArrayList(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3)
+			);
+
+			TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
+			assertOutput(expectedOutput, testHarness.getOutput());
+		}
+
+		try (TestHarness newTestHarness = createTestHarness(
+			lowerBound,
+			lowerBoundInclusive,
+			upperBound,
+			upperBoundInclusive
+		)) {
+			// create new test harness from snapshot
+
+			newTestHarness.setup();
+			newTestHarness.initializeState(handles);
+			newTestHarness.open();
+
+			// process elements
+			newTestHarness.processElement1(createStreamRecord(4, "lhs"));
+			newTestHarness.processWatermark1(new Watermark(4));
+
+			newTestHarness.processElement2(createStreamRecord(4, "rhs"));
+			newTestHarness.processWatermark2(new Watermark(4));
+
+			// assert expected output
+			expectedOutput = Lists.newArrayList(
+				streamRecordOf(3, 4),
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4)
+			);
+
+			TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
+			assertOutput(expectedOutput, newTestHarness.getOutput());
+		}
+	}
+
+	@Test
+	public void testContextCorrectLeftTimestamp() throws Exception {
+
+		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new TimeBoundedStreamJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	@Test
+	public void testReturnsCorrectTimestamp() throws Exception {
+		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new TimeBoundedStreamJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						// NOTE(review): joined records are expected with max(left.ts, right.ts) (see streamRecordOf); confirm left.ts is intended here
+						Assert.assertEquals(left.ts, ctx.getTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	@Test
+	public void testContextCorrectRightTimestamp() throws Exception {
+
+		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new TimeBoundedStreamJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(right.ts, ctx.getRightTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsLeft() throws Exception {
+		// try-with-resources so the harness is closed even though processElement1 is expected to throw
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+			newTestHarness.setup();
+			newTestHarness.open();
+			// note that the StreamRecord has no timestamp in constructor
+			newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
+		}
+	}
+
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsRight() throws Exception {
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+			newTestHarness.setup();
+			newTestHarness.open();
+
+			// note that the StreamRecord has no timestamp in constructor
+			newTestHarness.processElement2(new StreamRecord<>(new TestElem(0, "rhs")));
+		}
+	}
+
+	@Test
+	public void testDiscardsLateData() throws Exception {
+		setupHarness(-1, true, 1, true)
+			.processElement1(1)
+			.processElement2(1)
+			.processElement1(2)
+			.processElement2(2)
+			.processElement1(3)
+			.processElement2(3)
+			.processWatermark1(3)
+			.processWatermark2(3)
+			.processElement1(1) // this element is late and should not be joined again
+			.processElement1(4)
+			.processElement2(4)
+			.processElement1(5)
+			.processElement2(5)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(3, 4),
+
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4),
+				streamRecordOf(4, 5),
+
+				streamRecordOf(5, 4),
+				streamRecordOf(5, 5)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	private void assertEmpty(MapState<Long, ?> state) throws Exception {
+		boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
+		Assert.assertTrue("state not empty", stateIsEmpty);
+	}
+
+	private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
+		for (long t : ts) {
+			String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
+			Assert.assertTrue(message, state.contains(t));
+		}
+
+		String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
+		Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
+	}
+
+	private void assertOutput(
+		Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
+		Queue<Object> actualOutput) {
+		// only stream records are counted; any other element kinds in the output are ignored
+		int actualSize = actualOutput.stream()
+			.filter(elem -> elem instanceof StreamRecord)
+			.collect(Collectors.toList())
+			.size();
+
+		int expectedSize = Iterables.size(expectedOutput);
+		Assert.assertEquals(
+			"Expected and actual size of stream records different",
+			expectedSize,
+			actualSize
+		);
+
+		for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+			// name the missing record so that a failure can be diagnosed from the message alone
+			Assert.assertTrue("Expected record not found in output: " + record, actualOutput.contains(record));
+		}
+	}
+
+	private TestHarness createTestHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			new TimeBoundedStreamJoinOperator<>(
+				lowerBound,
+				upperBound,
+				lowerBoundInclusive,
+				upperBoundInclusive,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new PassthroughFunction()
+			);
+
+		return new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+	}
+
+	private JoinTestBuilder setupHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			new TimeBoundedStreamJoinOperator<>(
+				lowerBound,
+				upperBound,
+				lowerBoundInclusive,
+				upperBoundInclusive,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new PassthroughFunction()
+			);
+
+		TestHarness t = new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+
+		return new JoinTestBuilder(t, operator);
+	}
+
+	private class JoinTestBuilder {
+
+		private TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
+		private TestHarness testHarness;
+
+		public JoinTestBuilder(
+			TestHarness t,
+			TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
+		) throws Exception {
+			this.testHarness = t;
+			this.operator = operator;
+			// set up before opening, the same order every other harness in this test class uses
+			t.setup();
+			t.open();
+		}
+
+		public TestHarness get() {
+			return testHarness;
+		}
+
+		public JoinTestBuilder processElement1(int ts) throws Exception {
+			testHarness.processElement1(createStreamRecord(ts, "lhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processElement2(int ts) throws Exception {
+			testHarness.processElement2(createStreamRecord(ts, "rhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark1(int ts) throws Exception {
+			testHarness.processWatermark1(new Watermark(ts));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark2(int ts) throws Exception {
+			testHarness.processWatermark2(new Watermark(ts));
+			return this;
+		}
+
+		public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
+			if (lhsFasterThanRhs) {
+				// add to lhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement1(createStreamRecord(i, "lhs"));
+					testHarness.processWatermark1(new Watermark(i));
+				}
+
+				// add to rhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement2(createStreamRecord(i, "rhs"));
+					testHarness.processWatermark2(new Watermark(i));
+				}
+			} else {
+				// add to rhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement2(createStreamRecord(i, "rhs"));
+					testHarness.processWatermark2(new Watermark(i));
+				}
+
+				// add to lhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement1(createStreamRecord(i, "lhs"));
+					testHarness.processWatermark1(new Watermark(i));
+				}
+			}
+
+			return this;
+		}
+
+		@SafeVarargs
+		public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
+			assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getLeftBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getRightBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferEmpty() {
+			try {
+				assertEmpty(operator.getLeftBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferEmpty() {
+			try {
+				assertEmpty(operator.getRightBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder noLateRecords() {
+			TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
+			return this;
+		}
+
+		public void close() throws Exception {
+			testHarness.close();
+		}
+	}
+
+	private static class PassthroughFunction extends TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		@Override
+		public void processElement(
+			TestElem left,
+			TestElem right,
+			Context ctx,
+			Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+			out.collect(Tuple2.of(left, right));
+		}
+	}
+
+	private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
+		long lhsTs,
+		long rhsTs
+	) {
+		TestElem lhs = new TestElem(lhsTs, "lhs");
+		TestElem rhs = new TestElem(rhsTs, "rhs");
+
+		long ts = Math.max(lhsTs, rhsTs);
+		return new StreamRecord<>(Tuple2.of(lhs, rhs), ts);
+	}
+
+	private static class TestElem {
+		String key;
+		long ts;
+		String source;
+
+		public TestElem(long ts, String source) {
+			this.key = "key";
+			this.ts = ts;
+			this.source = source;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			TestElem testElem = (TestElem) o;
+
+			if (ts != testElem.ts) {
+				return false;
+			}
+
+			if (key != null ? !key.equals(testElem.key) : testElem.key != null) {
+				return false;
+			}
+
+			return source != null ? source.equals(testElem.source) : testElem.source == null;
+		}
+
+		@Override
+		public int hashCode() {
+			int result = key != null ? key.hashCode() : 0;
+			result = 31 * result + (int) (ts ^ (ts >>> 32));
+			result = 31 * result + (source != null ? source.hashCode() : 0);
+			return result;
+		}
+
+		@Override
+		public String toString() {
+			return this.source + ":" + this.ts;
+		}
+
+		public static TypeSerializer<TestElem> serializer() {
+			return TypeInformation.of(new TypeHint<TestElem>() {
+			}).createSerializer(new ExecutionConfig());
+		}
+	}
+
+	private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
+		TestElem testElem = new TestElem(ts, source);
+		return new StreamRecord<>(testElem, ts);
+	}
+
+	private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
+		if (lhsFasterThanRhs) {
+			// add to lhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement1(createStreamRecord(i, "lhs"));
+				testHarness.processWatermark1(new Watermark(i));
+			}
+
+			// add to rhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement2(createStreamRecord(i, "rhs"));
+				testHarness.processWatermark2(new Watermark(i));
+			}
+		} else {
+			// add to rhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement2(createStreamRecord(i, "rhs"));
+				testHarness.processWatermark2(new Watermark(i));
+			}
+
+			// add to lhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement1(createStreamRecord(i, "lhs"));
+				testHarness.processWatermark1(new Watermark(i));
+			}
+		}
+	}
+
+	/**
+	 * Custom test harness to avoid endless generics in all of the test code.
+	 */
+	private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		TestHarness(
+			TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
+			KeySelector<TestElem, String> keySelector1,
+			KeySelector<TestElem, String> keySelector2,
+			TypeInformation<String> keyType) throws Exception {
+			super(operator, keySelector1, keySelector2, keyType);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce345e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index a1d732b..54c7e3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -105,4 +105,24 @@ public class TestHarnessUtil {
 		Assert.assertArrayEquals(message, sortedExpected, sortedActual);
 
 	}
+
+	/**
+	 * Verifies that no StreamRecord has a timestamp equal to or lower than a watermark that
+	 * precedes it in the iteration order of the given elements, i.e. that no record is late.
+	 *
+	 * @param elements An iterable containing StreamRecords and watermarks
+	 */
+	public static void assertNoLateRecords(Iterable<Object> elements) {
+		// highest watermark seen so far; a record at or below it counts as late
+		long highestWatermark = Long.MIN_VALUE;
+
+		for (Object elem : elements) {
+			if (elem instanceof Watermark) {
+				highestWatermark = Math.max(highestWatermark, ((Watermark) elem).getTimestamp());
+			} else if (elem instanceof StreamRecord) {
+				boolean dataIsOnTime = highestWatermark < ((StreamRecord<?>) elem).getTimestamp();
+				Assert.assertTrue("Late data was emitted after join", dataIsOnTime);
+			}
+		}
+	}
 }