You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that did not survive conversion to plain text.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/15 08:30:13 UTC
[flink] branch master updated: [FLINK-29387][tests] fix unstable late test for interval join
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e51db92e11 [FLINK-29387][tests] fix unstable late test for interval join
7e51db92e11 is described below
commit 7e51db92e114bcfc5404fed3b39f8ae3aaea196f
Author: chenyuzhi <ch...@corp.netease.com>
AuthorDate: Mon Nov 14 22:21:43 2022 +0800
[FLINK-29387][tests] fix unstable late test for interval join
---
.../api/operators/co/IntervalJoinOperatorTest.java | 62 +++++++++++--
.../test/streaming/runtime/IntervalJoinITCase.java | 101 ---------------------
2 files changed, 55 insertions(+), 108 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
index 69f80607ce2..20859781041 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -558,6 +559,34 @@ public class IntervalJoinOperatorTest {
.close();
}
+ @Test
+ public void testLateData() throws Exception {
+ OutputTag<TestElem> leftLateTag = new OutputTag<TestElem>("left_late") {};
+ OutputTag<TestElem> rightLateTag = new OutputTag<TestElem>("right_late") {};
+ setupHarness(-1, true, 1, true, leftLateTag, rightLateTag)
+ .processElement1(3)
+ .processElement2(3)
+ .processWatermark1(3)
+ .processWatermark2(3)
+ .processElement1(4)
+ .processElement2(4)
+ .processElement1(1) // the left side element is late
+ .processElement2(2) // the right side element is late
+ .processElement1(5)
+ .processElement2(5)
+ .andExpect(
+ streamRecordOf(3, 3),
+ streamRecordOf(3, 4),
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4),
+ streamRecordOf(4, 5),
+ streamRecordOf(5, 4),
+ streamRecordOf(5, 5))
+ .expectLateRecords(leftLateTag, createStreamRecord(1, "lhs"))
+ .expectLateRecords(rightLateTag, createStreamRecord(2, "rhs"))
+ .close();
+ }
+
private void assertEmpty(MapState<Long, ?> state) throws Exception {
boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
Assert.assertTrue("state not empty", stateIsEmpty);
@@ -581,9 +610,8 @@ public class IntervalJoinOperatorTest {
Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
}
- private void assertOutput(
- Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
- Queue<Object> actualOutput) {
+ private <T1, T2> void assertOutput(
+ Iterable<StreamRecord<T1>> expectedOutput, Queue<T2> actualOutput) {
int actualSize =
actualOutput.stream()
@@ -596,7 +624,7 @@ public class IntervalJoinOperatorTest {
Assert.assertEquals(
"Expected and actual size of stream records different", expectedSize, actualSize);
- for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+ for (StreamRecord<T1> record : expectedOutput) {
Assert.assertTrue(actualOutput.contains(record));
}
}
@@ -631,7 +659,9 @@ public class IntervalJoinOperatorTest {
long lowerBound,
boolean lowerBoundInclusive,
long upperBound,
- boolean upperBoundInclusive)
+ boolean upperBoundInclusive,
+ OutputTag<TestElem> leftLateDataOutputTag,
+ OutputTag<TestElem> rightLateDataOutputTag)
throws Exception {
IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
@@ -640,8 +670,8 @@ public class IntervalJoinOperatorTest {
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
- null,
- null,
+ leftLateDataOutputTag,
+ rightLateDataOutputTag,
TestElem.serializer(),
TestElem.serializer(),
new PassthroughFunction());
@@ -656,6 +686,17 @@ public class IntervalJoinOperatorTest {
return new JoinTestBuilder(t, operator);
}
+ private JoinTestBuilder setupHarness(
+ long lowerBound,
+ boolean lowerBoundInclusive,
+ long upperBound,
+ boolean upperBoundInclusive)
+ throws Exception {
+
+ return setupHarness(
+ lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive, null, null);
+ }
+
private class JoinTestBuilder {
private IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>>
@@ -772,6 +813,13 @@ public class IntervalJoinOperatorTest {
return this;
}
+ @SafeVarargs
+ public final JoinTestBuilder expectLateRecords(
+ OutputTag<TestElem> tag, StreamRecord<TestElem>... elems) {
+ assertOutput(Lists.newArrayList(elems), testHarness.getSideOutput(tag));
+ return this;
+ }
+
public JoinTestBuilder noLateRecords() {
TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
return this;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
index d9d2c6d2e6b..a1ba8260bb3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
@@ -31,7 +30,6 @@ import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -181,105 +179,6 @@ public class IntervalJoinITCase {
long serialVersionUID = 1L;
}
- private void addSinkToSideOutput(
- SingleOutputStreamOperator<String> streamOperator,
- OutputTag<Tuple2<String, Integer>> outputTag) {
- streamOperator
- .getSideOutput(outputTag)
- .addSink(
- new SinkFunction<Tuple2<String, Integer>>() {
- @Override
- public void invoke(Tuple2<String, Integer> value, Context context)
- throws Exception {
- testResults.add(value.toString());
- }
- });
- }
-
- @Test
- public void testIntervalJoinSideOutputLeftLateData() throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<Tuple2<String, Integer>> streamOne =
- buildSourceStream(
- env,
- (ctx) -> {
- ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
- ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
- ctx.emitWatermark(new Watermark(3));
- ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L); // late data
- });
-
- DataStream<Tuple2<String, Integer>> streamTwo =
- buildSourceStream(
- env,
- (ctx) -> {
- ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
- ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
- ctx.emitWatermark(new Watermark(2));
- ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
- });
-
- OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late") {};
-
- SingleOutputStreamOperator<String> process =
- streamOne
- .keyBy(new Tuple2KeyExtractor())
- .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
- .between(Time.milliseconds(-1), Time.milliseconds(1))
- .sideOutputLeftLateData(late)
- .process(new CombineToStringJoinFunction());
-
- addSinkToSideOutput(process, late);
- env.execute();
-
- expectInAnyOrder("(key,1)");
- }
-
- @Test
- public void testIntervalJoinSideOutputRightLateData() throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<Tuple2<String, Integer>> streamOne =
- buildSourceStream(
- env,
- (ctx) -> {
- ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
- ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
- ctx.emitWatermark(new Watermark(3));
- ctx.collectWithTimestamp(Tuple2.of("key", 4), 4L);
- });
-
- DataStream<Tuple2<String, Integer>> streamTwo =
- buildSourceStream(
- env,
- (ctx) -> {
- ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
- ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
- ctx.emitWatermark(new Watermark(3));
- ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L); // late data
- });
-
- OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late") {};
-
- SingleOutputStreamOperator<String> process =
- streamOne
- .keyBy(new Tuple2KeyExtractor())
- .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
- .between(Time.milliseconds(-1), Time.milliseconds(1))
- .sideOutputRightLateData(late)
- .process(new CombineToStringJoinFunction());
-
- addSinkToSideOutput(process, late);
- env.execute();
-
- expectInAnyOrder("(key,2)");
- }
-
@Test
public void testBoundedUnorderedStreamsStillJoinCorrectly() throws Exception {