You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/12 19:34:40 UTC
[1/3] flink git commit: [FLINK-8480][DataStream] Add APIs for
Interval Joins.
Repository: flink
Updated Branches:
refs/heads/master f45b7f7ff -> ca0fa96bf
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
deleted file mode 100644
index 75543e7..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperatorTest.java
+++ /dev/null
@@ -1,941 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.stream.Collectors;
-
-
-/**
- * Tests for {@link TimeBoundedStreamJoinOperator}.
- * Those tests cover correctness and cleaning of state
- */
-@RunWith(Parameterized.class)
-public class TimeBoundedStreamJoinOperatorTest {
-
- private final boolean lhsFasterThanRhs;
-
- @Parameters(name = "lhs faster than rhs: {0}")
- public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {true}, {false}
- });
- }
-
- public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
- this.lhsFasterThanRhs = lhsFasterThanRhs;
- }
-
- @Test
- public void testImplementationMirrorsCorrectly() throws Exception {
-
- long lowerBound = 1;
- long upperBound = 3;
-
- boolean lowerBoundInclusive = true;
- boolean upperBoundInclusive = false;
-
- setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(1, 2),
- streamRecordOf(1, 3),
- streamRecordOf(2, 3),
- streamRecordOf(2, 4),
- streamRecordOf(3, 4))
- .noLateRecords()
- .close();
-
- setupHarness(-1 * upperBound, upperBoundInclusive, -1 * lowerBound, lowerBoundInclusive)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(2, 1),
- streamRecordOf(3, 1),
- streamRecordOf(3, 2),
- streamRecordOf(4, 2),
- streamRecordOf(4, 3))
- .noLateRecords()
- .close();
- }
-
- @Test // lhs - 2 <= rhs <= rhs + 2
- public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
-
- setupHarness(-2, true, -1, true)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(2, 1),
- streamRecordOf(3, 1),
- streamRecordOf(3, 2),
- streamRecordOf(4, 2),
- streamRecordOf(4, 3)
- )
- .noLateRecords()
- .close();
- }
-
- @Test // lhs - 1 <= rhs <= rhs + 1
- public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
-
- setupHarness(-1, true, 1, true)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(1, 1),
- streamRecordOf(1, 2),
- streamRecordOf(2, 1),
- streamRecordOf(2, 2),
- streamRecordOf(2, 3),
- streamRecordOf(3, 2),
- streamRecordOf(3, 3),
- streamRecordOf(3, 4),
- streamRecordOf(4, 3),
- streamRecordOf(4, 4)
- )
- .noLateRecords()
- .close();
- }
-
- @Test // lhs + 1 <= rhs <= lhs + 2
- public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
-
- setupHarness(1, true, 2, true)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(1, 2),
- streamRecordOf(1, 3),
- streamRecordOf(2, 3),
- streamRecordOf(2, 4),
- streamRecordOf(3, 4)
- )
- .noLateRecords()
- .close();
- }
-
- @Test
- public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
-
- setupHarness(-3, false, -1, false)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(3, 1),
- streamRecordOf(4, 2)
- )
- .noLateRecords()
- .close();
- }
-
- @Test
- public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
-
- setupHarness(-1, false, 1, false)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(1, 1),
- streamRecordOf(2, 2),
- streamRecordOf(3, 3),
- streamRecordOf(4, 4)
- )
- .noLateRecords()
- .close();
- }
-
- @Test
- public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
-
- setupHarness(1, false, 3, false)
- .processElementsAndWatermarks(1, 4)
- .andExpect(
- streamRecordOf(1, 3),
- streamRecordOf(2, 4)
- )
- .noLateRecords()
- .close();
- }
-
- @Test
- public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
-
- setupHarness(-1, true, 0, true)
- .processElement1(1)
- .processElement1(2)
- .processElement1(3)
- .processElement1(4)
- .processElement1(5)
-
- .processElement2(1)
- .processElement2(2)
- .processElement2(3)
- .processElement2(4)
- .processElement2(5) // fill both buffers with values
-
- .processWatermark1(1)
- .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
- .assertLeftBufferContainsOnly(2, 3, 4, 5)
- .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
-
- .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
- .processWatermark2(4)
-
- .assertLeftBufferContainsOnly(5)
- .assertRightBufferContainsOnly(4, 5)
-
- .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
- .processWatermark2(6)
-
- .assertLeftBufferEmpty()
- .assertRightBufferEmpty()
-
- .close();
- }
-
- @Test
- public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
- setupHarness(-2, false, 1, false)
- .processElement1(1)
- .processElement1(2)
- .processElement1(3)
- .processElement1(4)
- .processElement1(5)
-
- .processElement2(1)
- .processElement2(2)
- .processElement2(3)
- .processElement2(4)
- .processElement2(5) // fill both buffers with values
-
- .processWatermark1(1)
- .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
- .assertLeftBufferContainsOnly(2, 3, 4, 5)
- .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
-
- .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
- .processWatermark2(4)
-
- .assertLeftBufferContainsOnly(5)
- .assertRightBufferContainsOnly(4, 5)
-
- .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
- .processWatermark2(6)
-
- .assertLeftBufferEmpty()
- .assertRightBufferEmpty()
-
- .close();
- }
-
- @Test
- public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
- setupHarness(0, true, 1, true)
- .processElement1(1)
- .processElement1(2)
- .processElement1(3)
- .processElement1(4)
- .processElement1(5)
-
- .processElement2(1)
- .processElement2(2)
- .processElement2(3)
- .processElement2(4)
- .processElement2(5) // fill both buffers with values
-
- .processWatermark1(1)
- .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
- .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
- .assertRightBufferContainsOnly(2, 3, 4, 5)
-
- .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
- .processWatermark2(4)
-
- .assertLeftBufferContainsOnly(4, 5)
- .assertRightBufferContainsOnly(5)
-
- .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
- .processWatermark2(6)
-
- .assertLeftBufferEmpty()
- .assertRightBufferEmpty()
-
- .close();
- }
-
- @Test
- public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
- setupHarness(-1, false, 2, false)
- .processElement1(1)
- .processElement1(2)
- .processElement1(3)
- .processElement1(4)
- .processElement1(5)
-
- .processElement2(1)
- .processElement2(2)
- .processElement2(3)
- .processElement2(4)
- .processElement2(5) // fill both buffers with values
-
- .processWatermark1(1)
- .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
-
- .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
- .assertRightBufferContainsOnly(2, 3, 4, 5)
-
- .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
- .processWatermark2(4)
-
- .assertLeftBufferContainsOnly(4, 5)
- .assertRightBufferContainsOnly(5)
-
- .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty
- .processWatermark2(6)
-
- .assertLeftBufferEmpty()
- .assertRightBufferEmpty()
-
- .close();
- }
-
- @Test
- public void testRestoreFromSnapshot() throws Exception {
-
- // config
- int lowerBound = -1;
- boolean lowerBoundInclusive = true;
- int upperBound = 1;
- boolean upperBoundInclusive = true;
-
- // create first test harness
- OperatorSubtaskState handles;
- List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
-
- try (TestHarness testHarness = createTestHarness(
- lowerBound,
- lowerBoundInclusive,
- upperBound,
- upperBoundInclusive
- )) {
-
- testHarness.setup();
- testHarness.open();
-
- // process elements with first test harness
- testHarness.processElement1(createStreamRecord(1, "lhs"));
- testHarness.processWatermark1(new Watermark(1));
-
- testHarness.processElement2(createStreamRecord(1, "rhs"));
- testHarness.processWatermark2(new Watermark(1));
-
- testHarness.processElement1(createStreamRecord(2, "lhs"));
- testHarness.processWatermark1(new Watermark(2));
-
- testHarness.processElement2(createStreamRecord(2, "rhs"));
- testHarness.processWatermark2(new Watermark(2));
-
- testHarness.processElement1(createStreamRecord(3, "lhs"));
- testHarness.processWatermark1(new Watermark(3));
-
- testHarness.processElement2(createStreamRecord(3, "rhs"));
- testHarness.processWatermark2(new Watermark(3));
-
- // snapshot and validate output
- handles = testHarness.snapshot(0, 0);
- testHarness.close();
-
- expectedOutput = Lists.newArrayList(
- streamRecordOf(1, 1),
- streamRecordOf(1, 2),
- streamRecordOf(2, 1),
- streamRecordOf(2, 2),
- streamRecordOf(2, 3),
- streamRecordOf(3, 2),
- streamRecordOf(3, 3)
- );
-
- TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
- assertOutput(expectedOutput, testHarness.getOutput());
- }
-
- try (TestHarness newTestHarness = createTestHarness(
- lowerBound,
- lowerBoundInclusive,
- upperBound,
- upperBoundInclusive
- )) {
- // create new test harness from snapshpt
-
- newTestHarness.setup();
- newTestHarness.initializeState(handles);
- newTestHarness.open();
-
- // process elements
- newTestHarness.processElement1(createStreamRecord(4, "lhs"));
- newTestHarness.processWatermark1(new Watermark(4));
-
- newTestHarness.processElement2(createStreamRecord(4, "rhs"));
- newTestHarness.processWatermark2(new Watermark(4));
-
- // assert expected output
- expectedOutput = Lists.newArrayList(
- streamRecordOf(3, 4),
- streamRecordOf(4, 3),
- streamRecordOf(4, 4)
- );
-
- TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
- assertOutput(expectedOutput, newTestHarness.getOutput());
- }
- }
-
- @Test
- public void testContextCorrectLeftTimestamp() throws Exception {
-
- TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
- new TimeBoundedStreamJoinOperator<>(
- -1,
- 1,
- true,
- true,
- TestElem.serializer(),
- TestElem.serializer(),
- new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
- @Override
- public void processElement(
- TestElem left,
- TestElem right,
- Context ctx,
- Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
- Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
- }
- }
- );
-
- try (TestHarness testHarness = new TestHarness(
- op,
- (elem) -> elem.key,
- (elem) -> elem.key,
- TypeInformation.of(String.class)
- )) {
-
- testHarness.setup();
- testHarness.open();
-
- processElementsAndWatermarks(testHarness);
- }
- }
-
- @Test
- public void testReturnsCorrectTimestamp() throws Exception {
- TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
- new TimeBoundedStreamJoinOperator<>(
- -1,
- 1,
- true,
- true,
- TestElem.serializer(),
- TestElem.serializer(),
- new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
- @Override
- public void processElement(
- TestElem left,
- TestElem right,
- Context ctx,
- Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
- Assert.assertEquals(left.ts, ctx.getTimestamp());
- }
- }
- );
-
- try (TestHarness testHarness = new TestHarness(
- op,
- (elem) -> elem.key,
- (elem) -> elem.key,
- TypeInformation.of(String.class)
- )) {
-
- testHarness.setup();
- testHarness.open();
-
- processElementsAndWatermarks(testHarness);
- }
- }
-
- @Test
- public void testContextCorrectRightTimestamp() throws Exception {
-
- TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
- new TimeBoundedStreamJoinOperator<>(
- -1,
- 1,
- true,
- true,
- TestElem.serializer(),
- TestElem.serializer(),
- new TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
- @Override
- public void processElement(
- TestElem left,
- TestElem right,
- Context ctx,
- Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
- Assert.assertEquals(right.ts, ctx.getRightTimestamp());
- }
- }
- );
-
- try (TestHarness testHarness = new TestHarness(
- op,
- (elem) -> elem.key,
- (elem) -> elem.key,
- TypeInformation.of(String.class)
- )) {
-
- testHarness.setup();
- testHarness.open();
-
- processElementsAndWatermarks(testHarness);
- }
- }
-
- @Test(expected = FlinkException.class)
- public void testFailsWithNoTimestampsLeft() throws Exception {
- TestHarness newTestHarness = createTestHarness(0L, true, 0L, true);
-
- newTestHarness.setup();
- newTestHarness.open();
-
- // note that the StreamRecord has no timestamp in constructor
- newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
- }
-
- @Test(expected = FlinkException.class)
- public void testFailsWithNoTimestampsRight() throws Exception {
- try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
-
- newTestHarness.setup();
- newTestHarness.open();
-
- // note that the StreamRecord has no timestamp in constructor
- newTestHarness.processElement2(new StreamRecord<>(new TestElem(0, "rhs")));
- }
- }
-
- @Test
- public void testDiscardsLateData() throws Exception {
- setupHarness(-1, true, 1, true)
- .processElement1(1)
- .processElement2(1)
- .processElement1(2)
- .processElement2(2)
- .processElement1(3)
- .processElement2(3)
- .processWatermark1(3)
- .processWatermark2(3)
- .processElement1(1) // this element is late and should not be joined again
- .processElement1(4)
- .processElement2(4)
- .processElement1(5)
- .processElement2(5)
- .andExpect(
- streamRecordOf(1, 1),
- streamRecordOf(1, 2),
-
- streamRecordOf(2, 1),
- streamRecordOf(2, 2),
- streamRecordOf(2, 3),
-
- streamRecordOf(3, 2),
- streamRecordOf(3, 3),
- streamRecordOf(3, 4),
-
- streamRecordOf(4, 3),
- streamRecordOf(4, 4),
- streamRecordOf(4, 5),
-
- streamRecordOf(5, 4),
- streamRecordOf(5, 5)
- )
- .noLateRecords()
- .close();
- }
-
- private void assertEmpty(MapState<Long, ?> state) throws Exception {
- boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
- Assert.assertTrue("state not empty", stateIsEmpty);
- }
-
- private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
- for (long t : ts) {
- String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual: " + state.keys();
- Assert.assertTrue(message, state.contains(t));
- }
-
- String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual: " + state.keys();
- Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
- }
-
- private void assertOutput(
- Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
- Queue<Object> actualOutput) {
-
- int actualSize = actualOutput.stream()
- .filter(elem -> elem instanceof StreamRecord)
- .collect(Collectors.toList())
- .size();
-
- int expectedSize = Iterables.size(expectedOutput);
-
- Assert.assertEquals(
- "Expected and actual size of stream records different",
- expectedSize,
- actualSize
- );
-
- for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
- Assert.assertTrue(actualOutput.contains(record));
- }
- }
-
- private TestHarness createTestHarness(long lowerBound,
- boolean lowerBoundInclusive,
- long upperBound,
- boolean upperBoundInclusive) throws Exception {
-
- TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
- new TimeBoundedStreamJoinOperator<>(
- lowerBound,
- upperBound,
- lowerBoundInclusive,
- upperBoundInclusive,
- TestElem.serializer(),
- TestElem.serializer(),
- new PassthroughFunction()
- );
-
- return new TestHarness(
- operator,
- (elem) -> elem.key, // key
- (elem) -> elem.key, // key
- TypeInformation.of(String.class)
- );
- }
-
- private JoinTestBuilder setupHarness(long lowerBound,
- boolean lowerBoundInclusive,
- long upperBound,
- boolean upperBoundInclusive) throws Exception {
-
- TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
- new TimeBoundedStreamJoinOperator<>(
- lowerBound,
- upperBound,
- lowerBoundInclusive,
- upperBoundInclusive,
- TestElem.serializer(),
- TestElem.serializer(),
- new PassthroughFunction()
- );
-
- TestHarness t = new TestHarness(
- operator,
- (elem) -> elem.key, // key
- (elem) -> elem.key, // key
- TypeInformation.of(String.class)
- );
-
- return new JoinTestBuilder(t, operator);
- }
-
- private class JoinTestBuilder {
-
- private TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
- private TestHarness testHarness;
-
- public JoinTestBuilder(
- TestHarness t,
- TimeBoundedStreamJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
- ) throws Exception {
-
- this.testHarness = t;
- this.operator = operator;
- t.open();
- t.setup();
- }
-
- public TestHarness get() {
- return testHarness;
- }
-
- public JoinTestBuilder processElement1(int ts) throws Exception {
- testHarness.processElement1(createStreamRecord(ts, "lhs"));
- return this;
- }
-
- public JoinTestBuilder processElement2(int ts) throws Exception {
- testHarness.processElement2(createStreamRecord(ts, "rhs"));
- return this;
- }
-
- public JoinTestBuilder processWatermark1(int ts) throws Exception {
- testHarness.processWatermark1(new Watermark(ts));
- return this;
- }
-
- public JoinTestBuilder processWatermark2(int ts) throws Exception {
- testHarness.processWatermark2(new Watermark(ts));
- return this;
- }
-
- public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
- if (lhsFasterThanRhs) {
- // add to lhs
- for (int i = from; i <= to; i++) {
- testHarness.processElement1(createStreamRecord(i, "lhs"));
- testHarness.processWatermark1(new Watermark(i));
- }
-
- // add to rhs
- for (int i = from; i <= to; i++) {
- testHarness.processElement2(createStreamRecord(i, "rhs"));
- testHarness.processWatermark2(new Watermark(i));
- }
- } else {
- // add to rhs
- for (int i = from; i <= to; i++) {
- testHarness.processElement2(createStreamRecord(i, "rhs"));
- testHarness.processWatermark2(new Watermark(i));
- }
-
- // add to lhs
- for (int i = from; i <= to; i++) {
- testHarness.processElement1(createStreamRecord(i, "lhs"));
- testHarness.processWatermark1(new Watermark(i));
- }
- }
-
- return this;
- }
-
- @SafeVarargs
- public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
- assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
- return this;
- }
-
- public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
-
- try {
- assertContainsOnly(operator.getLeftBuffer(), timestamps);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return this;
- }
-
- public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
-
- try {
- assertContainsOnly(operator.getRightBuffer(), timestamps);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return this;
- }
-
- public JoinTestBuilder assertLeftBufferEmpty() {
- try {
- assertEmpty(operator.getLeftBuffer());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return this;
- }
-
- public JoinTestBuilder assertRightBufferEmpty() {
- try {
- assertEmpty(operator.getRightBuffer());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return this;
- }
-
- public JoinTestBuilder noLateRecords() {
- TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
- return this;
- }
-
- public void close() throws Exception {
- testHarness.close();
- }
- }
-
- private static class PassthroughFunction extends TimeBoundedJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
-
- @Override
- public void processElement(
- TestElem left,
- TestElem right,
- Context ctx,
- Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
- out.collect(Tuple2.of(left, right));
- }
- }
-
- private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
- long lhsTs,
- long rhsTs
- ) {
- TestElem lhs = new TestElem(lhsTs, "lhs");
- TestElem rhs = new TestElem(rhsTs, "rhs");
-
- long ts = Math.max(lhsTs, rhsTs);
- return new StreamRecord<>(Tuple2.of(lhs, rhs), ts);
- }
-
- private static class TestElem {
- String key;
- long ts;
- String source;
-
- public TestElem(long ts, String source) {
- this.key = "key";
- this.ts = ts;
- this.source = source;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TestElem testElem = (TestElem) o;
-
- if (ts != testElem.ts) {
- return false;
- }
-
- if (key != null ? !key.equals(testElem.key) : testElem.key != null) {
- return false;
- }
-
- return source != null ? source.equals(testElem.source) : testElem.source == null;
- }
-
- @Override
- public int hashCode() {
- int result = key != null ? key.hashCode() : 0;
- result = 31 * result + (int) (ts ^ (ts >>> 32));
- result = 31 * result + (source != null ? source.hashCode() : 0);
- return result;
- }
-
- @Override
- public String toString() {
- return this.source + ":" + this.ts;
- }
-
- public static TypeSerializer<TestElem> serializer() {
- return TypeInformation.of(new TypeHint<TestElem>() {
- }).createSerializer(new ExecutionConfig());
- }
- }
-
- private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
- TestElem testElem = new TestElem(ts, source);
- return new StreamRecord<>(testElem, ts);
- }
-
- private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
- if (lhsFasterThanRhs) {
- // add to lhs
- for (int i = 1; i <= 4; i++) {
- testHarness.processElement1(createStreamRecord(i, "lhs"));
- testHarness.processWatermark1(new Watermark(i));
- }
-
- // add to rhs
- for (int i = 1; i <= 4; i++) {
- testHarness.processElement2(createStreamRecord(i, "rhs"));
- testHarness.processWatermark2(new Watermark(i));
- }
- } else {
- // add to rhs
- for (int i = 1; i <= 4; i++) {
- testHarness.processElement2(createStreamRecord(i, "rhs"));
- testHarness.processWatermark2(new Watermark(i));
- }
-
- // add to lhs
- for (int i = 1; i <= 4; i++) {
- testHarness.processElement1(createStreamRecord(i, "lhs"));
- testHarness.processWatermark1(new Watermark(i));
- }
- }
- }
-
- /**
- * Custom test harness to avoid endless generics in all of the test code.
- */
- private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
-
- TestHarness(
- TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
- KeySelector<TestElem, String> keySelector1,
- KeySelector<TestElem, String> keySelector2,
- TypeInformation<String> keyType) throws Exception {
- super(operator, keySelector1, keySelector2, keyType);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 51def98..580dd46 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,10 +24,11 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateD
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
import org.apache.flink.streaming.api.operators.StreamGroupedReduce
import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
@@ -109,6 +110,109 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
asScalaStream(javaStream.process(keyedProcessFunction, implicitly[TypeInformation[R]]))
}
+
+ // ------------------------------------------------------------------------
+ // Joining
+ // ------------------------------------------------------------------------
+
+ /**
+ * Joins elements of this [[KeyedStream]] with elements of another [[KeyedStream]] over
+ * a time interval that can be specified with [[IntervalJoin.between]].
+ *
+ * @param otherStream The other keyed stream to join with; it must share this stream's key type K
+ * @tparam OTHER Type parameter of elements in the other stream
+ * @return An instance of [[IntervalJoin]] with this keyed stream and the other keyed stream
+ */
+ @PublicEvolving
+ def intervalJoin[OTHER](otherStream: KeyedStream[OTHER, K]): IntervalJoin[T, OTHER, K] = {
+ new IntervalJoin[T, OTHER, K](this, otherStream)
+ }
+
+ /**
+ * Performs a join over a time interval between two streams keyed by the same KEY type.
+ *
+ * @tparam IN1 The type parameter of the elements in the first stream
+ * @tparam IN2 The type parameter of the elements in the second stream
+ */
+ @PublicEvolving
+ class IntervalJoin[IN1, IN2, KEY](val streamOne: KeyedStream[IN1, KEY],
+ val streamTwo: KeyedStream[IN2, KEY]) {
+
+ /**
+ * Specifies the time boundaries over which the join operation works, so that
+ * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp
+ * <= leftElement.timestamp + upperBound</pre>
+ * By default both the lower and the upper bound are inclusive. This can be configured
+ * with [[IntervalJoined.lowerBoundExclusive]] and
+ * [[IntervalJoined.upperBoundExclusive]]. Both bounds are converted to milliseconds.
+ *
+ * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+ * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+ */
+ @PublicEvolving
+ def between(lowerBound: Time, upperBound: Time): IntervalJoined[IN1, IN2, KEY] = {
+ val lowerMillis = lowerBound.toMilliseconds
+ val upperMillis = upperBound.toMilliseconds
+ new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis)
+ }
+ }
+
+ /**
+ * IntervalJoined is a container for two streams that have keys for both sides as well as
+ * the time boundaries over which elements should be joined.
+ *
+ * @tparam IN1 Input type of elements from the first stream
+ * @tparam IN2 Input type of elements from the second stream
+ * @tparam KEY The type of the key
+ */
+ @PublicEvolving
+ class IntervalJoined[IN1, IN2, KEY](private val firstStream: KeyedStream[IN1, KEY],
+ private val secondStream: KeyedStream[IN2, KEY],
+ private val lowerBound: Long,
+ private val upperBound: Long) {
+
+ private var lowerBoundInclusive = true
+ private var upperBoundInclusive = true
+
+ /**
+ * Sets the lower bound to be exclusive (default: inclusive); returns this for chaining.
+ */
+ @PublicEvolving
+ def lowerBoundExclusive(): IntervalJoined[IN1, IN2, KEY] = {
+ this.lowerBoundInclusive = false
+ this
+ }
+
+ /**
+ * Sets the upper bound to be exclusive (default: inclusive); returns this for chaining.
+ */
+ @PublicEvolving
+ def upperBoundExclusive(): IntervalJoined[IN1, IN2, KEY] = {
+ this.upperBoundInclusive = false
+ this
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed for each joined pair
+ * of elements.
+ *
+ * @param processJoinFunction The user-defined function invoked for each joined pair
+ * @tparam OUT The output type
+ * @return The resulting DataStream of OUT elements
+ */
+ @PublicEvolving
+ def process[OUT](processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]): DataStream[OUT] = {
+ val javaJoined = new KeyedJavaStream.IntervalJoined[IN1, IN2, KEY](
+ firstStream.javaStream.asInstanceOf[KeyedJavaStream[IN1, KEY]],
+ secondStream.javaStream.asInstanceOf[KeyedJavaStream[IN2, KEY]],
+ lowerBound,
+ upperBound,
+ lowerBoundInclusive,
+ upperBoundInclusive)
+ asScalaStream(javaJoined.process(processJoinFunction))
+ }
+ }
+
// ------------------------------------------------------------------------
// Windowing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala
new file mode 100644
index 0000000..80701c6
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/IntervalJoinITCase.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.util.Collector
+import org.junit.{Assert, Test}
+
+import scala.collection.mutable.ListBuffer
+
+class IntervalJoinITCase extends AbstractTestBase {
+
+  /**
+   * Join results are collected in the JVM-wide `Companion.results` buffer, which would
+   * otherwise keep its contents across test methods. Clearing it before every test keeps one
+   * test's results out of the next test's assertions. Without this, outcomes depend on test
+   * execution order.
+   */
+  @org.junit.Before
+  def clearResults(): Unit = {
+    Companion.results.clear()
+  }
+
+  @Test
+  def testInclusiveBounds(): Unit = {
+    val env = createEventTimeEnvironment()
+
+    val dataStream1 = createKeyedInput(env)
+    val dataStream2 = createKeyedInput(env)
+
+    val sink = new ResultSink()
+
+    dataStream1.intervalJoin(dataStream2)
+      .between(Time.milliseconds(0), Time.milliseconds(2))
+      .process(new CombineToStringJoinFunction())
+      .addSink(sink)
+
+    env.execute()
+
+    sink.expectInAnyOrder(
+      "(key,0):(key,0)",
+      "(key,0):(key,1)",
+      "(key,0):(key,2)",
+
+      "(key,1):(key,1)",
+      "(key,1):(key,2)",
+
+      "(key,2):(key,2)"
+    )
+  }
+
+  @Test
+  def testExclusiveBounds(): Unit = {
+    val env = createEventTimeEnvironment()
+
+    val dataStream1 = createKeyedInput(env)
+    val dataStream2 = createKeyedInput(env)
+
+    val sink = new ResultSink()
+
+    dataStream1.intervalJoin(dataStream2)
+      .between(Time.milliseconds(0), Time.milliseconds(2))
+      .lowerBoundExclusive()
+      .upperBoundExclusive()
+      .process(new CombineToStringJoinFunction())
+      .addSink(sink)
+
+    env.execute()
+
+    // With (0, 2) exclusive on both ends only a timestamp difference of exactly 1 matches.
+    sink.expectInAnyOrder(
+      "(key,0):(key,1)",
+      "(key,1):(key,2)"
+    )
+  }
+
+  /** Creates a parallelism-1 environment running in event time, which interval joins require. */
+  private def createEventTimeEnvironment(): StreamExecutionEnvironment = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    env
+  }
+
+  /** Creates `("key", 0L), ("key", 1L), ("key", 2L)`, timestamped by the Long field, keyed by "key". */
+  private def createKeyedInput(
+      env: StreamExecutionEnvironment): KeyedStream[(String, Long), String] = {
+    env.fromElements(("key", 0L), ("key", 1L), ("key", 2L))
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .keyBy(elem => elem._1)
+  }
+}
+
+/** JVM-wide buffer into which `ResultSink` appends the join results of the running test. */
+object Companion {
+  val results: ListBuffer[String] = ListBuffer.empty[String]
+}
+
+/**
+ * Sink that appends every received element to the shared `Companion.results` buffer, so the
+ * test thread can inspect what the job produced.
+ */
+class ResultSink extends SinkFunction[String] {
+
+  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
+    Companion.results.append(value)
+  }
+
+  /**
+   * Asserts that exactly `expected` was collected, ignoring order but not multiplicity.
+   *
+   * The shared buffer is drained before asserting. Results of this test therefore never leak
+   * into a later test, even when the assertion fails.
+   */
+  def expectInAnyOrder(expected: String*): Unit = {
+    val actual = Companion.results.toList
+    Companion.results.clear()
+    // Sorted lists (not sets) so duplicates are caught and failures show a readable diff.
+    Assert.assertEquals(expected.toList.sorted, actual.sorted)
+  }
+}
+
+/** Uses the `Long` field of each `(key, timestamp)` tuple as its ascending event timestamp. */
+class TimestampExtractor extends AscendingTimestampExtractor[(String, Long)] {
+  override def extractAscendingTimestamp(element: (String, Long)): Long = {
+    val (_, timestamp) = element
+    timestamp
+  }
+}
+
+/** Emits each joined pair as `"<left>:<right>"`, using the tuples' `toString`. */
+class CombineToStringJoinFunction
+  extends ProcessJoinFunction[(String, Long), (String, Long), String] {
+
+  override def processElement(
+      left: (String, Long),
+      right: (String, Long),
+      ctx: ProcessJoinFunction[(String, Long), (String, Long), String]#Context,
+      out: Collector[String]): Unit = {
+    out.collect(s"$left:$right")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
new file mode 100644
index 0000000..7d9fe7b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Integration tests for interval joins.
+ *
+ * <p>Each job runs with parallelism 1 and writes every joined pair, rendered as
+ * {@code "<left>:<right>"}, to {@link #testResults}. The collected pairs are then compared with
+ * the expected ones irrespective of order.
+ */
+public class IntervalJoinITCase {
+
+	/** Joined pairs written by {@link ResultSink}; replaced with an empty list before each test. */
+	private static List<String> testResults;
+
+	@Before
+	public void setup() {
+		testResults = new ArrayList<>();
+	}
+
+	@Test
+	public void testCanJoinOverSameKey() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final KeyedStream<Tuple2<String, Integer>, String> streamOne = withAscendingTimestamps(
+			env,
+			Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2),
+			Tuple2.of("key", 3), Tuple2.of("key", 4), Tuple2.of("key", 5)
+		).keyBy(new Tuple2KeyExtractor());
+
+		final KeyedStream<Tuple2<String, Integer>, String> streamTwo = withAscendingTimestamps(
+			env,
+			Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2),
+			Tuple2.of("key", 3), Tuple2.of("key", 4), Tuple2.of("key", 5)
+		).keyBy(new Tuple2KeyExtractor());
+
+		// A [0, 0] interval only pairs elements carrying the very same timestamp.
+		streamOne.intervalJoin(streamTwo)
+			.between(Time.milliseconds(0), Time.milliseconds(0))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,0)",
+			"(key,1):(key,1)",
+			"(key,2):(key,2)",
+			"(key,3):(key,3)",
+			"(key,4):(key,4)",
+			"(key,5):(key,5)"
+		);
+	}
+
+	@Test
+	public void testJoinsCorrectlyWithMultipleKeys() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final KeyedStream<Tuple2<String, Integer>, String> streamOne = withAscendingTimestamps(
+			env,
+			Tuple2.of("key1", 0), Tuple2.of("key2", 1), Tuple2.of("key1", 2),
+			Tuple2.of("key2", 3), Tuple2.of("key1", 4), Tuple2.of("key2", 5)
+		).keyBy(new Tuple2KeyExtractor());
+
+		final KeyedStream<Tuple2<String, Integer>, String> streamTwo = withAscendingTimestamps(
+			env,
+			Tuple2.of("key1", 0), Tuple2.of("key2", 1), Tuple2.of("key1", 2),
+			Tuple2.of("key2", 3), Tuple2.of("key1", 4), Tuple2.of("key2", 5)
+		).keyBy(new Tuple2KeyExtractor());
+
+		streamOne.intervalJoin(streamTwo)
+			// Without keying, the interval [0; 1] would also produce (1, 2), (2, 3), ... across
+			// different keys; this test checks that only same-key pairs are joined.
+			.between(Time.milliseconds(0), Time.milliseconds(1))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key1,0):(key1,0)",
+			"(key2,1):(key2,1)",
+			"(key1,2):(key1,2)",
+			"(key2,3):(key2,3)",
+			"(key1,4):(key1,4)",
+			"(key2,5):(key2,5)"
+		);
+	}
+
+	@Test
+	public void testBoundedUnorderedStreamsStillJoinCorrectly() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		// Both sources emit out of order around a watermark of 5. Every value emitted after the
+		// watermark is larger than 5, so nothing is late and all pairs in [-1, 1] must appear.
+		final DataStream<Tuple2<String, Integer>> streamOne = env.addSource(
+			new OutOfOrderSource(new int[]{5, 1, 4, 3, 2}, 5L, new int[]{9, 8, 7, 6}));
+
+		final DataStream<Tuple2<String, Integer>> streamTwo = env.addSource(
+			new OutOfOrderSource(new int[]{2, 1, 3, 4, 5}, 5L, new int[]{8, 7, 9, 6}));
+
+		streamOne
+			.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(-1), Time.milliseconds(1))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,1):(key,1)",
+			"(key,1):(key,2)",
+
+			"(key,2):(key,1)",
+			"(key,2):(key,2)",
+			"(key,2):(key,3)",
+
+			"(key,3):(key,2)",
+			"(key,3):(key,3)",
+			"(key,3):(key,4)",
+
+			"(key,4):(key,3)",
+			"(key,4):(key,4)",
+			"(key,4):(key,5)",
+
+			"(key,5):(key,4)",
+			"(key,5):(key,5)",
+			"(key,5):(key,6)",
+
+			"(key,6):(key,5)",
+			"(key,6):(key,6)",
+			"(key,6):(key,7)",
+
+			"(key,7):(key,6)",
+			"(key,7):(key,7)",
+			"(key,7):(key,8)",
+
+			"(key,8):(key,7)",
+			"(key,8):(key,8)",
+			"(key,8):(key,9)",
+
+			"(key,9):(key,8)",
+			"(key,9):(key,9)"
+		);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testFailsWithoutUpperBound() {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final DataStream<Tuple2<String, Integer>> streamOne = singleton(env);
+		final DataStream<Tuple2<String, Integer>> streamTwo = singleton(env);
+
+		streamOne
+			.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), null);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testFailsWithoutLowerBound() {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final DataStream<Tuple2<String, Integer>> streamOne = singleton(env);
+		final DataStream<Tuple2<String, Integer>> streamTwo = singleton(env);
+
+		streamOne
+			.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(null, Time.milliseconds(1));
+	}
+
+	@Test
+	public void testBoundsCanBeExclusive() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final DataStream<Tuple2<String, Integer>> streamOne = zeroToTwo(env);
+		final DataStream<Tuple2<String, Integer>> streamTwo = zeroToTwo(env);
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(2))
+			.upperBoundExclusive()
+			.lowerBoundExclusive()
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,1)",
+			"(key,1):(key,2)"
+		);
+	}
+
+	@Test
+	public void testBoundsCanBeInclusive() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final DataStream<Tuple2<String, Integer>> streamOne = zeroToTwo(env);
+		final DataStream<Tuple2<String, Integer>> streamTwo = zeroToTwo(env);
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(2))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,0)",
+			"(key,0):(key,1)",
+			"(key,0):(key,2)",
+
+			"(key,1):(key,1)",
+			"(key,1):(key,2)",
+
+			"(key,2):(key,2)"
+		);
+	}
+
+	@Test
+	public void testBoundsAreInclusiveByDefault() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.EventTime);
+
+		final DataStream<Tuple2<String, Integer>> streamOne = zeroToTwo(env);
+		final DataStream<Tuple2<String, Integer>> streamTwo = zeroToTwo(env);
+
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(2))
+			.process(new CombineToStringJoinFunction())
+			.addSink(new ResultSink());
+
+		env.execute();
+
+		expectInAnyOrder(
+			"(key,0):(key,0)",
+			"(key,0):(key,1)",
+			"(key,0):(key,2)",
+
+			"(key,1):(key,1)",
+			"(key,1):(key,2)",
+
+			"(key,2):(key,2)"
+		);
+	}
+
+	@Test(expected = UnsupportedTimeCharacteristicException.class)
+	public void testExecutionFailsInProcessingTime() throws Exception {
+		final StreamExecutionEnvironment env = createEnvironment(TimeCharacteristic.ProcessingTime);
+
+		final DataStream<Tuple2<String, Integer>> streamOne = singleton(env);
+		final DataStream<Tuple2<String, Integer>> streamTwo = singleton(env);
+
+		// between(...) already rejects the processing-time environment.
+		streamOne.keyBy(new Tuple2KeyExtractor())
+			.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+			.between(Time.milliseconds(0), Time.milliseconds(0))
+			.process(new CombineToStringJoinFunction());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Helpers
+	// ------------------------------------------------------------------------
+
+	/** Creates a parallelism-1 environment using the given time characteristic. */
+	private static StreamExecutionEnvironment createEnvironment(TimeCharacteristic timeCharacteristic) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+		env.setParallelism(1);
+		return env;
+	}
+
+	/** Emits the given tuples in order, using their integer field as ascending timestamp. */
+	@SafeVarargs
+	private static DataStream<Tuple2<String, Integer>> withAscendingTimestamps(
+			StreamExecutionEnvironment env,
+			Tuple2<String, Integer>... elements) {
+		return env.fromElements(elements)
+			.assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
+	}
+
+	/** Creates {@code ("key", 0), ("key", 1), ("key", 2)} with timestamps 0, 1 and 2. */
+	private static DataStream<Tuple2<String, Integer>> zeroToTwo(StreamExecutionEnvironment env) {
+		return withAscendingTimestamps(env, Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2));
+	}
+
+	/** Creates a one-element stream without timestamps; used where the job is never executed. */
+	private static DataStream<Tuple2<String, Integer>> singleton(StreamExecutionEnvironment env) {
+		return env.fromElements(Tuple2.of("1", 1));
+	}
+
+	private static void expectInAnyOrder(String... expected) {
+		final List<String> sortedExpected = Lists.newArrayList(expected);
+		Collections.sort(sortedExpected);
+		Collections.sort(testResults);
+		Assert.assertEquals(sortedExpected, testResults);
+	}
+
+	/**
+	 * Emits {@code ("key", v)} with timestamp {@code v} for every value before the watermark. It
+	 * then emits the watermark itself, followed by the remaining values, each in the given order.
+	 */
+	private static class OutOfOrderSource implements SourceFunction<Tuple2<String, Integer>> {
+
+		private final int[] beforeWatermark;
+		private final long watermark;
+		private final int[] afterWatermark;
+
+		OutOfOrderSource(int[] beforeWatermark, long watermark, int[] afterWatermark) {
+			this.beforeWatermark = beforeWatermark;
+			this.watermark = watermark;
+			this.afterWatermark = afterWatermark;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<String, Integer>> ctx) {
+			emitAll(ctx, beforeWatermark);
+			ctx.emitWatermark(new Watermark(watermark));
+			emitAll(ctx, afterWatermark);
+		}
+
+		private static void emitAll(SourceContext<Tuple2<String, Integer>> ctx, int[] values) {
+			for (int value : values) {
+				ctx.collectWithTimestamp(Tuple2.of("key", value), value);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// nothing to interrupt: run() emits a fixed, finite sequence
+		}
+	}
+
+	private static class AscendingTuple2TimestampExtractor extends AscendingTimestampExtractor<Tuple2<String, Integer>> {
+		@Override
+		public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
+			return element.f1;
+		}
+	}
+
+	private static class ResultSink implements SinkFunction<String> {
+		@Override
+		public void invoke(String value, Context context) throws Exception {
+			testResults.add(value);
+		}
+	}
+
+	private static class CombineToStringJoinFunction extends ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
+		@Override
+		public void processElement(
+				Tuple2<String, Integer> left,
+				Tuple2<String, Integer> right,
+				Context ctx,
+				Collector<String> out) {
+			out.collect(left + ":" + right);
+		}
+	}
+
+	private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String, Integer>, String> {
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+}
[2/3] flink git commit: [FLINK-8480][DataStream] Add APIs for
Interval Joins.
Posted by kk...@apache.org.
[FLINK-8480][DataStream] Add APIs for Interval Joins.
This adds the Java and Scala API for performing an IntervalJoin.
In Java this will look like:
Example:
```java
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive() // optional
.lowerBoundExclusive() // optional
.process(new ProcessJoinFunction() {...});
```
This closes #5482.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42ada8ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42ada8ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42ada8ad
Branch: refs/heads/master
Commit: 42ada8ad9ca28f94d0a0355658330198bbc2b577
Parents: f45b7f7
Author: Florian Schmidt <fl...@icloud.com>
Authored: Mon Jul 9 12:02:24 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 12 21:03:26 2018 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/index.md | 15 +
.../streaming/api/datastream/KeyedStream.java | 184 ++++
.../UnsupportedTimeCharacteristicException.java | 35 +
.../api/functions/co/ProcessJoinFunction.java | 87 ++
.../functions/co/TimeBoundedJoinFunction.java | 87 --
.../api/operators/co/IntervalJoinOperator.java | 513 ++++++++++
.../co/TimeBoundedStreamJoinOperator.java | 513 ----------
.../operators/co/IntervalJoinOperatorTest.java | 941 +++++++++++++++++++
.../co/TimeBoundedStreamJoinOperatorTest.java | 941 -------------------
.../flink/streaming/api/scala/KeyedStream.scala | 106 ++-
.../api/scala/IntervalJoinITCase.scala | 130 +++
.../streaming/runtime/IntervalJoinITCase.java | 451 +++++++++
12 files changed, 2461 insertions(+), 1542 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/docs/dev/stream/operators/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/index.md b/docs/dev/stream/operators/index.md
index 1dbdef4..422dbbf 100644
--- a/docs/dev/stream/operators/index.md
+++ b/docs/dev/stream/operators/index.md
@@ -310,6 +310,21 @@ dataStream.join(otherStream)
</td>
</tr>
<tr>
+ <td><strong>Interval Join</strong><br>KeyedStream,KeyedStream → DataStream</td>
+ <td>
+ <p>Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound</p>
+ {% highlight java %}
+// this will join the two streams so that
+// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
+keyedStream.intervalJoin(otherKeyedStream)
+ .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
+ .upperBoundExclusive() // optional
+ .lowerBoundExclusive() // optional
+ .process(new ProcessJoinFunction() {...});
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
<td><strong>Window CoGroup</strong><br>DataStream,DataStream → DataStream</td>
<td>
<p>Cogroups two data streams on a given key and a common window.</p>
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index a948ae2..32a5c96 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -51,6 +52,7 @@ import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -76,6 +78,8 @@ import java.util.List;
import java.util.Stack;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link KeyedStream} represents a {@link DataStream} on which operator state is
* partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
@@ -396,6 +400,186 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
// ------------------------------------------------------------------------
+ // Joining
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Starts an interval join of this {@link KeyedStream} with {@code otherStream}.
+	 *
+	 * <p>Elements of both streams are paired when they share a key and their timestamps fall
+	 * within the interval given to {@link IntervalJoin#between(Time, Time)}.
+	 *
+	 * @param otherStream The other keyed stream to join this keyed stream with
+	 * @param <T1> Type parameter of elements in the other stream
+	 * @return An instance of {@link IntervalJoin} with this keyed stream and the other keyed stream
+	 */
+	@PublicEvolving
+	public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
+		final KeyedStream<T, KEY> thisStream = this;
+		return new IntervalJoin<T, T1, KEY>(thisStream, otherStream);
+	}
+
+	/**
+	 * First step of an interval join. It holds the two keyed inputs until the time interval is
+	 * specified via {@link #between(Time, Time)}.
+	 *
+	 * @param <T1> The type parameter of the elements in the first stream
+	 * @param <T2> The type parameter of the elements in the second stream
+	 * @param <KEY> The type of the key shared by both streams
+	 */
+	@PublicEvolving
+	public static class IntervalJoin<T1, T2, KEY> {
+
+		private final KeyedStream<T1, KEY> streamOne;
+		private final KeyedStream<T2, KEY> streamTwo;
+
+		IntervalJoin(KeyedStream<T1, KEY> streamOne, KeyedStream<T2, KEY> streamTwo) {
+			this.streamOne = checkNotNull(streamOne);
+			this.streamTwo = checkNotNull(streamTwo);
+		}
+
+		/**
+		 * Specifies the time boundaries over which the join operation works, so that
+		 * <pre>{@code leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound}</pre>
+		 * By default both the lower and the upper bound are inclusive. This can be configured
+		 * with {@link IntervalJoined#lowerBoundExclusive()} and
+		 * {@link IntervalJoined#upperBoundExclusive()}.
+		 *
+		 * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+		 * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+		 * @return an {@link IntervalJoined} carrying both bounds in milliseconds, both inclusive
+		 * @throws UnsupportedTimeCharacteristicException if the environment does not use event time
+		 * @throws NullPointerException if either bound is {@code null}
+		 */
+		@PublicEvolving
+		public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+			final boolean inEventTime = streamOne.getExecutionEnvironment()
+				.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime;
+			if (!inEventTime) {
+				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
+			}
+
+			final long lowerMillis =
+				checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join").toMilliseconds();
+			final long upperMillis =
+				checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join").toMilliseconds();
+
+			// Both bounds start out inclusive.
+			return new IntervalJoined<>(streamOne, streamTwo, lowerMillis, upperMillis, true, true);
+		}
+	}
+
+	/**
+	 * IntervalJoined is a container for two streams that have keys for both sides as well as
+	 * the time boundaries over which elements should be joined.
+	 *
+	 * @param <IN1> Input type of elements from the first stream
+	 * @param <IN2> Input type of elements from the second stream
+	 * @param <KEY> The type of the key
+	 */
+	@PublicEvolving
+	public static class IntervalJoined<IN1, IN2, KEY> {
+
+		private static final String INTERVAL_JOIN_FUNC_NAME = "IntervalJoin";
+
+		private final KeyedStream<IN1, KEY> left;
+		private final KeyedStream<IN2, KEY> right;
+
+		private final long lowerBound;
+		private final long upperBound;
+
+		private final KeySelector<IN1, KEY> keySelector1;
+		private final KeySelector<IN2, KEY> keySelector2;
+
+		private boolean lowerBoundInclusive;
+		private boolean upperBoundInclusive;
+
+		public IntervalJoined(
+				KeyedStream<IN1, KEY> left,
+				KeyedStream<IN2, KEY> right,
+				long lowerBound,
+				long upperBound,
+				boolean lowerBoundInclusive,
+				boolean upperBoundInclusive) {
+
+			this.left = checkNotNull(left);
+			this.right = checkNotNull(right);
+
+			this.lowerBound = lowerBound;
+			this.upperBound = upperBound;
+
+			this.lowerBoundInclusive = lowerBoundInclusive;
+			this.upperBoundInclusive = upperBoundInclusive;
+
+			this.keySelector1 = left.getKeySelector();
+			this.keySelector2 = right.getKeySelector();
+		}
+
+		/**
+		 * Set the upper bound to be exclusive.
+		 */
+		@PublicEvolving
+		public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
+			this.upperBoundInclusive = false;
+			return this;
+		}
+
+		/**
+		 * Set the lower bound to be exclusive.
+		 */
+		@PublicEvolving
+		public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
+			this.lowerBoundInclusive = false;
+			return this;
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each joined pair
+		 * of elements.
+		 *
+		 * <p>The output type is extracted from the function's generic signature. When that is
+		 * not possible, use {@link #process(ProcessJoinFunction, TypeInformation)} instead.
+		 *
+		 * @param udf The user-defined function
+		 * @param <OUT> The output type
+		 * @return Returns a DataStream
+		 * @throws NullPointerException if {@code udf} is {@code null}
+		 */
+		@PublicEvolving
+		public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> udf) {
+			checkNotNull(udf, "The ProcessJoinFunction must not be null");
+
+			final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(udf);
+
+			final TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
+				cleanedUdf,
+				ProcessJoinFunction.class,    // ProcessJoinFunction<IN1, IN2, OUT>
+				0,                            // index of IN1
+				1,                            // index of IN2
+				2,                            // index of OUT
+				new int[]{0},                 // lambda input 1 type arg indices
+				new int[]{1},                 // lambda input 2 type arg indices
+				TypeExtractor.NO_INDEX,       // output arg indices
+				left.getType(),               // input 1 type information
+				right.getType(),              // input 2 type information
+				INTERVAL_JOIN_FUNC_NAME,
+				false
+			);
+
+			return createJoinStream(cleanedUdf, resultType);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each joined pair
+		 * of elements, using an explicitly given output type.
+		 *
+		 * <p>No type extraction is attempted, so this also works for functions whose output
+		 * type cannot be determined from their generic signature.
+		 *
+		 * @param udf The user-defined function
+		 * @param outputType Type information of the elements emitted by {@code udf}
+		 * @param <OUT> The output type
+		 * @return Returns a DataStream
+		 * @throws NullPointerException if {@code udf} or {@code outputType} is {@code null}
+		 */
+		@PublicEvolving
+		public <OUT> DataStream<OUT> process(
+				ProcessJoinFunction<IN1, IN2, OUT> udf,
+				TypeInformation<OUT> outputType) {
+			checkNotNull(udf, "The ProcessJoinFunction must not be null");
+			checkNotNull(outputType, "The output type must not be null");
+
+			return createJoinStream(left.getExecutionEnvironment().clean(udf), outputType);
+		}
+
+		/**
+		 * Wires an already-cleaned join function into an {@link IntervalJoinOperator}. The operator
+		 * runs over the connected inputs, keyed by both streams' key selectors.
+		 */
+		private <OUT> DataStream<OUT> createJoinStream(
+				ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf,
+				TypeInformation<OUT> resultType) {
+
+			final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
+				new IntervalJoinOperator<>(
+					lowerBound,
+					upperBound,
+					lowerBoundInclusive,
+					upperBoundInclusive,
+					left.getType().createSerializer(left.getExecutionConfig()),
+					right.getType().createSerializer(right.getExecutionConfig()),
+					cleanedUdf
+				);
+
+			return left
+				.connect(right)
+				.keyBy(keySelector1, keySelector2)
+				.transform(INTERVAL_JOIN_FUNC_NAME, resultType, operator);
+		}
+	}
+
+ // ------------------------------------------------------------------------
// Windowing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
new file mode 100644
index 0000000..cb2570a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * An exception that indicates that a time characteristic was used that is not supported in the
+ * current operation.
+ */
+@PublicEvolving
+public class UnsupportedTimeCharacteristicException extends FlinkRuntimeException {
+
+ private static final long serialVersionUID = -8109094930338075819L;
+
+ public UnsupportedTimeCharacteristicException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
new file mode 100644
index 0000000..2c39abc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A function that processes two joined elements and produces a single output one.
+ *
+ * <p>This function will get called for every joined pair of elements the joined two streams.
+ * The timestamp of the joined pair as well as the timestamp of the left element and the right
+ * element can be accessed through the {@link Context}.
+ *
+ * @param <IN1> Type of the first input
+ * @param <IN2> Type of the second input
+ * @param <OUT> Type of the output
+ */
+@PublicEvolving
+public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = -2444626938039012398L;
+
+ /**
+ * This method is called for each joined pair of elements. It can output zero or more elements
+ * through the provided {@link Collector} and has access to the timestamps of the joined elements
+ * and the result through the {@link Context}.
+ *
+ * @param left The left element of the joined pair.
+ * @param right The right element of the joined pair.
+ * @param ctx A context that allows querying the timestamps of the left, right and
+ * joined pair. In addition, this context allows to emit elements on a side output.
+ * @param out The collector to emit resulting elements to.
+ * @throws Exception This function may throw exceptions which cause the streaming program to
+ * fail and go in recovery mode.
+ */
+ public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
+
+ /**
+ * The context that is available during an invocation of
+ * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
+ * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
+ * allows to emit elements on a side output.
+ */
+ public abstract class Context {
+
+ /**
+ * @return The timestamp of the left element of a joined pair
+ */
+ public abstract long getLeftTimestamp();
+
+ /**
+ * @return The timestamp of the right element of a joined pair
+ */
+ public abstract long getRightTimestamp();
+
+ /**
+ * @return The timestamp of the joined pair.
+ */
+ public abstract long getTimestamp();
+
+ /**
+ * Emits a record to the side output identified by the {@link OutputTag}.
+ * @param outputTag The output tag that identifies the side output to emit to
+ * @param value The record to emit
+ */
+ public abstract <X> void output(OutputTag<X> outputTag, X value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
deleted file mode 100644
index cd745ca..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * A function that processes two joined elements and produces a single output one.
- *
- * <p>This function will get called for every joined pair of elements the joined two streams.
- * The timestamp of the joined pair as well as the timestamp of the left element and the right
- * element can be accessed through the {@link Context}.
- *
- * @param <IN1> Type of the first input
- * @param <IN2> Type of the second input
- * @param <OUT> Type of the output
- */
-@PublicEvolving
-public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
-
- private static final long serialVersionUID = -2444626938039012398L;
-
- /**
- * This method is called for each joined pair of elements. It can output zero or more elements
- * through the provided {@link Collector} and has access to the timestamps of the joined elements
- * and the result through the {@link Context}.
- *
- * @param left The left element of the joined pair.
- * @param right The right element of the joined pair.
- * @param ctx A context that allows querying the timestamps of the left, right and
- * joined pair. In addition, this context allows to emit elements on a side output.
- * @param out The collector to emit resulting elements to.
- * @throws Exception This function may throw exceptions which cause the streaming program to
- * fail and go in recovery mode.
- */
- public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
-
- /**
- * The context that is available during an invocation of
- * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the
- * left element in the joined pair, the right one, and that of the joined pair. In addition, this context
- * allows to emit elements on a side output.
- */
- public abstract class Context {
-
- /**
- * @return The timestamp of the left element of a joined pair
- */
- public abstract long getLeftTimestamp();
-
- /**
- * @return The timestamp of the right element of a joined pair
- */
- public abstract long getRightTimestamp();
-
- /**
- * @return The timestamp of the joined pair.
- */
- public abstract long getTimestamp();
-
- /**
- * Emits a record to the side output identified by the {@link OutputTag}.
- * @param outputTag The output tag that identifies the side output to emit to
- * @param value The record to emit
- */
- public abstract <X> void output(OutputTag<X> outputTag, X value);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
new file mode 100644
index 0000000..0c449e6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
+ *
+ * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link ProcessJoinFunction}.
+ *
+ * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
+ * We then check the right buffer to see whether there are any elements that can be joined. If
+ * there are, they are joined and passed to the aforementioned function. The same happens the
+ * other way around when receiving an element on the right side.
+ *
+ * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
+ * the elements.
+ *
+ * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered
+ * per element. This timer indicates when an element is not considered for joining anymore and can
+ * be removed from the state.
+ *
+ * @param <K> The type of the key based on which we join elements.
+ * @param <T1> The type of the elements in the left stream.
+ * @param <T2> The type of the elements in the right stream.
+ * @param <OUT> The output type created by the user-defined function.
+ */
+@Internal
+public class IntervalJoinOperator<K, T1, T2, OUT>
+ extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
+ implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
+
+ private static final long serialVersionUID = -5380774605111543454L;
+
+ private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);
+
+ private static final String LEFT_BUFFER = "LEFT_BUFFER";
+ private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+ private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
+ private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
+ private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
+
+ private final long lowerBound;
+ private final long upperBound;
+
+ private final TypeSerializer<T1> leftTypeSerializer;
+ private final TypeSerializer<T2> rightTypeSerializer;
+
+ private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
+ private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
+
+ private transient TimestampedCollector<OUT> collector;
+ private transient ContextImpl context;
+
+ private transient InternalTimerService<String> internalTimerService;
+
+ /**
+ * Creates a new IntervalJoinOperator.
+ *
+ * @param lowerBound The lower bound for evaluating if elements should be joined
+ * @param upperBound The upper bound for evaluating if elements should be joined
+ * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
+ * the lower bound
+ * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
+ * the upper bound
+ * @param udf A user-defined {@link ProcessJoinFunction} that gets called
+ * whenever two elements of T1 and T2 are joined
+ */
+ public IntervalJoinOperator(
+ long lowerBound,
+ long upperBound,
+ boolean lowerBoundInclusive,
+ boolean upperBoundInclusive,
+ TypeSerializer<T1> leftTypeSerializer,
+ TypeSerializer<T2> rightTypeSerializer,
+ ProcessJoinFunction<T1, T2, OUT> udf) {
+
+ super(Preconditions.checkNotNull(udf));
+
+ Preconditions.checkArgument(lowerBound <= upperBound,
+ "lowerBound <= upperBound must be fulfilled");
+
+ // Move buffer by +1 / -1 depending on inclusiveness in order not needing
+ // to check for inclusiveness later on
+ this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
+ this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
+
+ this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
+ this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+ context = new ContextImpl(userFunction);
+ internalTimerService =
+ getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+ LEFT_BUFFER,
+ LongSerializer.INSTANCE,
+ new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
+ ));
+
+ this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
+ RIGHT_BUFFER,
+ LongSerializer.INSTANCE,
+ new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
+ ));
+ }
+
+ /**
+ * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
+ * arrives at the left stream, it will get added to the left buffer. Possible join candidates
+ * for that element will be looked up from the right buffer and if the pair lies within the
+ * user defined boundaries, it gets passed to the {@link ProcessJoinFunction}.
+ *
+ * @param record An incoming record to be joined
+ * @throws Exception Can throw an Exception during state access
+ */
+ @Override
+ public void processElement1(StreamRecord<T1> record) throws Exception {
+ processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
+ }
+
+ /**
+ * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
+ * arrives at the right stream, it will get added to the right buffer. Possible join candidates
+ * for that element will be looked up from the left buffer and if the pair lies within the user
+ * defined boundaries, it gets passed to the {@link ProcessJoinFunction}.
+ *
+ * @param record An incoming record to be joined
+ * @throws Exception Can throw an exception during state access
+ */
+ @Override
+ public void processElement2(StreamRecord<T2> record) throws Exception {
+ processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <OUR, OTHER> void processElement(
+ StreamRecord<OUR> record,
+ MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
+ MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+ long relativeLowerBound,
+ long relativeUpperBound,
+ boolean isLeft) throws Exception {
+
+ final OUR ourValue = record.getValue();
+ final long ourTimestamp = record.getTimestamp();
+
+ if (ourTimestamp == Long.MIN_VALUE) {
+ throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
+ "interval stream joins need to have timestamps meaningful timestamps.");
+ }
+
+ if (isLate(ourTimestamp)) {
+ return;
+ }
+
+ addToBuffer(ourBuffer, ourValue, ourTimestamp);
+
+ for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
+ final long timestamp = bucket.getKey();
+
+ if (timestamp < ourTimestamp + relativeLowerBound ||
+ timestamp > ourTimestamp + relativeUpperBound) {
+ continue;
+ }
+
+ for (BufferEntry<OTHER> entry: bucket.getValue()) {
+ if (isLeft) {
+ collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
+ } else {
+ collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
+ }
+ }
+ }
+
+ long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
+ if (isLeft) {
+ internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
+ } else {
+ internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
+ }
+ }
+
+ private boolean isLate(long timestamp) {
+ long currentWatermark = internalTimerService.currentWatermark();
+ return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
+ }
+
+ private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
+ long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+ collector.setAbsoluteTimestamp(resultTimestamp);
+ context.leftTimestamp = leftTimestamp;
+ context.rightTimestamp = rightTimestamp;
+ userFunction.processElement(left, right, context, collector);
+ }
+
+ private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+ List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
+ if (elemsInBucket == null) {
+ elemsInBucket = new ArrayList<>();
+ }
+ elemsInBucket.add(new BufferEntry<>(value, false));
+ buffer.put(timestamp, elemsInBucket);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, String> timer) throws Exception {
+
+ long timerTimestamp = timer.getTimestamp();
+ String namespace = timer.getNamespace();
+
+ logger.trace("onEventTime @ {}", timerTimestamp);
+
+ switch (namespace) {
+ case CLEANUP_NAMESPACE_LEFT: {
+ long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
+ logger.trace("Removing from left buffer @ {}", timestamp);
+ leftBuffer.remove(timestamp);
+ break;
+ }
+ case CLEANUP_NAMESPACE_RIGHT: {
+ long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
+ logger.trace("Removing from right buffer @ {}", timestamp);
+ rightBuffer.remove(timestamp);
+ break;
+ }
+ default:
+ throw new RuntimeException("Invalid namespace " + namespace);
+ }
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
+ // do nothing.
+ }
+
+ /**
+ * The context that is available during an invocation of
+ * {@link ProcessJoinFunction#processElement(Object, Object, ProcessJoinFunction.Context, Collector)}.
+ *
+ * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
+ * the joined pair. In addition, this context allows to emit elements on a side output.
+ */
+ private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
+
+ private long leftTimestamp = Long.MIN_VALUE;
+
+ private long rightTimestamp = Long.MIN_VALUE;
+
+ private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) {
+ func.super();
+ }
+
+ @Override
+ public long getLeftTimestamp() {
+ return leftTimestamp;
+ }
+
+ @Override
+ public long getRightTimestamp() {
+ return rightTimestamp;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return leftTimestamp;
+ }
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
+ output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
+ }
+ }
+
+ /**
+ * A container for elements put in the left/write buffer.
+ * This will contain the element itself along with a flag indicating
+ * if it has been joined or not.
+ */
+ private static class BufferEntry<T> {
+
+ private final T element;
+ private final boolean hasBeenJoined;
+
+ BufferEntry(T element, boolean hasBeenJoined) {
+ this.element = element;
+ this.hasBeenJoined = hasBeenJoined;
+ }
+ }
+
+ /**
+ * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
+ */
+ private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+
+ private static final long serialVersionUID = -20197698803836236L;
+
+ private final TypeSerializer<T> elementSerializer;
+
+ private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+ this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public TypeSerializer<BufferEntry<T>> duplicate() {
+ return new BufferEntrySerializer<>(elementSerializer.duplicate());
+ }
+
+ @Override
+ public BufferEntry<T> createInstance() {
+ return null;
+ }
+
+ @Override
+ public BufferEntry<T> copy(BufferEntry<T> from) {
+ return new BufferEntry<>(from.element, from.hasBeenJoined);
+ }
+
+ @Override
+ public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
+ target.writeBoolean(record.hasBeenJoined);
+ elementSerializer.serialize(record.element, target);
+ }
+
+ @Override
+ public BufferEntry<T> deserialize(DataInputView source) throws IOException {
+ boolean hasBeenJoined = source.readBoolean();
+ T element = elementSerializer.deserialize(source);
+ return new BufferEntry<>(element, hasBeenJoined);
+ }
+
+ @Override
+ public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeBoolean(source.readBoolean());
+ elementSerializer.copy(source, target);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
+ return Objects.equals(elementSerializer, that.elementSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(elementSerializer);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(BufferEntrySerializer.class);
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new BufferSerializerConfigSnapshot<>(elementSerializer);
+ }
+
+ @Override
+ public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
+ Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
+ ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+
+ CompatibilityResult<T> compatResult =
+ CompatibilityUtil.resolveCompatibilityResult(
+ previousSerializerAndConfig.f0,
+ UnloadableDummyTypeSerializer.class,
+ previousSerializerAndConfig.f1,
+ elementSerializer);
+
+ if (!compatResult.isRequiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (compatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new BufferEntrySerializer<>(
+ new TypeDeserializerAdapter<>(
+ compatResult.getConvertDeserializer())));
+ }
+ }
+ return CompatibilityResult.requiresMigration();
+ }
+ }
+
+ /**
+ * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+ */
+ public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+ private static final int VERSION = 1;
+
+ public BufferSerializerConfigSnapshot() {
+ }
+
+ public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
+ super(userTypeSerializer);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+ }
+
+ @VisibleForTesting
+ MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
+ return leftBuffer;
+ }
+
+ @VisibleForTesting
+ MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
+ return rightBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
deleted file mode 100644
index 26ad26b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins.
- *
- * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs
- * (T1, T2) where T2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
- * upper bound can be configured to be either inclusive or exclusive.
- *
- * <p>As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}.
- *
- * <p>The basic idea of this implementation is as follows: Whenever we receive an element at
- * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer.
- * We then check the right buffer to see whether there are any elements that can be joined. If
- * there are, they are joined and passed to the aforementioned function. The same happens the
- * other way around when receiving an element on the right side.
- *
- * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of
- * the elements. The join function's context reports that same timestamp, and side outputs
- * emitted through the context carry it as well.
- *
- * <p>Elements whose timestamp is behind the current watermark are considered late and are
- * dropped without being buffered or joined. Elements without a timestamp ({@code Long.MIN_VALUE})
- * are rejected with a {@link FlinkException}.
- *
- * <p>In order to prevent the element buffers from growing indefinitely, a cleanup timer is
- * registered per element. This timer indicates when an element is not considered for joining
- * anymore and can be removed from the state.
- *
- * @param <K> The type of the key based on which we join elements.
- * @param <T1> The type of the elements in the left stream.
- * @param <T2> The type of the elements in the right stream.
- * @param <OUT> The output type created by the user-defined function.
- */
-@Internal
-public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT>
- extends AbstractUdfStreamOperator<OUT, TimeBoundedJoinFunction<T1, T2, OUT>>
- implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
-
- private static final long serialVersionUID = -5380774605111543454L;
-
- private static final Logger logger = LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class);
-
- private static final String LEFT_BUFFER = "LEFT_BUFFER";
- private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
- private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
- private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
- private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
-
- // Always inclusive: exclusive bounds handed to the constructor are shifted by one.
- private final long lowerBound;
- private final long upperBound;
-
- private final TypeSerializer<T1> leftTypeSerializer;
- private final TypeSerializer<T2> rightTypeSerializer;
-
- // Buffered elements of each input, grouped by element timestamp.
- private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
- private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
-
- private transient TimestampedCollector<OUT> collector;
- private transient ContextImpl context;
-
- private transient InternalTimerService<String> internalTimerService;
-
- /**
- * Creates a new TimeBoundedStreamJoinOperator.
- *
- * @param lowerBound The lower bound for evaluating if elements should be joined
- * @param upperBound The upper bound for evaluating if elements should be joined
- * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
- * the lower bound
- * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
- * the upper bound
- * @param leftTypeSerializer The serializer for the elements of the left stream
- * @param rightTypeSerializer The serializer for the elements of the right stream
- * @param udf A user-defined {@link TimeBoundedJoinFunction} that gets called
- * whenever two elements of T1 and T2 are joined
- * @throws IllegalArgumentException if {@code lowerBound > upperBound}
- * @throws NullPointerException if the function or one of the serializers is null
- */
- public TimeBoundedStreamJoinOperator(
- long lowerBound,
- long upperBound,
- boolean lowerBoundInclusive,
- boolean upperBoundInclusive,
- TypeSerializer<T1> leftTypeSerializer,
- TypeSerializer<T2> rightTypeSerializer,
- TimeBoundedJoinFunction<T1, T2, OUT> udf) {
-
- super(Preconditions.checkNotNull(udf));
-
- Preconditions.checkArgument(lowerBound <= upperBound,
- "lowerBound <= upperBound must be fulfilled");
-
- // Shift exclusive bounds by +1 / -1 so that every later check can treat both
- // bounds as inclusive.
- this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
- this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
-
- this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
- this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
- }
-
- /** Sets up the output collector, the join-function context and the cleanup timer service. */
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<>(output);
- context = new ContextImpl(userFunction);
- internalTimerService =
- getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
- }
-
- /** Registers the keyed map states that buffer the elements of both inputs. */
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
-
- this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
- LEFT_BUFFER,
- LongSerializer.INSTANCE,
- new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
- ));
-
- this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
- RIGHT_BUFFER,
- LongSerializer.INSTANCE,
- new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
- ));
- }
-
- /**
- * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord}
- * arrives at the left stream, it will get added to the left buffer. Possible join candidates
- * for that element will be looked up from the right buffer and if the pair lies within the
- * user defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
- *
- * @param record An incoming record to be joined
- * @throws Exception Can throw an Exception during state access
- */
- @Override
- public void processElement1(StreamRecord<T1> record) throws Exception {
- processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
- }
-
- /**
- * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord}
- * arrives at the right stream, it will get added to the right buffer. Possible join candidates
- * for that element will be looked up from the left buffer and if the pair lies within the user
- * defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}.
- *
- * @param record An incoming record to be joined
- * @throws Exception Can throw an exception during state access
- */
- @Override
- public void processElement2(StreamRecord<T2> record) throws Exception {
- // Seen from the right side the interval is mirrored: negate and swap the bounds.
- processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
- }
-
- /**
- * Shared implementation of {@link #processElement1(StreamRecord)} and
- * {@link #processElement2(StreamRecord)}. It buffers the record on its own side and joins it
- * with every buffered element of the other side whose timestamp lies within
- * {@code [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound]}. It then
- * registers the cleanup timer for the record.
- *
- * @param isLeft whether {@code record} comes from the left input; determines the argument
- * order passed to the user function and the cleanup namespace
- * @throws FlinkException if the record carries no meaningful timestamp
- */
- @SuppressWarnings("unchecked")
- private <OUR, OTHER> void processElement(
- StreamRecord<OUR> record,
- MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
- MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
- long relativeLowerBound,
- long relativeUpperBound,
- boolean isLeft) throws Exception {
-
- final OUR ourValue = record.getValue();
- final long ourTimestamp = record.getTimestamp();
-
- if (ourTimestamp == Long.MIN_VALUE) {
- throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
- "interval stream joins need to have meaningful timestamps.");
- }
-
- if (isLate(ourTimestamp)) {
- return;
- }
-
- addToBuffer(ourBuffer, ourValue, ourTimestamp);
-
- for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
- final long timestamp = bucket.getKey();
-
- if (timestamp < ourTimestamp + relativeLowerBound ||
- timestamp > ourTimestamp + relativeUpperBound) {
- continue;
- }
-
- for (BufferEntry<OTHER> entry: bucket.getValue()) {
- if (isLeft) {
- collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
- } else {
- collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
- }
- }
- }
-
- // Once the watermark passes this time, no future (non-late) element of the other side can
- // join with this one any more. onEventTime() inverts this computation.
- long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
- if (isLeft) {
- internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
- } else {
- internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
- }
- }
-
- /** Returns whether the timestamp lies behind the current watermark (if one was received). */
- private boolean isLate(long timestamp) {
- long currentWatermark = internalTimerService.currentWatermark();
- return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
- }
-
- /**
- * Emits a joined pair through the user function. The output is stamped with the larger of
- * the two element timestamps, and the context is updated so that it reports the same values.
- */
- private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
- final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
-
- collector.setAbsoluteTimestamp(resultTimestamp);
- context.leftTimestamp = leftTimestamp;
- context.rightTimestamp = rightTimestamp;
- context.resultTimestamp = resultTimestamp;
-
- userFunction.processElement(left, right, context, collector);
- }
-
- /** Appends {@code value} to the bucket of {@code buffer} that belongs to {@code timestamp}. */
- private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
- List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
- if (elemsInBucket == null) {
- elemsInBucket = new ArrayList<>();
- }
- elemsInBucket.add(new BufferEntry<>(value, false));
- buffer.put(timestamp, elemsInBucket);
- }
-
- /**
- * Removes the buffered bucket whose cleanup timer fired. The bucket timestamp is recovered
- * by inverting the cleanup-time computation of processElement().
- */
- @Override
- public void onEventTime(InternalTimer<K, String> timer) throws Exception {
-
- long timerTimestamp = timer.getTimestamp();
- String namespace = timer.getNamespace();
-
- logger.trace("onEventTime @ {}", timerTimestamp);
-
- switch (namespace) {
- case CLEANUP_NAMESPACE_LEFT: {
- // left elements were registered with relativeUpperBound = upperBound
- long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
- logger.trace("Removing from left buffer @ {}", timestamp);
- leftBuffer.remove(timestamp);
- break;
- }
- case CLEANUP_NAMESPACE_RIGHT: {
- // right elements were registered with relativeUpperBound = -lowerBound
- long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
- logger.trace("Removing from right buffer @ {}", timestamp);
- rightBuffer.remove(timestamp);
- break;
- }
- default:
- throw new RuntimeException("Invalid namespace " + namespace);
- }
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
- // do nothing.
- }
-
- /**
- * The context that is available during an invocation of
- * {@link TimeBoundedJoinFunction#processElement(Object, Object, TimeBoundedJoinFunction.Context, Collector)}.
- *
- * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of
- * the joined pair. In addition, this context allows to emit elements on a side output.
- */
- private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, OUT>.Context {
-
- private long resultTimestamp = Long.MIN_VALUE;
-
- private long leftTimestamp = Long.MIN_VALUE;
-
- private long rightTimestamp = Long.MIN_VALUE;
-
- private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) {
- func.super();
- }
-
- @Override
- public long getLeftTimestamp() {
- return leftTimestamp;
- }
-
- @Override
- public long getRightTimestamp() {
- return rightTimestamp;
- }
-
- /** Returns the timestamp of the joined pair, i.e. the one attached to the emitted output. */
- @Override
- public long getTimestamp() {
- return resultTimestamp;
- }
-
- /** Emits {@code value} to the side output, stamped with the joined pair's timestamp. */
- @Override
- public <X> void output(OutputTag<X> outputTag, X value) {
- Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
- output.collect(outputTag, new StreamRecord<>(value, getTimestamp()));
- }
- }
-
- /**
- * A container for elements put in the left/right buffer.
- * This will contain the element itself along with a flag indicating
- * if it has been joined or not.
- */
- private static class BufferEntry<T> {
-
- private final T element;
- private final boolean hasBeenJoined;
-
- BufferEntry(T element, boolean hasBeenJoined) {
- this.element = element;
- this.hasBeenJoined = hasBeenJoined;
- }
- }
-
- /**
- * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
- * Wire format: the {@code hasBeenJoined} flag followed by the serialized element.
- */
- private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
-
- private static final long serialVersionUID = -20197698803836236L;
-
- private final TypeSerializer<T> elementSerializer;
-
- private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
- this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
- }
-
- /**
- * A {@link BufferEntry} only has final fields, so it is immutable exactly when the
- * wrapped element is.
- */
- @Override
- public boolean isImmutableType() {
- return elementSerializer.isImmutableType();
- }
-
- @Override
- public TypeSerializer<BufferEntry<T>> duplicate() {
- return new BufferEntrySerializer<>(elementSerializer.duplicate());
- }
-
- @Override
- public BufferEntry<T> createInstance() {
- return null;
- }
-
- /**
- * Creates a deep copy of the entry. The element is copied with its own serializer so that
- * the copy never shares a (possibly mutable) element instance with the original.
- */
- @Override
- public BufferEntry<T> copy(BufferEntry<T> from) {
- return new BufferEntry<>(elementSerializer.copy(from.element), from.hasBeenJoined);
- }
-
- @Override
- public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
- target.writeBoolean(record.hasBeenJoined);
- elementSerializer.serialize(record.element, target);
- }
-
- @Override
- public BufferEntry<T> deserialize(DataInputView source) throws IOException {
- boolean hasBeenJoined = source.readBoolean();
- T element = elementSerializer.deserialize(source);
- return new BufferEntry<>(element, hasBeenJoined);
- }
-
- @Override
- public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeBoolean(source.readBoolean());
- elementSerializer.copy(source, target);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o;
- return Objects.equals(elementSerializer, that.elementSerializer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(elementSerializer);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj.getClass().equals(BufferEntrySerializer.class);
- }
-
- @Override
- public TypeSerializerConfigSnapshot snapshotConfiguration() {
- return new BufferSerializerConfigSnapshot<>(elementSerializer);
- }
-
- /**
- * Resolves compatibility through the nested element serializer. If the element serializer
- * only offers a convert deserializer, the migration serializer wraps that deserializer.
- */
- @Override
- public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
- Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
- ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
- CompatibilityResult<T> compatResult =
- CompatibilityUtil.resolveCompatibilityResult(
- previousSerializerAndConfig.f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializerAndConfig.f1,
- elementSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new BufferEntrySerializer<>(
- new TypeDeserializerAdapter<>(
- compatResult.getConvertDeserializer())));
- }
- }
- return CompatibilityResult.requiresMigration();
- }
- }
-
- /**
- * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
- */
- public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
-
- private static final int VERSION = 1;
-
- public BufferSerializerConfigSnapshot() {
- }
-
- public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) {
- super(userTypeSerializer);
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
- }
-
- @VisibleForTesting
- MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
- return leftBuffer;
- }
-
- @VisibleForTesting
- MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
- return rightBuffer;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
new file mode 100644
index 0000000..ee3f4d8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests for {@link IntervalJoinOperator}.
+ * These tests cover the correctness of the join results and the cleanup of buffered state.
+ */
+@RunWith(Parameterized.class)
+public class IntervalJoinOperatorTest {
+
+ // Injected by the Parameterized runner; presumably controls which input the harness helpers
+ // advance first — TODO confirm against the helpers further down in this file.
+ private final boolean lhsFasterThanRhs;
+
+ /** Runs every test once with each value of {@code lhsFasterThanRhs}. */
+ @Parameters(name = "lhs faster than rhs: {0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[] {true},
+ new Object[] {false});
+ }
+
+ public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) {
+ this.lhsFasterThanRhs = lhsFasterThanRhs;
+ }
+
+ @Test
+ public void testImplementationMirrorsCorrectly() throws Exception {
+ // lhs + 1 <= rhs < lhs + 3
+ final long lower = 1;
+ final long upper = 3;
+ final boolean lowerInclusive = true;
+ final boolean upperInclusive = false;
+
+ setupHarness(lower, lowerInclusive, upper, upperInclusive)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(
+ streamRecordOf(1, 2), streamRecordOf(1, 3),
+ streamRecordOf(2, 3), streamRecordOf(2, 4),
+ streamRecordOf(3, 4))
+ .noLateRecords()
+ .close();
+
+ // Negating and swapping the bounds mirrors the interval, so every pair (a, b)
+ // expected above must now show up as (b, a).
+ setupHarness(-upper, upperInclusive, -lower, lowerInclusive)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(
+ streamRecordOf(2, 1), streamRecordOf(3, 1),
+ streamRecordOf(3, 2), streamRecordOf(4, 2),
+ streamRecordOf(4, 3))
+ .noLateRecords()
+ .close();
+ }
+
+ @Test // lhs - 2 <= rhs <= lhs - 1
+ public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
+
+ setupHarness(-2, true, -1, true)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(
+ streamRecordOf(2, 1),
+ streamRecordOf(3, 1),
+ streamRecordOf(3, 2),
+ streamRecordOf(4, 2),
+ streamRecordOf(4, 3)
+ )
+ .noLateRecords()
+ .close();
+ }
+
+ @Test // lhs - 1 <= rhs <= lhs + 1
+ public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
+
+ setupHarness(-1, true, 1, true)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(
+ streamRecordOf(1, 1),
+ streamRecordOf(1, 2),
+ streamRecordOf(2, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(2, 3),
+ streamRecordOf(3, 2),
+ streamRecordOf(3, 3),
+ streamRecordOf(3, 4),
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4)
+ )
+ .noLateRecords()
+ .close();
+ }
+
+ @Test
+ public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
+ // lhs + 1 <= rhs <= lhs + 2
+ final long lower = 1;
+ final long upper = 2;
+ setupHarness(lower, true, upper, true)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(
+ streamRecordOf(1, 2), streamRecordOf(1, 3),
+ streamRecordOf(2, 3), streamRecordOf(2, 4),
+ streamRecordOf(3, 4))
+ .noLateRecords()
+ .close();
+ }
+
+ @Test
+ public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
+ // lhs - 3 < rhs < lhs - 1, i.e. rhs == lhs - 2 for integral timestamps
+ final long lower = -3;
+ final long upper = -1;
+ setupHarness(lower, false, upper, false)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(streamRecordOf(3, 1), streamRecordOf(4, 2))
+ .noLateRecords()
+ .close();
+ }
+
+ @Test
+ public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
+ // lhs - 1 < rhs < lhs + 1, i.e. rhs == lhs for integral timestamps
+ final long lower = -1;
+ final long upper = 1;
+ setupHarness(lower, false, upper, false)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(
+ streamRecordOf(1, 1), streamRecordOf(2, 2),
+ streamRecordOf(3, 3), streamRecordOf(4, 4))
+ .noLateRecords()
+ .close();
+ }
+
+ @Test
+ public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
+ // lhs + 1 < rhs < lhs + 3, i.e. rhs == lhs + 2 for integral timestamps
+ final long lower = 1;
+ final long upper = 3;
+ setupHarness(lower, false, upper, false)
+ .processElementsAndWatermarks(1, 4)
+ .andExpect(streamRecordOf(1, 3), streamRecordOf(2, 4))
+ .noLateRecords()
+ .close();
+ }
+
+ @Test
+ public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
+ // Effective bounds: lhs - 1 <= rhs <= lhs.
+ // NOTE(review): the upper bound is 0, not negative as the test name suggests.
+ // The expectations below encode that a left element is removed once the common watermark
+ // reaches its timestamp, and a right element once it reaches its timestamp + 1.
+
+ setupHarness(-1, true, 0, true)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(2, 3, 4, 5)
+ .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(5)
+ .assertRightBufferContainsOnly(4, 5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
+ // Bounds -2 (exclusive) and 1 (exclusive), i.e. effectively lhs - 1 <= rhs <= lhs. This is the
+ // same interval as testStateCleanupNegativeInclusiveNegativeInclusive, hence the identical
+ // expected buffer contents.
+ setupHarness(-2, false, 1, false)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(2, 3, 4, 5)
+ .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(5)
+ .assertRightBufferContainsOnly(4, 5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
+ // Effective bounds: lhs <= rhs <= lhs + 1 (mirror image of the negative cleanup tests).
+ // The expectations below encode that a right element is removed once the common watermark
+ // reaches its timestamp, and a left element once it reaches its timestamp + 1.
+ setupHarness(0, true, 1, true)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+ .assertRightBufferContainsOnly(2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(4, 5)
+ .assertRightBufferContainsOnly(5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
+ // Bounds -1 (exclusive) and 2 (exclusive), i.e. effectively lhs <= rhs <= lhs + 1. This is the
+ // same interval as testStateCleanupPositiveInclusivePositiveInclusive, hence the identical
+ // expected buffer contents.
+ // NOTE(review): the configured lower bound is negative although the test name says "Positive".
+ setupHarness(-1, false, 2, false)
+ .processElement1(1)
+ .processElement1(2)
+ .processElement1(3)
+ .processElement1(4)
+ .processElement1(5)
+
+ .processElement2(1)
+ .processElement2(2)
+ .processElement2(3)
+ .processElement2(4)
+ .processElement2(5) // fill both buffers with values
+
+ .processWatermark1(1)
+ .processWatermark2(1) // set common watermark to 1 and check that data is cleaned
+
+ .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+ .assertRightBufferContainsOnly(2, 3, 4, 5)
+
+ .processWatermark1(4) // set common watermark to 4 and check that data is cleaned
+ .processWatermark2(4)
+
+ .assertLeftBufferContainsOnly(4, 5)
+ .assertRightBufferContainsOnly(5)
+
+ .processWatermark1(6) // set common watermark to 6 and check that all buffers are empty
+ .processWatermark2(6)
+
+ .assertLeftBufferEmpty()
+ .assertRightBufferEmpty()
+
+ .close();
+ }
+
+ @Test
+ public void testRestoreFromSnapshot() throws Exception {
+
+ // config: lhs - 1 <= rhs <= lhs + 1
+ int lowerBound = -1;
+ boolean lowerBoundInclusive = true;
+ int upperBound = 1;
+ boolean upperBoundInclusive = true;
+
+ // create first test harness
+ OperatorSubtaskState handles;
+ List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
+
+ try (TestHarness testHarness = createTestHarness(
+ lowerBound,
+ lowerBoundInclusive,
+ upperBound,
+ upperBoundInclusive
+ )) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ // process elements with first test harness
+ testHarness.processElement1(createStreamRecord(1, "lhs"));
+ testHarness.processWatermark1(new Watermark(1));
+
+ testHarness.processElement2(createStreamRecord(1, "rhs"));
+ testHarness.processWatermark2(new Watermark(1));
+
+ testHarness.processElement1(createStreamRecord(2, "lhs"));
+ testHarness.processWatermark1(new Watermark(2));
+
+ testHarness.processElement2(createStreamRecord(2, "rhs"));
+ testHarness.processWatermark2(new Watermark(2));
+
+ testHarness.processElement1(createStreamRecord(3, "lhs"));
+ testHarness.processWatermark1(new Watermark(3));
+
+ testHarness.processElement2(createStreamRecord(3, "rhs"));
+ testHarness.processWatermark2(new Watermark(3));
+
+ // snapshot and validate output; the harness is closed by try-with-resources,
+ // so it must not also be closed explicitly here (that would close it twice)
+ handles = testHarness.snapshot(0, 0);
+
+ expectedOutput = Lists.newArrayList(
+ streamRecordOf(1, 1),
+ streamRecordOf(1, 2),
+ streamRecordOf(2, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(2, 3),
+ streamRecordOf(3, 2),
+ streamRecordOf(3, 3)
+ );
+
+ TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
+ assertOutput(expectedOutput, testHarness.getOutput());
+ }
+
+ try (TestHarness newTestHarness = createTestHarness(
+ lowerBound,
+ lowerBoundInclusive,
+ upperBound,
+ upperBoundInclusive
+ )) {
+ // create new test harness from snapshot
+
+ newTestHarness.setup();
+ newTestHarness.initializeState(handles);
+ newTestHarness.open();
+
+ // process elements; they must join with the restored element at timestamp 3
+ newTestHarness.processElement1(createStreamRecord(4, "lhs"));
+ newTestHarness.processWatermark1(new Watermark(4));
+
+ newTestHarness.processElement2(createStreamRecord(4, "rhs"));
+ newTestHarness.processWatermark2(new Watermark(4));
+
+ // assert expected output
+ expectedOutput = Lists.newArrayList(
+ streamRecordOf(3, 4),
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4)
+ );
+
+ TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
+ assertOutput(expectedOutput, newTestHarness.getOutput());
+ }
+ }
+
+ @Test
+ public void testContextCorrectLeftTimestamp() throws Exception {
+ // For every joined pair, the context must report the left element's own timestamp.
+ ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> assertingFunction =
+ new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ Assert.assertEquals(left.ts, ctx.getLeftTimestamp());
+ }
+ };
+
+ IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+ new IntervalJoinOperator<>(
+ -1, 1, true, true,
+ TestElem.serializer(), TestElem.serializer(),
+ assertingFunction);
+
+ try (TestHarness harness =
+ new TestHarness(operator, e -> e.key, e -> e.key, TypeInformation.of(String.class))) {
+ harness.setup();
+ harness.open();
+ processElementsAndWatermarks(harness);
+ }
+ }
+
+ @Test
+ public void testReturnsCorrectTimestamp() throws Exception {
+ // Checks what the context reports as the timestamp of the joined pair.
+ // NOTE(review): this expects the LEFT element's timestamp. If the pair's timestamp is meant to
+ // be max(left.ts, right.ts) — the timestamp attached to emitted records — this expectation
+ // should be revisited together with IntervalJoinOperator. TODO confirm.
+ ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> assertingFunction =
+ new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ Assert.assertEquals(left.ts, ctx.getTimestamp());
+ }
+ };
+
+ IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+ new IntervalJoinOperator<>(
+ -1, 1, true, true,
+ TestElem.serializer(), TestElem.serializer(),
+ assertingFunction);
+
+ try (TestHarness harness =
+ new TestHarness(operator, e -> e.key, e -> e.key, TypeInformation.of(String.class))) {
+ harness.setup();
+ harness.open();
+ processElementsAndWatermarks(harness);
+ }
+ }
+
+	/**
+	 * Checks that {@code Context#getRightTimestamp()} reports the timestamp of the right element
+	 * handed to the join function.
+	 */
+	@Test
+	public void testContextCorrectRightTimestamp() throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
+			new IntervalJoinOperator<>(
+				-1,
+				1,
+				true,
+				true,
+				TestElem.serializer(),
+				TestElem.serializer(),
+				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+					@Override
+					public void processElement(
+						TestElem left,
+						TestElem right,
+						Context ctx,
+						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+						Assert.assertEquals(right.ts, ctx.getRightTimestamp());
+						// emit the pair so the test can verify below that this function
+						// actually ran; otherwise the assertion above could pass vacuously
+						out.collect(Tuple2.of(left, right));
+					}
+				}
+			);
+
+		try (TestHarness testHarness = new TestHarness(
+			op,
+			(elem) -> elem.key,
+			(elem) -> elem.key,
+			TypeInformation.of(String.class)
+		)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			processElementsAndWatermarks(testHarness);
+
+			Assert.assertTrue(
+				"join function was never invoked",
+				testHarness.getOutput().stream().anyMatch(elem -> elem instanceof StreamRecord));
+		}
+	}
+
+	/**
+	 * A left-input element that carries no timestamp is expected to fail with a
+	 * {@link FlinkException}.
+	 */
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsLeft() throws Exception {
+		// try-with-resources so the harness is closed even though the test ends with an
+		// exception (same pattern as testFailsWithNoTimestampsRight)
+		try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) {
+
+			newTestHarness.setup();
+			newTestHarness.open();
+
+			// note that the StreamRecord has no timestamp in constructor
+			newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs")));
+		}
+	}
+
+	/**
+	 * A right-input element that carries no timestamp is expected to fail with a
+	 * {@link FlinkException}.
+	 */
+	@Test(expected = FlinkException.class)
+	public void testFailsWithNoTimestampsRight() throws Exception {
+		try (TestHarness harness = createTestHarness(0L, true, 0L, true)) {
+			harness.setup();
+			harness.open();
+
+			// this StreamRecord constructor attaches no timestamp to the element
+			StreamRecord<TestElem> recordWithoutTimestamp = new StreamRecord<>(new TestElem(0, "rhs"));
+			harness.processElement2(recordWithoutTimestamp);
+		}
+	}
+
+ @Test
+ public void testDiscardsLateData() throws Exception {
+ setupHarness(-1, true, 1, true)
+ .processElement1(1)
+ .processElement2(1)
+ .processElement1(2)
+ .processElement2(2)
+ .processElement1(3)
+ .processElement2(3)
+ .processWatermark1(3)
+ .processWatermark2(3)
+ .processElement1(1) // this element is late and should not be joined again
+ .processElement1(4)
+ .processElement2(4)
+ .processElement1(5)
+ .processElement2(5)
+ .andExpect(
+ streamRecordOf(1, 1),
+ streamRecordOf(1, 2),
+
+ streamRecordOf(2, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(2, 3),
+
+ streamRecordOf(3, 2),
+ streamRecordOf(3, 3),
+ streamRecordOf(3, 4),
+
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4),
+ streamRecordOf(4, 5),
+
+ streamRecordOf(5, 4),
+ streamRecordOf(5, 5)
+ )
+ .noLateRecords()
+ .close();
+ }
+
+	/**
+	 * Fails unless the given map state has no keys.
+	 */
+	private void assertEmpty(MapState<Long, ?> state) throws Exception {
+		Assert.assertTrue("state not empty", !state.keys().iterator().hasNext());
+	}
+
+	/**
+	 * Fails unless the keys of {@code state} are exactly the given timestamps
+	 * (every timestamp present, and no additional keys).
+	 */
+	private void assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception {
+		// Build the diagnostic text once instead of re-traversing the state on every iteration.
+		// Iterables.toString renders the keys as [k1, k2, ...] regardless of whether the
+		// state's Iterable implementation overrides toString().
+		String expectedVsActual =
+			"\n Expected: " + Arrays.toString(ts) + "\n Actual: " + Iterables.toString(state.keys());
+
+		String missingMessage = "Keys not found in state. " + expectedVsActual;
+		for (long t : ts) {
+			Assert.assertTrue(missingMessage, state.contains(t));
+		}
+
+		Assert.assertEquals(
+			"Too many objects in state. " + expectedVsActual,
+			ts.length,
+			Iterables.size(state.keys()));
+	}
+
+	/**
+	 * Fails unless {@code actualOutput} holds exactly as many {@link StreamRecord}s as
+	 * {@code expectedOutput} and contains every expected record. Non-record elements
+	 * (e.g. watermarks) in the output are ignored for the size comparison.
+	 */
+	private void assertOutput(
+		Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
+		Queue<Object> actualOutput) {
+
+		int actualSize = actualOutput.stream()
+			.filter(elem -> elem instanceof StreamRecord)
+			.collect(Collectors.toList())
+			.size();
+
+		int expectedSize = Iterables.size(expectedOutput);
+
+		Assert.assertEquals(
+			"Expected and actual size of stream records different",
+			expectedSize,
+			actualSize
+		);
+
+		for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+			// only build the (potentially long) failure message when a record is actually missing
+			if (!actualOutput.contains(record)) {
+				Assert.fail("Expected record " + record + " not found in output " + actualOutput);
+			}
+		}
+	}
+
+	/**
+	 * Creates a harness around an interval join operator with the given bounds whose join
+	 * function emits every joined pair unchanged. The harness is neither set up nor opened.
+	 */
+	private TestHarness createTestHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		return createHarnessFor(
+			createPassthroughJoinOperator(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive));
+	}
+
+	/**
+	 * Same operator and harness as {@code createTestHarness}, wrapped in a {@link JoinTestBuilder}
+	 * (whose constructor initializes the harness) so tests can drive it fluently.
+	 */
+	private JoinTestBuilder setupHarness(long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
+			createPassthroughJoinOperator(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive);
+
+		return new JoinTestBuilder(createHarnessFor(operator), operator);
+	}
+
+	/**
+	 * Builds an interval join operator with the given bounds and a {@link PassthroughFunction}.
+	 */
+	private IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> createPassthroughJoinOperator(
+		long lowerBound,
+		boolean lowerBoundInclusive,
+		long upperBound,
+		boolean upperBoundInclusive) throws Exception {
+
+		// note the constructor's argument order: both bounds first, then both inclusiveness flags
+		return new IntervalJoinOperator<>(
+			lowerBound,
+			upperBound,
+			lowerBoundInclusive,
+			upperBoundInclusive,
+			TestElem.serializer(),
+			TestElem.serializer(),
+			new PassthroughFunction()
+		);
+	}
+
+	/**
+	 * Wraps the operator in a {@link TestHarness} keyed on {@code TestElem.key} for both inputs.
+	 */
+	private TestHarness createHarnessFor(
+		IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator) throws Exception {
+
+		return new TestHarness(
+			operator,
+			(elem) -> elem.key, // key
+			(elem) -> elem.key, // key
+			TypeInformation.of(String.class)
+		);
+	}
+
+	/**
+	 * Fluent helper around a {@link TestHarness} and the {@link IntervalJoinOperator} it wraps.
+	 * The {@code process*} methods forward input to the harness, the {@code assert*}/
+	 * {@code andExpect}/{@code noLateRecords} methods check operator buffers or harness output,
+	 * and all of them return {@code this} for chaining.
+	 */
+	private class JoinTestBuilder {
+
+		private final IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
+		private final TestHarness testHarness;
+
+		/**
+		 * Sets up and opens the given harness so it is ready to receive elements.
+		 */
+		public JoinTestBuilder(
+			TestHarness t,
+			IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator
+		) throws Exception {
+
+			this.testHarness = t;
+			this.operator = operator;
+			// setup() must come before open(); this is the same order used by the other
+			// tests in this class (previously the two calls were reversed)
+			t.setup();
+			t.open();
+		}
+
+		public TestHarness get() {
+			return testHarness;
+		}
+
+		public JoinTestBuilder processElement1(int ts) throws Exception {
+			testHarness.processElement1(createStreamRecord(ts, "lhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processElement2(int ts) throws Exception {
+			testHarness.processElement2(createStreamRecord(ts, "rhs"));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark1(int ts) throws Exception {
+			testHarness.processWatermark1(new Watermark(ts));
+			return this;
+		}
+
+		public JoinTestBuilder processWatermark2(int ts) throws Exception {
+			testHarness.processWatermark2(new Watermark(ts));
+			return this;
+		}
+
+		/**
+		 * Feeds timestamps {@code from..to} (inclusive) into both inputs, each element followed by a
+		 * watermark of the same value. The whole left sequence goes first when
+		 * {@code lhsFasterThanRhs} is set, otherwise the whole right sequence goes first.
+		 */
+		public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
+			if (lhsFasterThanRhs) {
+				feedLeft(from, to);
+				feedRight(from, to);
+			} else {
+				feedRight(from, to);
+				feedLeft(from, to);
+			}
+
+			return this;
+		}
+
+		/** Sends lhs elements and matching watermarks for timestamps from..to into input 1. */
+		private void feedLeft(int from, int to) throws Exception {
+			for (int i = from; i <= to; i++) {
+				processElement1(i);
+				processWatermark1(i);
+			}
+		}
+
+		/** Sends rhs elements and matching watermarks for timestamps from..to into input 2. */
+		private void feedRight(int from, int to) throws Exception {
+			for (int i = from; i <= to; i++) {
+				processElement2(i);
+				processWatermark2(i);
+			}
+		}
+
+		@SafeVarargs
+		public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
+			assertOutput(Lists.newArrayList(elems), testHarness.getOutput());
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getLeftBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) {
+
+			try {
+				assertContainsOnly(operator.getRightBuffer(), timestamps);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertLeftBufferEmpty() {
+			try {
+				assertEmpty(operator.getLeftBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder assertRightBufferEmpty() {
+			try {
+				assertEmpty(operator.getRightBuffer());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			return this;
+		}
+
+		public JoinTestBuilder noLateRecords() {
+			TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
+			return this;
+		}
+
+		public void close() throws Exception {
+			testHarness.close();
+		}
+	}
+
+ private static class PassthroughFunction extends ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+ @Override
+ public void processElement(
+ TestElem left,
+ TestElem right,
+ Context ctx,
+ Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
+ out.collect(Tuple2.of(left, right));
+ }
+ }
+
+	/**
+	 * Builds the record expected when the lhs element with {@code lhsTs} joins the rhs element
+	 * with {@code rhsTs}; the record is stamped with the later of the two timestamps.
+	 */
+	private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
+		long lhsTs,
+		long rhsTs
+	) {
+		long resultTimestamp = lhsTs >= rhsTs ? lhsTs : rhsTs;
+		return new StreamRecord<>(
+			Tuple2.of(new TestElem(lhsTs, "lhs"), new TestElem(rhsTs, "rhs")),
+			resultTimestamp);
+	}
+
+	/**
+	 * Element fed into both join inputs. Every instance uses the key {@code "key"}; {@code source}
+	 * records which side ("lhs"/"rhs") it was created for.
+	 */
+	private static class TestElem {
+		String key;
+		long ts;
+		String source;
+
+		public TestElem(long ts, String source) {
+			this.key = "key";
+			this.ts = ts;
+			this.source = source;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+
+			TestElem other = (TestElem) o;
+			return ts == other.ts
+				&& nullSafeEquals(key, other.key)
+				&& nullSafeEquals(source, other.source);
+		}
+
+		@Override
+		public int hashCode() {
+			int keyHash = (key == null) ? 0 : key.hashCode();
+			int sourceHash = (source == null) ? 0 : source.hashCode();
+			// Long.hashCode(ts) == (int) (ts ^ (ts >>> 32))
+			return 31 * (31 * keyHash + Long.hashCode(ts)) + sourceHash;
+		}
+
+		@Override
+		public String toString() {
+			return source + ":" + ts;
+		}
+
+		/** Creates a serializer for this type from its type information. */
+		public static TypeSerializer<TestElem> serializer() {
+			TypeInformation<TestElem> typeInfo = TypeInformation.of(new TypeHint<TestElem>() {
+			});
+			return typeInfo.createSerializer(new ExecutionConfig());
+		}
+
+		/** Equality that treats two nulls as equal. */
+		private static boolean nullSafeEquals(String a, String b) {
+			return (a == null) ? (b == null) : a.equals(b);
+		}
+	}
+
+	/**
+	 * Wraps a new {@link TestElem} in a record whose timestamp equals the element's own {@code ts}.
+	 */
+	private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
+		return new StreamRecord<>(new TestElem(ts, source), ts);
+	}
+
+	/**
+	 * Feeds timestamps 1..4 into both inputs, each element followed by a watermark of the same
+	 * value. One input receives its whole sequence before the other: the left input goes first
+	 * when {@code lhsFasterThanRhs} is set, otherwise the right input goes first.
+	 */
+	private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
+		for (int pass = 0; pass < 2; pass++) {
+			// pass 0 feeds the "first" input, pass 1 the other one
+			boolean feedLeft = (pass == 0) == lhsFasterThanRhs;
+
+			for (int ts = 1; ts <= 4; ts++) {
+				if (feedLeft) {
+					testHarness.processElement1(createStreamRecord(ts, "lhs"));
+					testHarness.processWatermark1(new Watermark(ts));
+				} else {
+					testHarness.processElement2(createStreamRecord(ts, "rhs"));
+					testHarness.processWatermark2(new Watermark(ts));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Keyed two-input harness fixed to String keys, {@link TestElem} inputs, and
+	 * {@code Tuple2<TestElem, TestElem>} output, so tests don't have to spell out the generics.
+	 */
+	private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+		TestHarness(
+			TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> joinOperator,
+			KeySelector<TestElem, String> leftKeySelector,
+			KeySelector<TestElem, String> rightKeySelector,
+			TypeInformation<String> keyTypeInfo) throws Exception {
+			super(joinOperator, leftKeySelector, rightKeySelector, keyTypeInfo);
+		}
+	}
+}
[3/3] flink git commit: [hotfix] Fixed import order in
RocksDBKeyedStateBackend.
Posted by kk...@apache.org.
[hotfix] Fixed import order in RocksDBKeyedStateBackend.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca0fa96b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca0fa96b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca0fa96b
Branch: refs/heads/master
Commit: ca0fa96bfacdc3a3a9e149b68c282c19fea2e2db
Parents: 42ada8a
Author: kkloudas <kk...@gmail.com>
Authored: Thu Jul 12 21:15:29 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jul 12 21:15:29 2018 +0200
----------------------------------------------------------------------
.../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca0fa96b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3bf0aea..622dd0c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,8 +87,8 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;