Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/12 19:34:40 UTC

[1/3] flink git commit: [FLINK-8480][DataStream] Add APIs for Interval Joins.

Repository: flink
Updated Branches:
  refs/heads/master f45b7f7ff -> ca0fa96bf


http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
deleted file mode 100644
index 75543e7..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
+++ /dev/null
@@ -1,941 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.stream.Collectors;
-
-
-/**
- * Tests for {@link TimeBoundedStreamJoinOperator}.
- * These tests cover correctness and cleanup of state.
- */
-@RunWith(Parameterized.class)
-public class TimeBoundedStreamJoinOperatorTest {
-
-	private final boolean lhsFasterThanRhs;
-
-	@Parameters(name = "lhs faster than rhs: {0}")
-	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][]{
-			{true}, {false}
-		});
-	}
-
-	public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
-		this.lhsFasterThanRhs = lhsFasterThanRhs;
-	}
-
-	@Test
-	public void testImplementationMirrorsCorrectly() throws Exception {
-
-		long lowerBound = 1;
-		long upperBound = 3;
-
-		boolean lowerBoundInclusive = true;
-		boolean upperBoundInclusive = false;
-
-		setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(1, 2),
-				streamRecordOf(1, 3),
-				streamRecordOf(2, 3),
-				streamRecordOf(2, 4),
-				streamRecordOf(3, 4))
-			.noLateRecords()
-			.close();
-
-		setupHarness(-1 * upperBound, upperBoundInclusive, -1 * lowerBound, lowerBoundInclusive)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(2, 1),
-				streamRecordOf(3, 1),
-				streamRecordOf(3, 2),
-				streamRecordOf(4, 2),
-				streamRecordOf(4, 3))
-			.noLateRecords()
-			.close();
-	}
-
-	@Test // lhs - 2 <= rhs <= lhs - 1
-	public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
-
-		setupHarness(-2, true, -1, true)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(2, 1),
-				streamRecordOf(3, 1),
-				streamRecordOf(3, 2),
-				streamRecordOf(4, 2),
-				streamRecordOf(4, 3)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	@Test // lhs - 1 <= rhs <= lhs + 1
-	public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
-
-		setupHarness(-1, true, 1, true)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(1, 1),
-				streamRecordOf(1, 2),
-				streamRecordOf(2, 1),
-				streamRecordOf(2, 2),
-				streamRecordOf(2, 3),
-				streamRecordOf(3, 2),
-				streamRecordOf(3, 3),
-				streamRecordOf(3, 4),
-				streamRecordOf(4, 3),
-				streamRecordOf(4, 4)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	@Test // lhs + 1 <= rhs <= lhs + 2
-	public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
-
-		setupHarness(1, true, 2, true)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(1, 2),
-				streamRecordOf(1, 3),
-				streamRecordOf(2, 3),
-				streamRecordOf(2, 4),
-				streamRecordOf(3, 4)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	@Test
-	public void testNegativeExclusiveAndNegativeExclusive() throws Exception {
-
-		setupHarness(-3, false, -1, false)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(3, 1),
-				streamRecordOf(4, 2)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	@Test
-	public void testNegativeExclusiveAndPositiveExclusive() throws Exception {
-
-		setupHarness(-1, false, 1, false)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(1, 1),
-				streamRecordOf(2, 2),
-				streamRecordOf(3, 3),
-				streamRecordOf(4, 4)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	@Test
-	public void testPositiveExclusiveAndPositiveExclusive() throws Exception {
-
-		setupHarness(1, false, 3, false)
-			.processElementsAndWatermarks(1, 4)
-			.andExpect(
-				streamRecordOf(1, 3),
-				streamRecordOf(2, 4)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	@Test
-	public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
-
-		setupHarness(-1, true, 0, true)
-			.processElement1(1)
-			.processElement1(2)
-			.processElement1(3)
-			.processElement1(4)
-			.processElement1(5)
-
-			.processElement2(1)
-			.processElement2(2)
-			.processElement2(3)
-			.processElement2(4)
-			.processElement2(5) // fill both buffers with values
-
-			.processWatermark1(1)
-			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
-			.assertLeftBufferContainsOnly(2, 3, 4, 5)
-			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
-
-			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
-			.processWatermark2(4)
-
-			.assertLeftBufferContainsOnly(5)
-			.assertRightBufferContainsOnly(4, 5)
-
-			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
-			.processWatermark2(6)
-
-			.assertLeftBufferEmpty()
-			.assertRightBufferEmpty()
-
-			.close();
-	}
-
-	@Test
-	public void testStateCleanupNegativeExclusivePositiveExclusive() throws Exception {
-		setupHarness(-2, false, 1, false)
-			.processElement1(1)
-			.processElement1(2)
-			.processElement1(3)
-			.processElement1(4)
-			.processElement1(5)
-
-			.processElement2(1)
-			.processElement2(2)
-			.processElement2(3)
-			.processElement2(4)
-			.processElement2(5) // fill both buffers with values
-
-			.processWatermark1(1)
-			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
-			.assertLeftBufferContainsOnly(2, 3, 4, 5)
-			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
-
-			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
-			.processWatermark2(4)
-
-			.assertLeftBufferContainsOnly(5)
-			.assertRightBufferContainsOnly(4, 5)
-
-			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
-			.processWatermark2(6)
-
-			.assertLeftBufferEmpty()
-			.assertRightBufferEmpty()
-
-			.close();
-	}
-
-	@Test
-	public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
-		setupHarness(0, true, 1, true)
-			.processElement1(1)
-			.processElement1(2)
-			.processElement1(3)
-			.processElement1(4)
-			.processElement1(5)
-
-			.processElement2(1)
-			.processElement2(2)
-			.processElement2(3)
-			.processElement2(4)
-			.processElement2(5) // fill both buffers with values
-
-			.processWatermark1(1)
-			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
-			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
-			.assertRightBufferContainsOnly(2, 3, 4, 5)
-
-			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
-			.processWatermark2(4)
-
-			.assertLeftBufferContainsOnly(4, 5)
-			.assertRightBufferContainsOnly(5)
-
-			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
-			.processWatermark2(6)
-
-			.assertLeftBufferEmpty()
-			.assertRightBufferEmpty()
-
-			.close();
-	}
-
-	@Test
-	public void testStateCleanupPositiveExclusivePositiveExclusive() throws Exception {
-		setupHarness(-1, false, 2, false)
-			.processElement1(1)
-			.processElement1(2)
-			.processElement1(3)
-			.processElement1(4)
-			.processElement1(5)
-
-			.processElement2(1)
-			.processElement2(2)
-			.processElement2(3)
-			.processElement2(4)
-			.processElement2(5) // fill both buffers with values
-
-			.processWatermark1(1)
-			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
-			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
-			.assertRightBufferContainsOnly(2, 3, 4, 5)
-
-			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
-			.processWatermark2(4)
-
-			.assertLeftBufferContainsOnly(4, 5)
-			.assertRightBufferContainsOnly(5)
-
-			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
-			.processWatermark2(6)
-
-			.assertLeftBufferEmpty()
-			.assertRightBufferEmpty()
-
-			.close();
-	}
-
-	@Test
-	public void testRestoreFromSnapshot() throws Exception {
-
-		// config
-		int lowerBound = -1;
-		boolean lowerBoundInclusive = true;
-		int upperBound = 1;
-		boolean upperBoundInclusive = true;
-
-		// create first test harness
-		OperatorSubtaskState handles;
-		List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
-
-		try (TestHarness testHarness = createTestHarness(
-			lowerBound,
-			lowerBoundInclusive,
-			upperBound,
-			upperBoundInclusive
-		)) {
-
-			testHarness.setup();
-			testHarness.open();
-
-			// process elements with first test harness
-			testHarness.processElement1(createStreamRecord(1, "lhs"));
-			testHarness.processWatermark1(new Watermark(1));
-
-			testHarness.processElement2(createStreamRecord(1, "rhs"));
-			testHarness.processWatermark2(new Watermark(1));
-
-			testHarness.processElement1(createStreamRecord(2, "lhs"));
-			testHarness.processWatermark1(new Watermark(2));
-
-			testHarness.processElement2(createStreamRecord(2, "rhs"));
-			testHarness.processWatermark2(new Watermark(2));
-
-			testHarness.processElement1(createStreamRecord(3, "lhs"));
-			testHarness.processWatermark1(new Watermark(3));
-
-			testHarness.processElement2(createStreamRecord(3, "rhs"));
-			testHarness.processWatermark2(new Watermark(3));
-
-			// snapshot and validate output
-			handles = testHarness.snapshot(0, 0);
-			testHarness.close();
-
-			expectedOutput = Lists.newArrayList(
-				streamRecordOf(1, 1),
-				streamRecordOf(1, 2),
-				streamRecordOf(2, 1),
-				streamRecordOf(2, 2),
-				streamRecordOf(2, 3),
-				streamRecordOf(3, 2),
-				streamRecordOf(3, 3)
-			);
-
-			TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
-			assertOutput(expectedOutput, testHarness.getOutput());
-		}
-
-		try (TestHarness newTestHarness = createTestHarness(
-			lowerBound,
-			lowerBoundInclusive,
-			upperBound,
-			upperBoundInclusive
-		)) {
-			// create new test harness from snapshot
-
-			newTestHarness.setup();
-			newTestHarness.initializeState(handles);
-			newTestHarness.open();
-
-			// process elements
-			newTestHarness.processElement1(createStreamRecord(4, "lhs"));
-			newTestHarness.processWatermark1(new Watermark(4));
-
-			newTestHarness.processElement2(createStreamRecord(4, "rhs"));
-			newTestHarness.processWatermark2(new Watermark(4));
-
-			// assert expected output
-			expectedOutput = Lists.newArrayList(
-				streamRecordOf(3, 4),
-				streamRecordOf(4, 3),
-				streamRecordOf(4, 4)
-			);
-
-			TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
-			assertOutput(expectedOutput, newTestHarness.getOutput());
-		}
-	}
-
-	@Test
-	public void testContextCorrectLeftTimestamp() throws Exception {
-
-		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
-			new TimeBoundedStreamJoinOperator<>(
-				-1,
-				1,
-				true,
-				true,
-				TestElem.serializer(),
-				TestElem.serializer(),
-				new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
-					@Override
-					public void processElement(
-						TestElem left,
-						TestElem right,
-						Context ctx,
-						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
-						Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
-					}
-				}
-			);
-
-		try (TestHarness testHarness = new TestHarness(
-			op,
-			(elem) -> elem.key,
-			(elem) -> elem.key,
-			TypeInformation.of(String.class)
-		)) {
-
-			testHarness.setup();
-			testHarness.open();
-
-			processElementsAndWatermarks(testHarness);
-		}
-	}
-
-	@Test
-	public void testReturnsCorrectTimestamp() throws Exception {
-		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
-			new TimeBoundedStreamJoinOperator<>(
-				-1,
-				1,
-				true,
-				true,
-				TestElem.serializer(),
-				TestElem.serializer(),
-				new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
-					@Override
-					public void processElement(
-						TestElem left,
-						TestElem right,
-						Context ctx,
-						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
-						Assert.assertEquals(left.ts, ctx.getTimestamp());
-					}
-				}
-			);
-
-		try (TestHarness testHarness = new TestHarness(
-			op,
-			(elem) -> elem.key,
-			(elem) -> elem.key,
-			TypeInformation.of(String.class)
-		)) {
-
-			testHarness.setup();
-			testHarness.open();
-
-			processElementsAndWatermarks(testHarness);
-		}
-	}
-
-	@Test
-	public void testContextCorrectRightTimestamp() throws Exception {
-
-		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
-			new TimeBoundedStreamJoinOperator<>(
-				-1,
-				1,
-				true,
-				true,
-				TestElem.serializer(),
-				TestElem.serializer(),
-				new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
-					@Override
-					public void processElement(
-						TestElem left,
-						TestElem right,
-						Context ctx,
-						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
-						Assert.assertEquals(right.ts, ctx.getRightTimestamp());
-					}
-				}
-			);
-
-		try (TestHarness testHarness = new TestHarness(
-			op,
-			(elem) -> elem.key,
-			(elem) -> elem.key,
-			TypeInformation.of(String.class)
-		)) {
-
-			testHarness.setup();
-			testHarness.open();
-
-			processElementsAndWatermarks(testHarness);
-		}
-	}
-
-	@Test(expected = FlinkException.class)
-	public void testFailsWithNoTimestampsLeft() throws Exception {
-		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
-			newTestHarness.setup();
-			newTestHarness.open();
-
-			// note that the StreamRecord is created without a timestamp
-			newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
-		}
-	}
-
-	@Test(expected = FlinkException.class)
-	public void testFailsWithNoTimestampsRight() throws Exception {
-		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
-
-			newTestHarness.setup();
-			newTestHarness.open();
-
-			// note that the StreamRecord has no timestamp in constructor
-			newTestHarness.processElement2(new StreamRecord<>(new TestElem(0, "rhs")));
-		}
-	}
-
-	@Test
-	public void testDiscardsLateData() throws Exception {
-		setupHarness(-1, true, 1, true)
-			.processElement1(1)
-			.processElement2(1)
-			.processElement1(2)
-			.processElement2(2)
-			.processElement1(3)
-			.processElement2(3)
-			.processWatermark1(3)
-			.processWatermark2(3)
-			.processElement1(1) // this element is late and should not be joined again
-			.processElement1(4)
-			.processElement2(4)
-			.processElement1(5)
-			.processElement2(5)
-			.andExpect(
-				streamRecordOf(1, 1),
-				streamRecordOf(1, 2),
-
-				streamRecordOf(2, 1),
-				streamRecordOf(2, 2),
-				streamRecordOf(2, 3),
-
-				streamRecordOf(3, 2),
-				streamRecordOf(3, 3),
-				streamRecordOf(3, 4),
-
-				streamRecordOf(4, 3),
-				streamRecordOf(4, 4),
-				streamRecordOf(4, 5),
-
-				streamRecordOf(5, 4),
-				streamRecordOf(5, 5)
-			)
-			.noLateRecords()
-			.close();
-	}
-
-	private void assertEmpty(MapState<Long, ?> state) throws Exception {
-		boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
-		Assert.assertTrue("state not empty", stateIsEmpty);
-	}
-
-	private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
-		for (long t : ts) {
-			String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
-			Assert.assertTrue(message, state.contains(t));
-		}
-
-		String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
-		Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
-	}
-
-	private void assertOutput(
-		Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
-		Queue<Object> actualOutput) {
-
-		int actualSize = actualOutput.stream()
-			.filter(elem -> elem instanceof StreamRecord)
-			.collect(Collectors.toList())
-			.size();
-
-		int expectedSize = Iterables.size(expectedOutput);
-
-		Assert.assertEquals(
-			"Expected and actual size of stream records different",
-			expectedSize,
-			actualSize
-		);
-
-		for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
-			Assert.assertTrue(actualOutput.contains(record));
-		}
-	}
-
-	private TestHarness createTestHarness(long lowerBound,
-		boolean lowerBoundInclusive,
-		long upperBound,
-		boolean upperBoundInclusive) throws Exception {
-
-		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
-			new TimeBoundedStreamJoinOperator<>(
-				lowerBound,
-				upperBound,
-				lowerBoundInclusive,
-				upperBoundInclusive,
-				TestElem.serializer(),
-				TestElem.serializer(),
-				new PassthroughFunction()
-			);
-
-		return new TestHarness(
-			operator,
-			(elem) -> elem.key, // key
-			(elem) -> elem.key, // key
-			TypeInformation.of(String.class)
-		);
-	}
-
-	private JoinTestBuilder setupHarness(long lowerBound,
-		boolean lowerBoundInclusive,
-		long upperBound,
-		boolean upperBoundInclusive) throws Exception {
-
-		TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
-			new TimeBoundedStreamJoinOperator<>(
-				lowerBound,
-				upperBound,
-				lowerBoundInclusive,
-				upperBoundInclusive,
-				TestElem.serializer(),
-				TestElem.serializer(),
-				new PassthroughFunction()
-			);
-
-		TestHarness t = new TestHarness(
-			operator,
-			(elem) -> elem.key, // key
-			(elem) -> elem.key, // key
-			TypeInformation.of(String.class)
-		);
-
-		return new JoinTestBuilder(t, operator);
-	}
-
-	private class JoinTestBuilder {
-
-		private TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
-		private TestHarness testHarness;
-
-		public JoinTestBuilder(
-			TestHarness t,
-			TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
-		) throws Exception {
-
-			this.testHarness = t;
-			this.operator = operator;
-			t.setup();
-			t.open();
-		}
-
-		public TestHarness get() {
-			return testHarness;
-		}
-
-		public JoinTestBuilder processElement1(int ts) throws Exception {
-			testHarness.processElement1(createStreamRecord(ts, "lhs"));
-			return this;
-		}
-
-		public JoinTestBuilder processElement2(int ts) throws Exception {
-			testHarness.processElement2(createStreamRecord(ts, "rhs"));
-			return this;
-		}
-
-		public JoinTestBuilder processWatermark1(int ts) throws Exception {
-			testHarness.processWatermark1(new Watermark(ts));
-			return this;
-		}
-
-		public JoinTestBuilder processWatermark2(int ts) throws Exception {
-			testHarness.processWatermark2(new Watermark(ts));
-			return this;
-		}
-
-		public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
-			if (lhsFasterThanRhs) {
-				// add to lhs
-				for (int i = from; i <= to; i++) {
-					testHarness.processElement1(createStreamRecord(i, "lhs"));
-					testHarness.processWatermark1(new Watermark(i));
-				}
-
-				// add to rhs
-				for (int i = from; i <= to; i++) {
-					testHarness.processElement2(createStreamRecord(i, "rhs"));
-					testHarness.processWatermark2(new Watermark(i));
-				}
-			} else {
-				// add to rhs
-				for (int i = from; i <= to; i++) {
-					testHarness.processElement2(createStreamRecord(i, "rhs"));
-					testHarness.processWatermark2(new Watermark(i));
-				}
-
-				// add to lhs
-				for (int i = from; i <= to; i++) {
-					testHarness.processElement1(createStreamRecord(i, "lhs"));
-					testHarness.processWatermark1(new Watermark(i));
-				}
-			}
-
-			return this;
-		}
-
-		@SafeVarargs
-		public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
-			assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
-			return this;
-		}
-
-		public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
-
-			try {
-				assertContainsOnly(operator.getLeftBuffer(), timestamps);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-			return this;
-		}
-
-		public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
-
-			try {
-				assertContainsOnly(operator.getRightBuffer(), timestamps);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-			return this;
-		}
-
-		public JoinTestBuilder assertLeftBufferEmpty() {
-			try {
-				assertEmpty(operator.getLeftBuffer());
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-			return this;
-		}
-
-		public JoinTestBuilder assertRightBufferEmpty() {
-			try {
-				assertEmpty(operator.getRightBuffer());
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-			return this;
-		}
-
-		public JoinTestBuilder noLateRecords() {
-			TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
-			return this;
-		}
-
-		public void close() throws Exception {
-			testHarness.close();
-		}
-	}
-
-	private static class PassthroughFunction extends TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
-
-		@Override
-		public void processElement(
-			TestElem left,
-			TestElem right,
-			Context ctx,
-			Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
-			out.collect(Tuple2.of(left, right));
-		}
-	}
-
-	private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
-		long lhsTs,
-		long rhsTs
-	) {
-		TestElem lhs = new TestElem(lhsTs, "lhs");
-		TestElem rhs = new TestElem(rhsTs, "rhs");
-
-		long ts = Math.max(lhsTs, rhsTs);
-		return new StreamRecord<>(Tuple2.of(lhs, rhs), ts);
-	}
-
-	private static class TestElem {
-		String key;
-		long ts;
-		String source;
-
-		public TestElem(long ts, String source) {
-			this.key = "key";
-			this.ts = ts;
-			this.source = source;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-
-			TestElem testElem = (TestElem) o;
-
-			if (ts != testElem.ts) {
-				return false;
-			}
-
-			if (key != null ? !key.equals(testElem.key) : testElem.key != null) {
-				return false;
-			}
-
-			return source != null ? source.equals(testElem.source) : testElem.source == null;
-		}
-
-		@Override
-		public int hashCode() {
-			int result = key != null ? key.hashCode() : 0;
-			result = 31 * result + (int) (ts ^ (ts >>> 32));
-			result = 31 * result + (source != null ? source.hashCode() : 0);
-			return result;
-		}
-
-		@Override
-		public String toString() {
-			return this.source + ":" + this.ts;
-		}
-
-		public static TypeSerializer<TestElem> serializer() {
-			return TypeInformation.of(new TypeHint<TestElem>() {
-			}).createSerializer(new ExecutionConfig());
-		}
-	}
-
-	private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
-		TestElem testElem = new TestElem(ts, source);
-		return new StreamRecord<>(testElem, ts);
-	}
-
-	private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
-		if (lhsFasterThanRhs) {
-			// add to lhs
-			for (int i = 1; i <= 4; i++) {
-				testHarness.processElement1(createStreamRecord(i, "lhs"));
-				testHarness.processWatermark1(new Watermark(i));
-			}
-
-			// add to rhs
-			for (int i = 1; i <= 4; i++) {
-				testHarness.processElement2(createStreamRecord(i, "rhs"));
-				testHarness.processWatermark2(new Watermark(i));
-			}
-		} else {
-			// add to rhs
-			for (int i = 1; i <= 4; i++) {
-				testHarness.processElement2(createStreamRecord(i, "rhs"));
-				testHarness.processWatermark2(new Watermark(i));
-			}
-
-			// add to lhs
-			for (int i = 1; i <= 4; i++) {
-				testHarness.processElement1(createStreamRecord(i, "lhs"));
-				testHarness.processWatermark1(new Watermark(i));
-			}
-		}
-	}
-
-	/**
-	 * Custom test harness to avoid endless generics in all of the test code.
-	 */
-	private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
-
-		TestHarness(
-			TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
-			KeySelector<TestElem, String> keySelector1,
-			KeySelector<TestElem, String> keySelector2,
-			TypeInformation<String> keyType) throws Exception {
-			super(operator, keySelector1, keySelector2, keyType);
-		}
-	}
-}
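
A reading aid for the state-cleanup tests above: for inclusive bounds [lower, upper], a left
element with timestamp ts can be dropped once the watermark reaches ts + upper, and a right
element once the watermark reaches ts - lower. The helper below merely restates that invariant
(the class and method names are illustrative, not the operator's actual implementation):

```java
// Illustrative restatement of the cleanup rule the assertions above check;
// not the operator's code. Bounds are assumed inclusive.
final class CleanupRule {

    // A left element at ts is expired once no future right element
    // can still fall into [ts + lower, ts + upper].
    static boolean leftIsExpired(long ts, long watermark, long upperBound) {
        return watermark >= ts + upperBound;
    }

    // A right element at ts is expired once no future left element l
    // can still satisfy l + lower <= ts, i.e. l <= ts - lower.
    static boolean rightIsExpired(long ts, long watermark, long lowerBound) {
        return watermark >= ts - lowerBound;
    }
}
```

For example, with bounds [-1, 0] and a common watermark of 1, leftIsExpired(1, 1, 0) holds while
rightIsExpired(1, 1, -1) does not, matching the buffers asserted in
testStateCleanupNegativeInclusiveNegativeInclusive.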

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 51def98..580dd46 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,10 +24,11 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateD
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
 import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
@@ -109,6 +110,109 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     asScalaStream(javaStream.process(keyedProcessFunction, implicitly[TypeInformation[R]]))
   }
 
+
+  // ------------------------------------------------------------------------
+  //  Joining
+  // ------------------------------------------------------------------------
+
+  /**
+    * Join elements of this [[KeyedStream]] with elements of another [[KeyedStream]] over
+    * a time interval that can be specified with [[IntervalJoin.between]].
+    *
+    * @param otherStream The other keyed stream to join this keyed stream with
+    * @tparam OTHER Type parameter of elements in the other stream
+    * @return An instance of [[IntervalJoin]] with this keyed stream and the other keyed stream
+    */
+  @PublicEvolving
+  def intervalJoin[OTHER](otherStream: KeyedStream[OTHER, K]): IntervalJoin[T, OTHER, K] = {
+    new IntervalJoin[T, OTHER, K](this, otherStream)
+  }
+
+  /**
+    * Perform a join over a time interval.
+    *
+    * @tparam IN1 The type parameter of the elements in the first stream
+    * @tparam IN2 The type parameter of the elements in the second stream
+    */
+  @PublicEvolving
+  class IntervalJoin[IN1, IN2, KEY](val streamOne: KeyedStream[IN1, KEY],
+                                    val streamTwo: KeyedStream[IN2, KEY]) {
+
+    /**
+      * Specifies the time boundaries over which the join operation works, so that
+      * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp
+      * <= leftElement.timestamp + upperBound</pre>
+      * By default both the lower and the upper bound are inclusive. This can be configured
+      * with [[IntervalJoined.lowerBoundExclusive]] and
+      * [[IntervalJoined.upperBoundExclusive]].
+      *
+      * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+      * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+      */
+    @PublicEvolving
+    def between(lowerBound: Time, upperBound: Time): IntervalJoined[IN1, IN2, KEY] = {
+      val lowerMillis = lowerBound.toMilliseconds
+      val upperMillis = upperBound.toMilliseconds
+      new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis)
+    }
+  }
+
+  /**
+    * IntervalJoined is a container for two streams that have keys for both sides as well as
+    * the time boundaries over which elements should be joined.
+    *
+    * @tparam IN1 Input type of elements from the first stream
+    * @tparam IN2 Input type of elements from the second stream
+    * @tparam KEY The type of the key
+    */
+  @PublicEvolving
+  class IntervalJoined[IN1, IN2, KEY](private val firstStream: KeyedStream[IN1, KEY],
+                                      private val secondStream: KeyedStream[IN2, KEY],
+                                      private val lowerBound: Long,
+                                      private val upperBound: Long) {
+
+    private var lowerBoundInclusive = true
+    private var upperBoundInclusive = true
+
+    /**
+      * Set the lower bound to be exclusive
+      */
+    @PublicEvolving
+    def lowerBoundExclusive(): IntervalJoined[IN1, IN2, KEY] = {
+      this.lowerBoundInclusive = false
+      this
+    }
+
+    /**
+      * Set the upper bound to be exclusive
+      */
+    @PublicEvolving
+    def upperBoundExclusive(): IntervalJoined[IN1, IN2, KEY] = {
+      this.upperBoundInclusive = false
+      this
+    }
+
+    /**
+      * Completes the join operation with the user function that is executed for each joined pair
+      * of elements.
+      *
+      * @param processJoinFunction The user-defined function
+      * @tparam OUT The output type
+      * @return The resulting DataStream
+      */
+    @PublicEvolving
+    def process[OUT](processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]): DataStream[OUT] = {
+      val javaJoined = new KeyedJavaStream.IntervalJoined[IN1, IN2, KEY](
+        firstStream.javaStream.asInstanceOf[KeyedJavaStream[IN1, KEY]],
+        secondStream.javaStream.asInstanceOf[KeyedJavaStream[IN2, KEY]],
+        lowerBound,
+        upperBound,
+        lowerBoundInclusive,
+        upperBoundInclusive)
+      asScalaStream(javaJoined.process(processJoinFunction))
+    }
+  }
+
   // ------------------------------------------------------------------------
   //  Windowing
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala
new file mode 100644
index 0000000..80701c6
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.util.Collector
+import org.junit.{Assert, Test}
+
+import scala.collection.mutable.ListBuffer
+
+class IntervalJoinITCase extends AbstractTestBase {
+
+  @Test
+  def testInclusiveBounds(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val dataStream1 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .keyBy(elem => elem._1)
+
+    val dataStream2 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .keyBy(elem => elem._1)
+
+    val sink = new ResultSink()
+
+    dataStream1.intervalJoin(dataStream2)
+      .between(Time.milliseconds(0), Time.milliseconds(2))
+      .process(new CombineToStringJoinFunction())
+      .addSink(sink)
+
+    env.execute()
+
+    sink.expectInAnyOrder(
+      "(key,0):(key,0)",
+      "(key,0):(key,1)",
+      "(key,0):(key,2)",
+
+      "(key,1):(key,1)",
+      "(key,1):(key,2)",
+
+      "(key,2):(key,2)"
+    )
+  }
+
+  @Test
+  def testExclusiveBounds(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val dataStream1 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .keyBy(elem => elem._1)
+
+    val dataStream2 = env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .keyBy(elem => elem._1)
+
+    val sink = new ResultSink()
+
+    dataStream1.intervalJoin(dataStream2)
+      .between(Time.milliseconds(0), Time.milliseconds(2))
+      .lowerBoundExclusive()
+      .upperBoundExclusive()
+      .process(new CombineToStringJoinFunction())
+      .addSink(sink)
+
+    env.execute()
+
+    sink.expectInAnyOrder(
+      "(key,0):(key,1)",
+      "(key,1):(key,2)"
+    )
+  }
+}
+
+object Companion {
+  val results: ListBuffer[String] = new ListBuffer()
+}
+
+class ResultSink extends SinkFunction[String] {
+
+  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
+    Companion.results.append(value)
+  }
+
+  def expectInAnyOrder(expected: String*): Unit = {
+    Assert.assertTrue(expected.toSet.equals(Companion.results.toSet))
+  }
+}
+
+class TimestampExtractor extends AscendingTimestampExtractor[(String, Long)] {
+  override def extractAscendingTimestamp(element: (String, Long)): Long = element._2
+}
+
+class CombineToStringJoinFunction
+  extends ProcessJoinFunction[(String, Long), (String, Long), String] {
+
+  override def processElement(
+                        left: (String, Long),
+                        right: (String, Long),
+                        ctx: ProcessJoinFunction[(String, Long), (String, Long), String]#Context,
+                        out: Collector[String]): Unit = {
+    out.collect(left + ":" + right)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
new file mode 100644
index 0000000..7d9fe7b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Integration tests for interval joins.
+ */
+public class IntervalJoinITCase {
+
+	private static List<String> testResults;
+
+	@Before
+	public void setup() {
+		testResults = new ArrayList<>();
+	}
+
+	@Test
+	public void testCanJoinOverSameKey() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		KeyedStream<Tuple2<String, Integer>, String> streamOne = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2),
+			Tuple2.of("key", 3),
+			Tuple2.of("key", 4),
+			Tuple2.of("key", 5)
+		)
+			.assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
+			.keyBy(new Tuple2KeyExtractor());
+
+		KeyedStream<Tuple2<String, Integer>, String> streamTwo = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2),
+			Tuple2.of("key", 3),
+			Tuple2.of("key", 4),
+			Tuple2.of("key", 5)
+		)
+			.assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
+			.keyBy(new Tuple2KeyExtractor());
+
+		streamOne
+			.intervalJoin(streamTwo)
+			.between(Time.milliseconds(0), Time.milliseconds(0))
+			.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
+				@Override
+				public void processElement(Tuple2<String, Integer> left,
+					Tuple2<String, Integer> right, Context ctx,
+					Collector<String> out) throws Exception {
+					out.collect(left + ":" + right);
+				}
+			}).addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,0)",
+			"(key,1):(key,1)",
+			"(key,2):(key,2)",
+			"(key,3):(key,3)",
+			"(key,4):(key,4)",
+			"(key,5):(key,5)"
+		);
+	}
+
+	@Test
+	public void testJoinsCorrectlyWithMultipleKeys() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		KeyedStream<Tuple2<String, Integer>, String> streamOne = env.fromElements(
+			Tuple2.of("key1", 0),
+			Tuple2.of("key2", 1),
+			Tuple2.of("key1", 2),
+			Tuple2.of("key2", 3),
+			Tuple2.of("key1", 4),
+			Tuple2.of("key2", 5)
+		)
+			.assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
+			.keyBy(new Tuple2KeyExtractor());
+
+		KeyedStream<Tuple2<String, Integer>, String> streamTwo = env.fromElements(
+			Tuple2.of("key1", 0),
+			Tuple2.of("key2", 1),
+			Tuple2.of("key1", 2),
+			Tuple2.of("key2", 3),
+			Tuple2.of("key1", 4),
+			Tuple2.of("key2", 5)
+		)
+			.assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
+			.keyBy(new Tuple2KeyExtractor());
+
+		streamOne
+			.intervalJoin(streamTwo)
+			// if the join were not keyed, the bounds [0; 1] would also produce the pairs (1, 1),
+			// (1, 2), (2, 2), (2, 3), ...; this test verifies that keying prevents those pairs
+			.between(Time.milliseconds(0), Time.milliseconds(1))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key1,0):(key1,0)",
+			"(key2,1):(key2,1)",
+			"(key1,2):(key1,2)",
+			"(key2,3):(key2,3)",
+			"(key1,4):(key1,4)",
+			"(key2,5):(key2,5)"
+		);
+	}
+
+	@Test
+	public void testBoundedUnorderedStreamsStillJoinCorrectly() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) {
+				ctx.collectWithTimestamp(Tuple2.of("key", 5), 5L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 4), 4L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
+				ctx.emitWatermark(new Watermark(5));
+				ctx.collectWithTimestamp(Tuple2.of("key", 9), 9L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 8), 8L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 7), 7L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 6), 6L);
+			}
+
+			@Override
+			public void cancel() {
+				// do nothing
+			}
+		});
+
+		DataStream<Tuple2<String, Integer>> streamTwo = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) {
+				ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 4), 4L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 5), 5L);
+				ctx.emitWatermark(new Watermark(5));
+				ctx.collectWithTimestamp(Tuple2.of("key", 8), 8L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 7), 7L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 9), 9L);
+				ctx.collectWithTimestamp(Tuple2.of("key", 6), 6L);
+			}
+
+			@Override
+			public void cancel() {
+				// do nothing
+			}
+		});
+
+		streamOne
+			.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(-1), Time.milliseconds(1))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,1):(key,1)",
+			"(key,1):(key,2)",
+
+			"(key,2):(key,1)",
+			"(key,2):(key,2)",
+			"(key,2):(key,3)",
+
+			"(key,3):(key,2)",
+			"(key,3):(key,3)",
+			"(key,3):(key,4)",
+
+			"(key,4):(key,3)",
+			"(key,4):(key,4)",
+			"(key,4):(key,5)",
+
+			"(key,5):(key,4)",
+			"(key,5):(key,5)",
+			"(key,5):(key,6)",
+
+			"(key,6):(key,5)",
+			"(key,6):(key,6)",
+			"(key,6):(key,7)",
+
+			"(key,7):(key,6)",
+			"(key,7):(key,7)",
+			"(key,7):(key,8)",
+
+			"(key,8):(key,7)",
+			"(key,8):(key,8)",
+			"(key,8):(key,9)",
+
+			"(key,9):(key,8)",
+			"(key,9):(key,9)"
+		);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testFailsWithoutUpperBound() {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
+		DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
+
+		streamOne
+			.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), null);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testFailsWithoutLowerBound() {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
+		DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
+
+		streamOne
+			.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(null, Time.milliseconds(1));
+	}
+
+	@Test
+	public void testBoundsCanBeExclusive() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2)
+		).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+
+		DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2)
+		).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(2))
+			.upperBoundExclusive()
+			.lowerBoundExclusive()
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,1)",
+			"(key,1):(key,2)"
+		);
+	}
+
+	@Test
+	public void testBoundsCanBeInclusive() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2)
+		).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+
+		DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2)
+		).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(2))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,0)",
+			"(key,0):(key,1)",
+			"(key,0):(key,2)",
+
+			"(key,1):(key,1)",
+			"(key,1):(key,2)",
+
+			"(key,2):(key,2)"
+		);
+	}
+
+	@Test
+	public void testBoundsAreInclusiveByDefault() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2)
+		).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+
+		DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
+			Tuple2.of("key", 0),
+			Tuple2.of("key", 1),
+			Tuple2.of("key", 2)
+		).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(2))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,0)",
+			"(key,0):(key,1)",
+			"(key,0):(key,2)",
+
+			"(key,1):(key,1)",
+			"(key,1):(key,2)",
+
+			"(key,2):(key,2)"
+		);
+	}
+
+	@Test(expected = UnsupportedTimeCharacteristicException.class)
+	public void testExecutionFailsInProcessingTime() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
+		DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(0))
+			.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
+				@Override
+				public void processElement(Tuple2<String, Integer> left,
+					Tuple2<String, Integer> right, Context ctx,
+					Collector<String> out) throws Exception {
+					out.collect(left + ":" + right);
+				}
+			});
+	}
+
+	private static void expectInAnyOrder(String... expected) {
+		List<String> listExpected = Lists.newArrayList(expected);
+		Collections.sort(listExpected);
+		Collections.sort(testResults);
+		Assert.assertEquals(listExpected, testResults);
+	}
+
+	private static class AscendingTuple2TimestampExtractor extends AscendingTimestampExtractor<Tuple2<String, Integer>> {
+		@Override
+		public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
+			return element.f1;
+		}
+	}
+
+	private static class ResultSink implements SinkFunction<String> {
+		@Override
+		public void invoke(String value, Context context) throws Exception {
+			testResults.add(value);
+		}
+	}
+
+	private static class CombineToStringJoinFunction extends ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
+		@Override
+		public void processElement(Tuple2<String, Integer> left,
+			Tuple2<String, Integer> right, Context ctx, Collector<String> out) {
+			out.collect(left + ":" + right);
+		}
+	}
+
+	private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String, Integer>, String> {
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+}


[2/3] flink git commit: [FLINK-8480][DataStream] Add APIs for Interval Joins.

Posted by kk...@apache.org.
[FLINK-8480][DataStream] Add APIs for Interval Joins.

This adds the Java and Scala APIs for performing an interval join.
In Java, this looks like:

```java
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});
```

This closes #5482.
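
For context, a minimal, self-contained sketch of how the new Java API fits into a pipeline
(the sources, key selector, and printed sink are illustrative and not part of this commit):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // interval joins work on event time only
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        KeyedStream<Tuple2<String, Integer>, String> left = keyedSource(env);
        KeyedStream<Tuple2<String, Integer>, String> right = keyedSource(env);

        left.intervalJoin(right)
            // join every pair with |leftTs - rightTs| <= 2 (bounds are inclusive by default)
            .between(Time.milliseconds(-2), Time.milliseconds(2))
            .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                @Override
                public void processElement(Tuple2<String, Integer> lhs, Tuple2<String, Integer> rhs,
                        Context ctx, Collector<String> out) {
                    out.collect(lhs + ":" + rhs);
                }
            })
            .print();

        env.execute("interval join example");
    }

    // small in-memory source keyed by the String field; the Integer field doubles as the timestamp
    private static KeyedStream<Tuple2<String, Integer>, String> keyedSource(StreamExecutionEnvironment env) {
        return env
            .fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
                    return element.f1;
                }
            })
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0;
                }
            });
    }
}
```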


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42ada8ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42ada8ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42ada8ad

Branch: refs/heads/master
Commit: 42ada8ad9ca28f94d0a0355658330198bbc2b577
Parents: f45b7f7
Author: Florian Schmidt <fl...@icloud.com>
Authored: Mon Jul 9 12:02:24 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 12 21:03:26 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/index.md              |  15 +
 .../streaming/api/datastream/KeyedStream.java   | 184 ++++
 .../UnsupportedTimeCharacteristicException.java |  35 +
 .../api/functions/co/ProcessJoinFunction.java   |  87 ++
 .../functions/co/TimeBoundedJoinFunction.java   |  87 --
 .../api/operators/co/IntervalJoinOperator.java  | 513 ++++++++++
 .../co/TimeBoundedStreamJoinOperator.java       | 513 ----------
 .../operators/co/IntervalJoinOperatorTest.java  | 941 +++++++++++++++++++
 .../co/TimeBoundedStreamJoinOperatorTest.java   | 941 -------------------
 .../flink/streaming/api/scala/KeyedStream.scala | 106 ++-
 .../api/scala/IntervalJoinITCase.scala          | 130 +++
 .../streaming/runtime/IntervalJoinITCase.java   | 451 +++++++++
 12 files changed, 2461 insertions(+), 1542 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/docs/dev/stream/operators/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/index.md b/docs/dev/stream/operators/index.md
index 1dbdef4..422dbbf 100644
--- a/docs/dev/stream/operators/index.md
+++ b/docs/dev/stream/operators/index.md
@@ -310,6 +310,21 @@ dataStream.join(otherStream)
           </td>
         </tr>
         <tr>
+          <td><strong>Interval Join</strong><br>KeyedStream,KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound</p>
+    {% highlight java %}
+// this will join the two streams so that
+// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
+keyedStream.intervalJoin(otherKeyedStream)
+    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
+    .upperBoundExclusive() // optional
+    .lowerBoundExclusive() // optional
+    .process(new ProcessJoinFunction() {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
           <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
           <td>
             <p>Cogroups two data streams on a given key and a common window.</p>

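As a hedged, worked illustration of the semantics documented above (the
concrete timestamps are assumptions): with both bounds exclusive as in the
snippet, a left element with leftTs = 10 joins exactly those right elements
of the same key whose timestamps satisfy 10 - 2 < rightTs < 10 + 2, i.e.
rightTs in {9, 10, 11} at millisecond granularity; with the default
inclusive bounds, right elements at 8 and 12 would join as well.
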
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index a948ae2..32a5c96 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
 import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -51,6 +52,7 @@ import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -76,6 +78,8 @@ import java.util.List;
 import java.util.Stack;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link KeyedStream} represents a {@link DataStream} on which operator state is
  * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
@@ -396,6 +400,186 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Joining
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Join elements of this {@link KeyedStream} with elements of another {@link KeyedStream} over
+	 * a time interval that can be specified with {@link IntervalJoin#between(Time, Time)}.
+	 *
+	 * @param otherStream The other keyed stream to join this keyed stream with
+	 * @param <T1> Type parameter of elements in the other stream
+	 * @return An instance of {@link IntervalJoin} with this keyed stream and the other keyed stream
+	 */
+	@PublicEvolving
+	public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
+		return new IntervalJoin<>(this, otherStream);
+	}
+
+	/**
+	 * Perform a join over a time interval.
+	 * @param <T1> The type parameter of the elements in the first stream
+	 * @param <T2> The type parameter of the elements in the second stream
+	 */
+	@PublicEvolving
+	public static class IntervalJoin<T1, T2, KEY> {
+
+		private final KeyedStream<T1, KEY> streamOne;
+		private final KeyedStream<T2, KEY> streamTwo;
+
+		IntervalJoin(
+				KeyedStream<T1, KEY> streamOne,
+				KeyedStream<T2, KEY> streamTwo
+		) {
+			this.streamOne = checkNotNull(streamOne);
+			this.streamTwo = checkNotNull(streamTwo);
+		}
+
+		/**
+		 * Specifies the time boundaries over which the join operation works, so that
+		 * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+		 * By default both the lower and the upper bound are inclusive. This can be configured
+		 * with {@link IntervalJoined#lowerBoundExclusive()} and
+		 * {@link IntervalJoined#upperBoundExclusive()}.
+		 *
+		 * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+		 * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+		 */
+		@PublicEvolving
+		public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+			TimeCharacteristic timeCharacteristic =
+				streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+			if (timeCharacteristic != TimeCharacteristic.EventTime) {
+				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
+			}
+
+			checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
+			checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
+
+			return new IntervalJoined<>(
+				streamOne,
+				streamTwo,
+				lowerBound.toMilliseconds(),
+				upperBound.toMilliseconds(),
+				true,
+				true
+			);
+		}
+	}
+
+	/**
+	 * IntervalJoined is a container for the two keyed streams, as well as
+	 * the time boundaries over which elements should be joined.
+	 *
+	 * @param <IN1> Input type of elements from the first stream
+	 * @param <IN2> Input type of elements from the second stream
+	 * @param <KEY> The type of the key
+	 */
+	@PublicEvolving
+	public static class IntervalJoined<IN1, IN2, KEY> {
+
+		private static final String INTERVAL_JOIN_FUNC_NAME = "IntervalJoin";
+
+		private final KeyedStream<IN1, KEY> left;
+		private final KeyedStream<IN2, KEY> right;
+
+		private final long lowerBound;
+		private final long upperBound;
+
+		private final KeySelector<IN1, KEY> keySelector1;
+		private final KeySelector<IN2, KEY> keySelector2;
+
+		private boolean lowerBoundInclusive;
+		private boolean upperBoundInclusive;
+
+		public IntervalJoined(
+				KeyedStream<IN1, KEY> left,
+				KeyedStream<IN2, KEY> right,
+				long lowerBound,
+				long upperBound,
+				boolean lowerBoundInclusive,
+				boolean upperBoundInclusive) {
+
+			this.left = checkNotNull(left);
+			this.right = checkNotNull(right);
+
+			this.lowerBound = lowerBound;
+			this.upperBound = upperBound;
+
+			this.lowerBoundInclusive = lowerBoundInclusive;
+			this.upperBoundInclusive = upperBoundInclusive;
+
+			this.keySelector1 = left.getKeySelector();
+			this.keySelector2 = right.getKeySelector();
+		}
+
+		/**
+		 * Set the upper bound to be exclusive.
+		 */
+		@PublicEvolving
+		public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
+			this.upperBoundInclusive = false;
+			return this;
+		}
+
+		/**
+		 * Set the lower bound to be exclusive.
+		 */
+		@PublicEvolving
+		public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
+			this.lowerBoundInclusive = false;
+			return this;
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each joined pair
+		 * of elements.
+		 * @param udf The user-defined function
+		 * @param <OUT> The output type
+	 * @return The resulting DataStream
+		 */
+		@PublicEvolving
+		public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> udf) {
+
+			ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(udf);
+
+			TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
+				cleanedUdf,
+				ProcessJoinFunction.class,    // ProcessJoinFunction<IN1, IN2, OUT>
+				0,     //					    0    1    2
+				1,
+				2,
+				new int[]{0},                   // lambda input 1 type arg indices
+				new int[]{1},                   // lambda input 2 type arg indices
+				TypeExtractor.NO_INDEX,         // output arg indices
+				left.getType(),                 // input 1 type information
+				right.getType(),                // input 2 type information
+				INTERVAL_JOIN_FUNC_NAME,
+				false
+			);
+
+			IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
+				new IntervalJoinOperator<>(
+					lowerBound,
+					upperBound,
+					lowerBoundInclusive,
+					upperBoundInclusive,
+					left.getType().createSerializer(left.getExecutionConfig()),
+					right.getType().createSerializer(right.getExecutionConfig()),
+					cleanedUdf
+				);
+
+			return left
+				.connect(right)
+				.keyBy(keySelector1, keySelector2)
+				.transform(INTERVAL_JOIN_FUNC_NAME, resultType, operator);
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  Windowing
 	// ------------------------------------------------------------------------
 

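Taken together, a hedged end-to-end sketch of the new API, modeled on the
ITCase added in this patch (the class name, sample data, and the print()
sink are illustrative assumptions):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Interval joins are only supported in event time (see between()).
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		KeySelector<Tuple2<String, Integer>, String> byKey =
			new KeySelector<Tuple2<String, Integer>, String>() {
				@Override
				public String getKey(Tuple2<String, Integer> value) {
					return value.f0;
				}
			};

		// Event timestamps are carried in the Integer field.
		AscendingTimestampExtractor<Tuple2<String, Integer>> timestamps =
			new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
				@Override
				public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
					return element.f1;
				}
			};

		KeyedStream<Tuple2<String, Integer>, String> leftStream = env
			.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 4))
			.assignTimestampsAndWatermarks(timestamps)
			.keyBy(byKey);

		KeyedStream<Tuple2<String, Integer>, String> rightStream = env
			.fromElements(Tuple2.of("key", 1), Tuple2.of("key", 7))
			.assignTimestampsAndWatermarks(timestamps)
			.keyBy(byKey);

		leftStream
			.intervalJoin(rightStream)
			.between(Time.milliseconds(-2), Time.milliseconds(2))
			.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
				@Override
				public void processElement(Tuple2<String, Integer> l,
						Tuple2<String, Integer> r, Context ctx, Collector<String> out) {
					out.collect(l + ":" + r);
				}
			})
			.print(); // prints (key,0):(key,1) -- the only pair within +/- 2ms

		env.execute();
	}
}
```
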
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
new file mode 100644
index 0000000..cb2570a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * An exception that indicates that a time characteristic was used that is not supported in the
+ * current operation.
+ */
+@PublicEvolving
+public class UnsupportedTimeCharacteristicException extends FlinkRuntimeException {
+
+	private static final long serialVersionUID = -8109094930338075819L;
+
+	public UnsupportedTimeCharacteristicException(String message) {
+		super(message);
+	}
+}

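A hedged sketch of when this exception surfaces: the environment's default
time characteristic is processing time, so calling between() without first
switching to event time fails (leftStream and rightStream stand for any two
keyed streams, as in the example above):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Default is TimeCharacteristic.ProcessingTime, which interval joins reject.
try {
	leftStream
		.intervalJoin(rightStream)
		.between(Time.milliseconds(-1), Time.milliseconds(1));
} catch (UnsupportedTimeCharacteristicException e) {
	// "Time-bounded stream joins are only supported in event time"
}
```
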
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
new file mode 100644
index 0000000..2c39abc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A function that processes two joined elements and produces a single output one.
+ *
+ * <p>This function will get called for every joined pair of elements of the two joined streams.
+ * The timestamp of the joined pair as well as the timestamp of the left element and the right
+ * element can be accessed through the {@link Context}.
+ *
+ * @param <IN1> Type of the first input
+ * @param <IN2> Type of the second input
+ * @param <OUT> Type of the output
+ */
+@PublicEvolving
+public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = -2444626938039012398L;
+
+	/**
+	 * This method is called for each joined pair of elements. It can output zero or more elements
+	 * through the provided {@link Collector} and has access to the timestamps of the joined elements
+	 * and the result through the {@link Context}.
+	 *
+	 * @param left         The left element of the joined pair.
+	 * @param right        The right element of the joined pair.
+	 * @param ctx          A context that allows querying the timestamps of the left, right and
+	 *                     joined pair. In addition, this context allows emitting elements on a side output.
+	 * @param out          The collector to emit resulting elements to.
+	 * @throws Exception   This function may throw exceptions which cause the streaming program to
+	 * 					   fail and go into recovery mode.
+	 */
+	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * The context that is available during an invocation of
+	 * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
+	 * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
+	 * allows emitting elements on a side output.
+	 */
+	public abstract class Context {
+
+		/**
+		 * @return The timestamp of the left element of a joined pair
+		 */
+		public abstract long getLeftTimestamp();
+
+		/**
+		 * @return The timestamp of the right element of a joined pair
+		 */
+		public abstract long getRightTimestamp();
+
+		/**
+		 * @return The timestamp of the joined pair.
+		 */
+		public abstract long getTimestamp();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 * @param outputTag The output tag that identifies the side output to emit to
+		 * @param value The record to emit
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
+	}
+}

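A hedged sketch of how the Context can be used inside processElement() to
route some joined pairs to a side output (the OutputTag name and the routing
condition are assumptions):

```java
// Pairs where the left element is strictly newer go to a side output.
final OutputTag<String> leftNewer = new OutputTag<String>("left-newer") {};

ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> func =
	new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
		@Override
		public void processElement(Tuple2<String, Integer> left,
				Tuple2<String, Integer> right, Context ctx, Collector<String> out) {
			if (ctx.getLeftTimestamp() > ctx.getRightTimestamp()) {
				ctx.output(leftNewer, left + ":" + right);
			} else {
				// getTimestamp() is the timestamp assigned to the joined pair.
				out.collect(left + ":" + right);
			}
		}
	};
```
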
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
deleted file mode 100644
index cd745ca..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * A function that processes two joined elements and produces a single output one.
- *
- * <p>This function will get called for every joined pair of elements the joined two streams.
- * The timestamp of the joined pair as well as the timestamp of the left element and the right
- * element can be accessed through the {@link Context}.
- *
- * @param <IN1> Type of the first input
- * @param <IN2> Type of the second input
- * @param <OUT> Type of the output
- */
-@PublicEvolving
-public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
-
-	private static final long serialVersionUID = -2444626938039012398L;
-
-	/**
-	 * This method is called for each joined pair of elements. It can output zero or more elements
-	 * through the provided {@link Collector} and has access to the timestamps of the joined elements
-	 * and the result through the {@link Context}.
-	 *
-	 * @param left         The left element of the joined pair.
-	 * @param right        The right element of the joined pair.
-	 * @param ctx          A context that allows querying the timestamps of the left, right and
-	 *                     joined pair. In addition, this context allows to emit elements on a side output.
-	 * @param out          The collector to emit resulting elements to.
-	 * @throws Exception   This function may throw exceptions which cause the streaming program to
-	 * 					   fail and go in recovery mode.
-	 */
-	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
-
-	/**
-	 * The context that is available during an invocation of
-	 * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
-	 * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
-	 * allows to emit elements on a side output.
-	 */
-	public abstract class Context {
-
-		/**
-		 * @return The timestamp of the left element of a joined pair
-		 */
-		public abstract long getLeftTimestamp();
-
-		/**
-		 * @return The timestamp of the right element of a joined pair
-		 */
-		public abstract long getRightTimestamp();
-
-		/**
-		 * @return The timestamp of the joined pair.
-		 */
-		public abstract long getTimestamp();
-
-		/**
-		 * Emits a record to the side output identified by the {@link OutputTag}.
-		 * @param outputTag The output tag that identifies the side output to emit to
-		 * @param value The record to emit
-		 */
-		public abstract <X> void output(OutputTag<X> outputTag, X value);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
new file mode 100644
index 0000000..0c449e6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
+ *
+ * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where T2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link ProcessJoinFunction}.
+ *
+ * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
+ * We then check the right buffer to see whether there are any elements that can be joined. If
+ * there are, they are joined and passed to the aforementioned function. The same happens the
+ * other way around when receiving an element on the right side.
+ *
+ * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
+ * the elements.
+ *
+ * <p>In order to prevent the element buffers from growing indefinitely, a cleanup timer is registered
+ * per element. This timer indicates when an element is not considered for joining anymore and can
+ * be removed from the state.
+ *
+ * @param <K>	The type of the key based on which we join elements.
+ * @param <T1>	The type of the elements in the left stream.
+ * @param <T2>	The type of the elements in the right stream.
+ * @param <OUT>	The output type created by the user-defined function.
+ */
+@Internal
+public class IntervalJoinOperator<K, T1, T2, OUT>
+		extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
+		implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
+
+	private static final long serialVersionUID = -5380774605111543454L;
+
+	private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);
+
+	private static final String LEFT_BUFFER = "LEFT_BUFFER";
+	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+	private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
+	private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
+	private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
+
+	private final long lowerBound;
+	private final long upperBound;
+
+	private final TypeSerializer<T1> leftTypeSerializer;
+	private final TypeSerializer<T2> rightTypeSerializer;
+
+	private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
+	private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
+
+	private transient TimestampedCollector<OUT> collector;
+	private transient ContextImpl context;
+
+	private transient InternalTimerService<String> internalTimerService;
+
+	/**
+	 * Creates a new IntervalJoinOperator.
+	 *
+	 * @param lowerBound          The lower bound for evaluating if elements should be joined
+	 * @param upperBound          The upper bound for evaluating if elements should be joined
+	 * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
+	 *                            the lower bound
+	 * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
+	 *                            the upper bound
+	 * @param udf                 A user-defined {@link ProcessJoinFunction} that gets called
+	 *                            whenever two elements of T1 and T2 are joined
+	 */
+	public IntervalJoinOperator(
+			long lowerBound,
+			long upperBound,
+			boolean lowerBoundInclusive,
+			boolean upperBoundInclusive,
+			TypeSerializer<T1> leftTypeSerializer,
+			TypeSerializer<T2> rightTypeSerializer,
+			ProcessJoinFunction<T1, T2, OUT> udf) {
+
+		super(Preconditions.checkNotNull(udf));
+
+		Preconditions.checkArgument(lowerBound <= upperBound,
+			"lowerBound <= upperBound must be fulfilled");
+
+		// Move the bounds by +1 / -1 depending on inclusiveness so that we
+		// do not need to check for inclusiveness later on
+		this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
+		this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
+
+		this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
+		this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+		context = new ContextImpl(userFunction);
+		internalTimerService =
+			getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+
+		this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+			LEFT_BUFFER,
+			LongSerializer.INSTANCE,
+			new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
+		));
+
+		this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+			RIGHT_BUFFER,
+			LongSerializer.INSTANCE,
+			new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
+		));
+	}
+
+	/**
+	 * Process a {@link StreamRecord} from the left stream. Whenever a {@link StreamRecord}
+	 * arrives at the left stream, it will get added to the left buffer. Possible join candidates
+	 * for that element will be looked up from the right buffer and if the pair lies within the
+	 * user defined boundaries, it gets passed to the {@link ProcessJoinFunction}.
+	 *
+	 * @param record An incoming record to be joined
+	 * @throws Exception Can throw an Exception during state access
+	 */
+	@Override
+	public void processElement1(StreamRecord<T1> record) throws Exception {
+		processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
+	}
+
+	/**
+	 * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
+	 * arrives at the right stream, it will get added to the right buffer. Possible join candidates
+	 * for that element will be looked up from the left buffer and if the pair lies within the user
+	 * defined boundaries, it gets passed to the {@link ProcessJoinFunction}.
+	 *
+	 * @param record An incoming record to be joined
+	 * @throws Exception Can throw an exception during state access
+	 */
+	@Override
+	public void processElement2(StreamRecord<T2> record) throws Exception {
+		processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
+	}
+
+	@SuppressWarnings("unchecked")
+	private <OUR, OTHER> void processElement(
+			StreamRecord<OUR> record,
+			MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
+			MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+			long relativeLowerBound,
+			long relativeUpperBound,
+			boolean isLeft) throws Exception {
+
+		final OUR ourValue = record.getValue();
+		final long ourTimestamp = record.getTimestamp();
+
+		if (ourTimestamp == Long.MIN_VALUE) {
+			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
+					"interval stream joins need to have timestamps meaningful timestamps.");
+		}
+
+		if (isLate(ourTimestamp)) {
+			return;
+		}
+
+		addToBuffer(ourBuffer, ourValue, ourTimestamp);
+
+		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
+			final long timestamp  = bucket.getKey();
+
+			if (timestamp < ourTimestamp + relativeLowerBound ||
+					timestamp > ourTimestamp + relativeUpperBound) {
+				continue;
+			}
+
+			for (BufferEntry<OTHER> entry: bucket.getValue()) {
+				if (isLeft) {
+					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
+				} else {
+					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
+				}
+			}
+		}
+
+		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
+		if (isLeft) {
+			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
+		} else {
+			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
+		}
+	}
+
+	private boolean isLate(long timestamp) {
+		long currentWatermark = internalTimerService.currentWatermark();
+		return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
+	}
+
+	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
+		long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+		collector.setAbsoluteTimestamp(resultTimestamp);
+		context.leftTimestamp = leftTimestamp;
+		context.rightTimestamp = rightTimestamp;
+		userFunction.processElement(left, right, context, collector);
+	}
+
+	private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+		List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
+		if (elemsInBucket == null) {
+			elemsInBucket = new ArrayList<>();
+		}
+		elemsInBucket.add(new BufferEntry<>(value, false));
+		buffer.put(timestamp, elemsInBucket);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, String> timer) throws Exception {
+
+		long timerTimestamp = timer.getTimestamp();
+		String namespace = timer.getNamespace();
+
+		logger.trace("onEventTime @ {}", timerTimestamp);
+
+		switch (namespace) {
+			case CLEANUP_NAMESPACE_LEFT: {
+				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
+				logger.trace("Removing from left buffer @ {}", timestamp);
+				leftBuffer.remove(timestamp);
+				break;
+			}
+			case CLEANUP_NAMESPACE_RIGHT: {
+				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
+				logger.trace("Removing from right buffer @ {}", timestamp);
+				rightBuffer.remove(timestamp);
+				break;
+			}
+			default:
+				throw new RuntimeException("Invalid namespace " + namespace);
+		}
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
+		// do nothing.
+	}
+
+	/**
+	 * The context that is available during an invocation of
+	 * {@link ProcessJoinFunction#processElement(Object, Object, ProcessJoinFunction.Context, Collector)}.
+	 *
+	 * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
+	 * the joined pair. In addition, this context allows emitting elements on a side output.
+	 */
+	private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
+
+		private long leftTimestamp = Long.MIN_VALUE;
+
+		private long rightTimestamp = Long.MIN_VALUE;
+
+		private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) {
+			func.super();
+		}
+
+		@Override
+		public long getLeftTimestamp() {
+			return leftTimestamp;
+		}
+
+		@Override
+		public long getRightTimestamp() {
+			return rightTimestamp;
+		}
+
+		@Override
+		public long getTimestamp() {
+			return leftTimestamp;
+		}
+
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
+			output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
+		}
+	}
+
+	/**
+	 * A container for elements put in the left/right buffer.
+	 * This will contain the element itself along with a flag indicating
+	 * if it has been joined or not.
+	 */
+	private static class BufferEntry<T> {
+
+		private final T element;
+		private final boolean hasBeenJoined;
+
+		BufferEntry(T element, boolean hasBeenJoined) {
+			this.element = element;
+			this.hasBeenJoined = hasBeenJoined;
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
+	 */
+	private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+
+		private static final long serialVersionUID = -20197698803836236L;
+
+		private final TypeSerializer<T> elementSerializer;
+
+		private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+			this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<BufferEntry<T>> duplicate() {
+			return new BufferEntrySerializer<>(elementSerializer.duplicate());
+		}
+
+		@Override
+		public BufferEntry<T> createInstance() {
+			return null;
+		}
+
+		@Override
+		public BufferEntry<T> copy(BufferEntry<T> from) {
+			return new BufferEntry<>(from.element, from.hasBeenJoined);
+		}
+
+		@Override
+		public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
+			target.writeBoolean(record.hasBeenJoined);
+			elementSerializer.serialize(record.element, target);
+		}
+
+		@Override
+		public BufferEntry<T> deserialize(DataInputView source) throws IOException {
+			boolean hasBeenJoined = source.readBoolean();
+			T element = elementSerializer.deserialize(source);
+			return new BufferEntry<>(element, hasBeenJoined);
+		}
+
+		@Override
+		public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeBoolean(source.readBoolean());
+			elementSerializer.copy(source, target);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
+			return Objects.equals(elementSerializer, that.elementSerializer);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(elementSerializer);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(BufferEntrySerializer.class);
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return new BufferSerializerConfigSnapshot<>(elementSerializer);
+		}
+
+		@Override
+		public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
+				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
+						((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+
+				CompatibilityResult<T> compatResult =
+						CompatibilityUtil.resolveCompatibilityResult(
+								previousSerializerAndConfig.f0,
+								UnloadableDummyTypeSerializer.class,
+								previousSerializerAndConfig.f1,
+								elementSerializer);
+
+				if (!compatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else if (compatResult.getConvertDeserializer() != null) {
+					return CompatibilityResult.requiresMigration(
+							new BufferEntrySerializer<>(
+									new TypeDeserializerAdapter<>(
+											compatResult.getConvertDeserializer())));
+				}
+			}
+			return CompatibilityResult.requiresMigration();
+		}
+	}
+
+	/**
+	 * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+	 */
+	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		public BufferSerializerConfigSnapshot() {
+		}
+
+		public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
+			super(userTypeSerializer);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	@VisibleForTesting
+	MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
+		return leftBuffer;
+	}
+
+	@VisibleForTesting
+	MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
+		return rightBuffer;
+	}
+}

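A hedged worked example of the bound normalization in the constructor and
the cleanup logic above (the concrete numbers are assumptions):

```java
// between(Time.milliseconds(-2), Time.milliseconds(2)), both bounds exclusive:
long lowerBound = -2L;
long upperBound = 2L;
boolean lowerBoundInclusive = false;
boolean upperBoundInclusive = false;

// Constructor normalization: shift exclusive bounds inward so that all
// later comparisons can treat the interval as inclusive.
long lo = lowerBoundInclusive ? lowerBound : lowerBound + 1L; // -1
long up = upperBoundInclusive ? upperBound : upperBound - 1L; //  1

// A left element at ts = 10 then joins right elements with ts in [9, 11],
// and its cleanup timer fires at event time 10 + up = 11 (up > 0), after
// which no future right element can still fall into its interval.
long leftTs = 10L;
long cleanupTime = (up > 0L) ? leftTs + up : leftTs; // 11
```
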
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
deleted file mode 100644
index 26ad26b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
- *
- * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
- * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
- * upper bound can be configured to be either inclusive or exclusive.
- *
- * <p>As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}.
- *
- * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
- * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
- * We then check the right buffer to see whether there are any elements that can be joined. If
- * there are, they are joined and passed to the aforementioned function. The same happens the
- * other way around when receiving an element on the right side.
- *
- * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
- * the elements.
- *
- * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered
- * per element. This timer indicates when an element is not considered for joining anymore and can
- * be removed from the state.
- *
- * @param <K>	The type of the key based on which we join elements.
- * @param <T1>	The type of the elements in the left stream.
- * @param <T2>	The type of the elements in the right stream.
- * @param <OUT>	The output type created by the user-defined function.
- */
-@Internal
-public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT>
-		extends AbstractUdfStreamOperator<OUT, TimeBoundedJoinFunction<T1, T2, OUT>>
-		implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
-
-	private static final long serialVersionUID = -5380774605111543454L;
-
-	private static final Logger logger = LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class);
-
-	private static final String LEFT_BUFFER = "LEFT_BUFFER";
-	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
-	private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
-	private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
-	private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
-
-	private final long lowerBound;
-	private final long upperBound;
-
-	private final TypeSerializer<T1> leftTypeSerializer;
-	private final TypeSerializer<T2> rightTypeSerializer;
-
-	private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
-	private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
-
-	private transient TimestampedCollector<OUT> collector;
-	private transient ContextImpl context;
-
-	private transient InternalTimerService<String> internalTimerService;
-
-	/**
-	 * Creates a new TimeBoundedStreamJoinOperator.
-	 *
-	 * @param lowerBound          The lower bound for evaluating if elements should be joined
-	 * @param upperBound          The upper bound for evaluating if elements should be joined
-	 * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
-	 *                            the lower bound
-	 * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
-	 *                            the upper bound
-	 * @param udf                 A user-defined {@link TimeBoundedJoinFunction} that gets called
-	 *                            whenever two elements of T1 and T2 are joined
-	 */
-	public TimeBoundedStreamJoinOperator(
-			long lowerBound,
-			long upperBound,
-			boolean lowerBoundInclusive,
-			boolean upperBoundInclusive,
-			TypeSerializer<T1> leftTypeSerializer,
-			TypeSerializer<T2> rightTypeSerializer,
-			TimeBoundedJoinFunction<T1, T2, OUT> udf) {
-
-		super(Preconditions.checkNotNull(udf));
-
-		Preconditions.checkArgument(lowerBound <= upperBound,
-			"lowerBound <= upperBound must be fulfilled");
-
-		// Move buffer by +1 / -1 depending on inclusiveness in order not needing
-		// to check for inclusiveness later on
-		this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
-		this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
-
-		this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
-		this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-		context = new ContextImpl(userFunction);
-		internalTimerService =
-			getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
-	}
-
-	@Override
-	public void initializeState(StateInitializationContext context) throws Exception {
-		super.initializeState(context);
-
-		this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
-			LEFT_BUFFER,
-			LongSerializer.INSTANCE,
-			new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
-		));
-
-		this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
-			RIGHT_BUFFER,
-			LongSerializer.INSTANCE,
-			new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
-		));
-	}
-
-	/**
-	 * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
-	 * arrives at the left stream, it will get added to the left buffer. Possible join candidates
-	 * for that element will be looked up from the right buffer and if the pair lies within the
-	 * user defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
-	 *
-	 * @param record An incoming record to be joined
-	 * @throws Exception Can throw an Exception during state access
-	 */
-	@Override
-	public void processElement1(StreamRecord<T1> record) throws Exception {
-		processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
-	}
-
-	/**
-	 * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
-	 * arrives at the right stream, it will get added to the right buffer. Possible join candidates
-	 * for that element will be looked up from the left buffer and if the pair lies within the user
-	 * defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
-	 *
-	 * @param record An incoming record to be joined
-	 * @throws Exception Can throw an exception during state access
-	 */
-	@Override
-	public void processElement2(StreamRecord<T2> record) throws Exception {
-		processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
-	}
-
-	@SuppressWarnings("unchecked")
-	private <OUR, OTHER> void processElement(
-			StreamRecord<OUR> record,
-			MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
-			MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
-			long relativeLowerBound,
-			long relativeUpperBound,
-			boolean isLeft) throws Exception {
-
-		final OUR ourValue = record.getValue();
-		final long ourTimestamp = record.getTimestamp();
-
-		if (ourTimestamp == Long.MIN_VALUE) {
-			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
-					"interval stream joins need to have timestamps meaningful timestamps.");
-		}
-
-		if (isLate(ourTimestamp)) {
-			return;
-		}
-
-		addToBuffer(ourBuffer, ourValue, ourTimestamp);
-
-		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
-			final long timestamp  = bucket.getKey();
-
-			if (timestamp < ourTimestamp + relativeLowerBound ||
-					timestamp > ourTimestamp + relativeUpperBound) {
-				continue;
-			}
-
-			for (BufferEntry<OTHER> entry: bucket.getValue()) {
-				if (isLeft) {
-					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
-				} else {
-					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
-				}
-			}
-		}
-
-		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
-		if (isLeft) {
-			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
-		} else {
-			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
-		}
-	}
-
-	private boolean isLate(long timestamp) {
-		long currentWatermark = internalTimerService.currentWatermark();
-		return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
-	}
-
-	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
-		long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
-		collector.setAbsoluteTimestamp(resultTimestamp);
-		context.leftTimestamp = leftTimestamp;
-		context.rightTimestamp = rightTimestamp;
-		userFunction.processElement(left, right, context, collector);
-	}
-
-	private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
-		List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
-		if (elemsInBucket == null) {
-			elemsInBucket = new ArrayList<>();
-		}
-		elemsInBucket.add(new BufferEntry<>(value, false));
-		buffer.put(timestamp, elemsInBucket);
-	}
-
-	@Override
-	public void onEventTime(InternalTimer<K, String> timer) throws Exception {
-
-		long timerTimestamp = timer.getTimestamp();
-		String namespace = timer.getNamespace();
-
-		logger.trace("onEventTime @ {}", timerTimestamp);
-
-		switch (namespace) {
-			case CLEANUP_NAMESPACE_LEFT: {
-				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
-				logger.trace("Removing from left buffer @ {}", timestamp);
-				leftBuffer.remove(timestamp);
-				break;
-			}
-			case CLEANUP_NAMESPACE_RIGHT: {
-				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
-				logger.trace("Removing from right buffer @ {}", timestamp);
-				rightBuffer.remove(timestamp);
-				break;
-			}
-			default:
-				throw new RuntimeException("Invalid namespace " + namespace);
-		}
-	}
-
-	@Override
-	public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
-		// do nothing.
-	}
-
-	/**
-	 * The context that is available during an invocation of
-	 * {@link TimeBoundedJoinFunction#processElement(Object, Object, TimeBoundedJoinFunction.Context, Collector)}.
-	 *
-	 * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
-	 * the joined pair. In addition, this context allows to emit elements on a side output.
-	 */
-	private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, OUT>.Context {
-
-		private long leftTimestamp = Long.MIN_VALUE;
-
-		private long rightTimestamp = Long.MIN_VALUE;
-
-		private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) {
-			func.super();
-		}
-
-		@Override
-		public long getLeftTimestamp() {
-			return leftTimestamp;
-		}
-
-		@Override
-		public long getRightTimestamp() {
-			return rightTimestamp;
-		}
-
-		@Override
-		public long getTimestamp() {
-			return leftTimestamp;
-		}
-
-		@Override
-		public <X> void output(OutputTag<X> outputTag, X value) {
-			Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
-			output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
-		}
-	}
-
-	/**
-	 * A container for elements put in the left/write buffer.
-	 * This will contain the element itself along with a flag indicating
-	 * if it has been joined or not.
-	 */
-	private static class BufferEntry<T> {
-
-		private final T element;
-		private final boolean hasBeenJoined;
-
-		BufferEntry(T element, boolean hasBeenJoined) {
-			this.element = element;
-			this.hasBeenJoined = hasBeenJoined;
-		}
-	}
-
-	/**
-	 * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
-	 */
-	private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
-
-		private static final long serialVersionUID = -20197698803836236L;
-
-		private final TypeSerializer<T> elementSerializer;
-
-		private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
-			this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return true;
-		}
-
-		@Override
-		public TypeSerializer<BufferEntry<T>> duplicate() {
-			return new BufferEntrySerializer<>(elementSerializer.duplicate());
-		}
-
-		@Override
-		public BufferEntry<T> createInstance() {
-			return null;
-		}
-
-		@Override
-		public BufferEntry<T> copy(BufferEntry<T> from) {
-			return new BufferEntry<>(from.element, from.hasBeenJoined);
-		}
-
-		@Override
-		public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			return -1;
-		}
-
-		@Override
-		public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
-			target.writeBoolean(record.hasBeenJoined);
-			elementSerializer.serialize(record.element, target);
-		}
-
-		@Override
-		public BufferEntry<T> deserialize(DataInputView source) throws IOException {
-			boolean hasBeenJoined = source.readBoolean();
-			T element = elementSerializer.deserialize(source);
-			return new BufferEntry<>(element, hasBeenJoined);
-		}
-
-		@Override
-		public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			target.writeBoolean(source.readBoolean());
-			elementSerializer.copy(source, target);
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-
-			BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
-			return Objects.equals(elementSerializer, that.elementSerializer);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(elementSerializer);
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj.getClass().equals(BufferEntrySerializer.class);
-		}
-
-		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return new BufferSerializerConfigSnapshot<>(elementSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
-						((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-				CompatibilityResult<T> compatResult =
-						CompatibilityUtil.resolveCompatibilityResult(
-								previousSerializerAndConfig.f0,
-								UnloadableDummyTypeSerializer.class,
-								previousSerializerAndConfig.f1,
-								elementSerializer);
-
-				if (!compatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (compatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-							new BufferEntrySerializer<>(
-									new TypeDeserializerAdapter<>(
-											compatResult.getConvertDeserializer())));
-				}
-			}
-			return CompatibilityResult.requiresMigration();
-		}
-	}
-
-	/**
-	 * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
-	 */
-	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
-
-		private static final int VERSION = 1;
-
-		public BufferSerializerConfigSnapshot() {
-		}
-
-		public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
-			super(userTypeSerializer);
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-	}
-
-	@VisibleForTesting
-	MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
-		return leftBuffer;
-	}
-
-	@VisibleForTesting
-	MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
-		return rightBuffer;
-	}
-}
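
The tests added below exercise the new IntervalJoinOperator, which backs the
interval-join DataStream API added by this change. A minimal, hedged usage sketch
of that API follows; the Tuple2-based element type, the key, and the bounds are
illustrative and not part of this commit:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		// two sample keyed streams; f0 is the key, f1 doubles as the event timestamp
		DataStream<Tuple2<String, Long>> lhs = sampleStream(env);
		DataStream<Tuple2<String, Long>> rhs = sampleStream(env);

		lhs.keyBy(key())
			.intervalJoin(rhs.keyBy(key()))
			// join each lhs element with rhs elements whose timestamps lie in [lhsTs - 2, lhsTs + 1]
			.between(Time.milliseconds(-2), Time.milliseconds(1))
			.process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
				@Override
				public void processElement(
						Tuple2<String, Long> left,
						Tuple2<String, Long> right,
						Context ctx,
						Collector<String> out) {
					// ctx exposes the left, right, and joined-pair timestamps
					out.collect(left.f1 + "," + right.f1 + "@" + ctx.getTimestamp());
				}
			})
			.print();

		env.execute("interval join sketch");
	}

	private static DataStream<Tuple2<String, Long>> sampleStream(StreamExecutionEnvironment env) {
		return env
			.fromElements(Tuple2.of("key", 1L), Tuple2.of("key", 2L), Tuple2.of("key", 3L))
			.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
				@Override
				public long extractAscendingTimestamp(Tuple2<String, Long> element) {
					return element.f1;
				}
			});
	}

	private static KeySelector<Tuple2<String, Long>, String> key() {
		return new KeySelector<Tuple2<String, Long>, String>() {
			@Override
			public String getKey(Tuple2<String, Long> value) {
				return value.f0;
			}
		};
	}
}

With bounds of -2/+1 as in between() above, every left element joins the right
elements at most 2 behind or 1 ahead of its own timestamp; the harness tests below
assert exactly this kind of pairing, plus state cleanup and snapshot/restore.
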

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
new file mode 100644
index 0000000..ee3f4d8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests for {@link IntervalJoinOperator}.
+ * These tests cover both join correctness and state cleanup.
+ */
+@RunWith(Parameterized.class)
+public class IntervalJoinOperatorTest {
+
+	private final boolean lhsFasterThanRhs;
+
+	@Parameters(name = "lhs faster than rhs: {0}")
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][]{
+			{true}, {false}
+		});
+	}
+
+	public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) {
+		this.lhsFasterThanRhs = lhsFasterThanRhs;
+	}
+
+	@Test
+	public void testImplementationMirrorsCorrectly() throws Exception {
+
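+		// Joining on bounds [lower, upper) should mirror joining on the negated,
+		// flipped bounds (-upper, -lower]: the second harness below is expected to
+		// emit exactly the transposed pairs of the first.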
+		long lowerBound = 1;
+		long upperBound = 3;
+
+		boolean lowerBoundInclusive = true;
+		boolean upperBoundInclusive = false;
+
+		setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 2),
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 3),
+				streamRecordOf(2, 4),
+				streamRecordOf(3, 4))
+			.noLateRecords()
+			.close();
+
+		setupHarness(-1 * upperBound, upperBoundInclusive, -1 * lowerBound, lowerBoundInclusive)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(2, 1),
+				streamRecordOf(3, 1),
+				streamRecordOf(3, 2),
+				streamRecordOf(4, 2),
+				streamRecordOf(4, 3))
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 2 <= rhs <= lhs - 1
+	public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
+
+		setupHarness(-2, true, -1, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(2, 1),
+				streamRecordOf(3, 1),
+				streamRecordOf(3, 2),
+				streamRecordOf(4, 2),
+				streamRecordOf(4, 3)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs - 1 <= rhs <= lhs + 1
+	public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
+
+		setupHarness(-1, true, 1, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(3, 4),
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test // lhs + 1 <= rhs <= lhs + 2
+	public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
+
+		setupHarness(1, true, 2, true)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 2),
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 3),
+				streamRecordOf(2, 4),
+				streamRecordOf(3, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testNegativeExclusiveAndNegativeExclusive() throws Exception {
+
+		setupHarness(-3, false, -1, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(3, 1),
+				streamRecordOf(4, 2)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testNegativeExclusiveAndPositiveExclusive() throws Exception {
+
+		setupHarness(-1, false, 1, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(4, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	@Test
+	public void testPositiveExclusiveAndPositiveExclusive() throws Exception {
+
+		setupHarness(1, false, 3, false)
+			.processElementsAndWatermarks(1, 4)
+			.andExpect(
+				streamRecordOf(1, 3),
+				streamRecordOf(2, 4)
+			)
+			.noLateRecords()
+			.close();
+	}
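+
+	// The bound tests above all exercise the same predicate on the difference
+	// between the two timestamps. A hedged sketch (illustrative, not the
+	// operator's actual code):
+	//
+	//   long delta = rightTs - leftTs;
+	//   boolean joins =
+	//       (lowerBoundInclusive ? delta >= lowerBound : delta > lowerBound)
+	//       && (upperBoundInclusive ? delta <= upperBound : delta < upperBound);
+	//
+	// For example, with bounds [-1, +1] both inclusive, a left element at ts=3
+	// joins the right elements at ts 2, 3 and 4, as asserted in
+	// testNegativeInclusiveAndPositiveInclusive.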
+
+	@Test
+	public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
+
+		setupHarness(-1, true, 0, true)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(2, 3, 4, 5)
+			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(5)
+			.assertRightBufferContainsOnly(4, 5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
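+
+	// The assertions above (and in the cleanup tests that follow) encode the
+	// expected cleanup rule: a left element at timestamp t becomes removable once
+	// the common watermark reaches t + upperBound, and a right element once it
+	// reaches t - lowerBound, with exclusive bounds tightened by one.
+	// E.g. with bounds [-1, 0] and watermark 4, the left buffer keeps only ts=5
+	// and the right buffer keeps ts 4 and 5, as asserted above.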
+
+	@Test
+	public void testStateCleanupNegativeExclusivePositiveExclusive() throws Exception {
+		setupHarness(-2, false, 1, false)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(2, 3, 4, 5)
+			.assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(5)
+			.assertRightBufferContainsOnly(4, 5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
+		setupHarness(0, true, 1, true)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+			.assertRightBufferContainsOnly(2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(4, 5)
+			.assertRightBufferContainsOnly(5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testStateCleanupPositiveExclusivePositiveExclusive() throws Exception {
+		setupHarness(-1, false, 2, false)
+			.processElement1(1)
+			.processElement1(2)
+			.processElement1(3)
+			.processElement1(4)
+			.processElement1(5)
+
+			.processElement2(1)
+			.processElement2(2)
+			.processElement2(3)
+			.processElement2(4)
+			.processElement2(5) // fill both buffers with values
+
+			.processWatermark1(1)
+			.processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+			.assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+			.assertRightBufferContainsOnly(2, 3, 4, 5)
+
+			.processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+			.processWatermark2(4)
+
+			.assertLeftBufferContainsOnly(4, 5)
+			.assertRightBufferContainsOnly(5)
+
+			.processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+			.processWatermark2(6)
+
+			.assertLeftBufferEmpty()
+			.assertRightBufferEmpty()
+
+			.close();
+	}
+
+	@Test
+	public void testRestoreFromSnapshot() throws Exception {
+
+		// config
+		int lowerBound = -1;
+		boolean lowerBoundInclusive = true;
+		int upperBound = 1;
+		boolean upperBoundInclusive = true;
+
+		// create first test harness
+		OperatorSubtaskState handles;
+		List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
+
+		try (TestHarness testHarness = createTestHarness(
+			lowerBound,
+			lowerBoundInclusive,
+			upperBound,
+			upperBoundInclusive
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			// process elements with first test harness
+			testHarness.processElement1(createStreamRecord(1, "lhs"));
+			testHarness.processWatermark1(new Watermark(1));
+
+			testHarness.processElement2(createStreamRecord(1, "rhs"));
+			testHarness.processWatermark2(new Watermark(1));
+
+			testHarness.processElement1(createStreamRecord(2, "lhs"));
+			testHarness.processWatermark1(new Watermark(2));
+
+			testHarness.processElement2(createStreamRecord(2, "rhs"));
+			testHarness.processWatermark2(new Watermark(2));
+
+			testHarness.processElement1(createStreamRecord(3, "lhs"));
+			testHarness.processWatermark1(new Watermark(3));
+
+			testHarness.processElement2(createStreamRecord(3, "rhs"));
+			testHarness.processWatermark2(new Watermark(3));
+
+			// snapshot and validate output
+			handles = testHarness.snapshot(0, 0);
+			testHarness.close();
+
+			expectedOutput = Lists.newArrayList(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3)
+			);
+
+			TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
+			assertOutput(expectedOutput, testHarness.getOutput());
+		}
+
+		try (TestHarness newTestHarness = createTestHarness(
+			lowerBound,
+			lowerBoundInclusive,
+			upperBound,
+			upperBoundInclusive
+		)) {
+			// create a new test harness and restore it from the snapshot
+
+			newTestHarness.setup();
+			newTestHarness.initializeState(handles);
+			newTestHarness.open();
+
+			// process elements
+			newTestHarness.processElement1(createStreamRecord(4, "lhs"));
+			newTestHarness.processWatermark1(new Watermark(4));
+
+			newTestHarness.processElement2(createStreamRecord(4, "rhs"));
+			newTestHarness.processWatermark2(new Watermark(4));
+
+			// assert expected output
+			expectedOutput = Lists.newArrayList(
+				streamRecordOf(3, 4),
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4)
+			);
+
+			TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
+			assertOutput(expectedOutput, newTestHarness.getOutput());
+		}
+	}
+
+	@Test
+	public void testContextCorrectLeftTimestamp() throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	@Test
+	public void testReturnsCorrectTimestamp() throws Exception {
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(left.ts, ctx.getTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	@Test
+	public void testContextCorrectRightTimestamp() throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(right.ts, ctx.getRightTimestamp());
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+		}
+	}
+
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsLeft() throws Exception {
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+			newTestHarness.setup();
+			newTestHarness.open();
+
+			// note that the StreamRecord is created without a timestamp
+			newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
+		}
+	}
+
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsRight() throws Exception {
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+			newTestHarness.setup();
+			newTestHarness.open();
+
+			// note that the StreamRecord is created without a timestamp
+			newTestHarness.processElement2(new StreamRecord<>(new TestElem(0, "rhs")));
+		}
+	}
+
+	@Test
+	public void testDiscardsLateData() throws Exception {
+		setupHarness(-1, true, 1, true)
+			.processElement1(1)
+			.processElement2(1)
+			.processElement1(2)
+			.processElement2(2)
+			.processElement1(3)
+			.processElement2(3)
+			.processWatermark1(3)
+			.processWatermark2(3)
+			.processElement1(1) // this element is late and should not be joined again
+			.processElement1(4)
+			.processElement2(4)
+			.processElement1(5)
+			.processElement2(5)
+			.andExpect(
+				streamRecordOf(1, 1),
+				streamRecordOf(1, 2),
+
+				streamRecordOf(2, 1),
+				streamRecordOf(2, 2),
+				streamRecordOf(2, 3),
+
+				streamRecordOf(3, 2),
+				streamRecordOf(3, 3),
+				streamRecordOf(3, 4),
+
+				streamRecordOf(4, 3),
+				streamRecordOf(4, 4),
+				streamRecordOf(4, 5),
+
+				streamRecordOf(5, 4),
+				streamRecordOf(5, 5)
+			)
+			.noLateRecords()
+			.close();
+	}
+
+	private void assertEmpty(MapState<Long, ?> state) throws Exception {
+		boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
+		Assert.assertTrue("state not empty", stateIsEmpty);
+	}
+
+	private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
+		for (long t : ts) {
+			String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
+			Assert.assertTrue(message, state.contains(t));
+		}
+
+		String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
+		Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
+	}
+
+	private void assertOutput(
+		Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
+		Queue<Object> actualOutput) {
+
+		int actualSize = actualOutput.stream()
+			.filter(elem -> elem instanceof StreamRecord)
+			.collect(Collectors.toList())
+			.size();
+
+		int expectedSize = Iterables.size(expectedOutput);
+
+		Assert.assertEquals(
+			"Expected and actual size of stream records different",
+			expectedSize,
+			actualSize
+		);
+
+		for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+			Assert.assertTrue(actualOutput.contains(record));
+		}
+	}
+
+	private TestHarness createTestHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			new IntervalJoinOperator<>(
+				lowerBound,
+				upperBound,
+				lowerBoundInclusive,
+				upperBoundInclusive,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new PassthroughFunction()
+			);
+
+		return new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+	}
+
+	private JoinTestBuilder setupHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			new IntervalJoinOperator<>(
+				lowerBound,
+				upperBound,
+				lowerBoundInclusive,
+				upperBoundInclusive,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new PassthroughFunction()
+			);
+
+		TestHarness t = new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+
+		return new JoinTestBuilder(t, operator);
+	}
+
+	private class JoinTestBuilder {
+
+		private IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
+		private TestHarness testHarness;
+
+		public JoinTestBuilder(
+			TestHarness t,
+			IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
+		) throws Exception {
+
+			this.testHarness = t;
+			this.operator = operator;
+			t.setup();
+			t.open();
+		}
+
+		public TestHarness get() {
+			return testHarness;
+		}
+
+		public JoinTestBuilder processElement1(int ts) throws Exception {
+			testHarness.processElement1(createStreamRecord(ts, "lhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processElement2(int ts) throws Exception {
+			testHarness.processElement2(createStreamRecord(ts, "rhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark1(int ts) throws Exception {
+			testHarness.processWatermark1(new Watermark(ts));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark2(int ts) throws Exception {
+			testHarness.processWatermark2(new Watermark(ts));
+			return this;
+		}
+
+		public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
+			if (lhsFasterThanRhs) {
+				// add to lhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement1(createStreamRecord(i, "lhs"));
+					testHarness.processWatermark1(new Watermark(i));
+				}
+
+				// add to rhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement2(createStreamRecord(i, "rhs"));
+					testHarness.processWatermark2(new Watermark(i));
+				}
+			} else {
+				// add to rhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement2(createStreamRecord(i, "rhs"));
+					testHarness.processWatermark2(new Watermark(i));
+				}
+
+				// add to lhs
+				for (int i = from; i <= to; i++) {
+					testHarness.processElement1(createStreamRecord(i, "lhs"));
+					testHarness.processWatermark1(new Watermark(i));
+				}
+			}
+
+			return this;
+		}
+
+		@SafeVarargs
+		public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
+			assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getLeftBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getRightBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferEmpty() {
+			try {
+				assertEmpty(operator.getLeftBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferEmpty() {
+			try {
+				assertEmpty(operator.getRightBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder noLateRecords() {
+			TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
+			return this;
+		}
+
+		public void close() throws Exception {
+			testHarness.close();
+		}
+	}
+
+	private static class PassthroughFunction extends ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		@Override
+		public void processElement(
+			TestElem left,
+			TestElem right,
+			Context ctx,
+			Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+			out.collect(Tuple2.of(left, right));
+		}
+	}
+
+	private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
+		long lhsTs,
+		long rhsTs
+	) {
+		TestElem lhs = new TestElem(lhsTs, "lhs");
+		TestElem rhs = new TestElem(rhsTs, "rhs");
+
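+		// the expected joined record carries the max of the two input timestamps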
+		long ts = Math.max(lhsTs, rhsTs);
+		return new StreamRecord<>(Tuple2.of(lhs, rhs), ts);
+	}
+
+	private static class TestElem {
+		String key;
+		long ts;
+		String source;
+
+		public TestElem(long ts, String source) {
+			this.key = "key";
+			this.ts = ts;
+			this.source = source;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			TestElem testElem = (TestElem) o;
+
+			if (ts != testElem.ts) {
+				return false;
+			}
+
+			if (key != null ? !key.equals(testElem.key) : testElem.key != null) {
+				return false;
+			}
+
+			return source != null ? source.equals(testElem.source) : testElem.source == null;
+		}
+
+		@Override
+		public int hashCode() {
+			int result = key != null ? key.hashCode() : 0;
+			result = 31 * result + (int) (ts ^ (ts >>> 32));
+			result = 31 * result + (source != null ? source.hashCode() : 0);
+			return result;
+		}
+
+		@Override
+		public String toString() {
+			return this.source + ":" + this.ts;
+		}
+
+		public static TypeSerializer<TestElem> serializer() {
+			return TypeInformation.of(new TypeHint<TestElem>() {
+			}).createSerializer(new ExecutionConfig());
+		}
+	}
+
+	private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
+		TestElem testElem = new TestElem(ts, source);
+		return new StreamRecord<>(testElem, ts);
+	}
+
+	private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
+		if (lhsFasterThanRhs) {
+			// add to lhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement1(createStreamRecord(i, "lhs"));
+				testHarness.processWatermark1(new Watermark(i));
+			}
+
+			// add to rhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement2(createStreamRecord(i, "rhs"));
+				testHarness.processWatermark2(new Watermark(i));
+			}
+		} else {
+			// add to rhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement2(createStreamRecord(i, "rhs"));
+				testHarness.processWatermark2(new Watermark(i));
+			}
+
+			// add to lhs
+			for (int i = 1; i <= 4; i++) {
+				testHarness.processElement1(createStreamRecord(i, "lhs"));
+				testHarness.processWatermark1(new Watermark(i));
+			}
+		}
+	}
+
+	/**
+	 * Custom test harness to avoid endless generics in all of the test code.
+	 */
+	private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		TestHarness(
+			TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
+			KeySelector<TestElem, String> keySelector1,
+			KeySelector<TestElem, String> keySelector2,
+			TypeInformation<String> keyType) throws Exception {
+			super(operator, keySelector1, keySelector2, keyType);
+		}
+	}
+}


[3/3] flink git commit: [hotfix] Fixed import order in RocksDBKeyedStateBackend.

Posted by kk...@apache.org.
[hotfix] Fixed import order in RocksDBKeyedStateBackend.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca0fa96b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca0fa96b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca0fa96b

Branch: refs/heads/master
Commit: ca0fa96bfacdc3a3a9e149b68c282c19fea2e2db
Parents: 42ada8a
Author: kkloudas <kk...@gmail.com>
Authored: Thu Jul 12 21:15:29 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 12 21:15:29 2018 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca0fa96b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3bf0aea..622dd0c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,8 +87,8 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
 import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;