You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/15 08:30:13 UTC

[flink] branch master updated: [FLINK-29387][tests] fix unstable late test for interval join

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e51db92e11 [FLINK-29387][tests] fix unstable late test for interval join
7e51db92e11 is described below

commit 7e51db92e114bcfc5404fed3b39f8ae3aaea196f
Author: chenyuzhi <ch...@corp.netease.com>
AuthorDate: Mon Nov 14 22:21:43 2022 +0800

    [FLINK-29387][tests] fix unstable late test for interval join
---
 .../api/operators/co/IntervalJoinOperatorTest.java |  62 +++++++++++--
 .../test/streaming/runtime/IntervalJoinITCase.java | 101 ---------------------
 2 files changed, 55 insertions(+), 108 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
index 69f80607ce2..20859781041 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -558,6 +559,34 @@ public class IntervalJoinOperatorTest {
                 .close();
     }
 
+    @Test
+    public void testLateData() throws Exception {
+        OutputTag<TestElem> leftLateTag = new OutputTag<TestElem>("left_late") {};
+        OutputTag<TestElem> rightLateTag = new OutputTag<TestElem>("right_late") {};
+        setupHarness(-1, true, 1, true, leftLateTag, rightLateTag)
+                .processElement1(3)
+                .processElement2(3)
+                .processWatermark1(3)
+                .processWatermark2(3)
+                .processElement1(4)
+                .processElement2(4)
+                .processElement1(1) // the left side element is late
+                .processElement2(2) // the right side element is late
+                .processElement1(5)
+                .processElement2(5)
+                .andExpect(
+                        streamRecordOf(3, 3),
+                        streamRecordOf(3, 4),
+                        streamRecordOf(4, 3),
+                        streamRecordOf(4, 4),
+                        streamRecordOf(4, 5),
+                        streamRecordOf(5, 4),
+                        streamRecordOf(5, 5))
+                .expectLateRecords(leftLateTag, createStreamRecord(1, "lhs"))
+                .expectLateRecords(rightLateTag, createStreamRecord(2, "rhs"))
+                .close();
+    }
+
     private void assertEmpty(MapState<Long, ?> state) throws Exception {
         boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
         Assert.assertTrue("state not empty", stateIsEmpty);
@@ -581,9 +610,8 @@ public class IntervalJoinOperatorTest {
         Assert.assertEquals(message, ts.length, Iterables.size(state.keys()));
     }
 
-    private void assertOutput(
-            Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput,
-            Queue<Object> actualOutput) {
+    private <T1, T2> void assertOutput(
+            Iterable<StreamRecord<T1>> expectedOutput, Queue<T2> actualOutput) {
 
         int actualSize =
                 actualOutput.stream()
@@ -596,7 +624,7 @@ public class IntervalJoinOperatorTest {
         Assert.assertEquals(
                 "Expected and actual size of stream records different", expectedSize, actualSize);
 
-        for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) {
+        for (StreamRecord<T1> record : expectedOutput) {
             Assert.assertTrue(actualOutput.contains(record));
         }
     }
@@ -631,7 +659,9 @@ public class IntervalJoinOperatorTest {
             long lowerBound,
             boolean lowerBoundInclusive,
             long upperBound,
-            boolean upperBoundInclusive)
+            boolean upperBoundInclusive,
+            OutputTag<TestElem> leftLateDataOutputTag,
+            OutputTag<TestElem> rightLateDataOutputTag)
             throws Exception {
 
         IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
@@ -640,8 +670,8 @@ public class IntervalJoinOperatorTest {
                         upperBound,
                         lowerBoundInclusive,
                         upperBoundInclusive,
-                        null,
-                        null,
+                        leftLateDataOutputTag,
+                        rightLateDataOutputTag,
                         TestElem.serializer(),
                         TestElem.serializer(),
                         new PassthroughFunction());
@@ -656,6 +686,17 @@ public class IntervalJoinOperatorTest {
         return new JoinTestBuilder(t, operator);
     }
 
+    private JoinTestBuilder setupHarness(
+            long lowerBound,
+            boolean lowerBoundInclusive,
+            long upperBound,
+            boolean upperBoundInclusive)
+            throws Exception {
+
+        return setupHarness(
+                lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive, null, null);
+    }
+
     private class JoinTestBuilder {
 
         private IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>>
@@ -772,6 +813,13 @@ public class IntervalJoinOperatorTest {
             return this;
         }
 
+        @SafeVarargs
+        public final JoinTestBuilder expectLateRecords(
+                OutputTag<TestElem> tag, StreamRecord<TestElem>... elems) {
+            assertOutput(Lists.newArrayList(elems), testHarness.getSideOutput(tag));
+            return this;
+        }
+
         public JoinTestBuilder noLateRecords() {
             TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
             return this;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
index d9d2c6d2e6b..a1ba8260bb3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
@@ -31,7 +30,6 @@ import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
@@ -181,105 +179,6 @@ public class IntervalJoinITCase {
         long serialVersionUID = 1L;
     }
 
-    private void addSinkToSideOutput(
-            SingleOutputStreamOperator<String> streamOperator,
-            OutputTag<Tuple2<String, Integer>> outputTag) {
-        streamOperator
-                .getSideOutput(outputTag)
-                .addSink(
-                        new SinkFunction<Tuple2<String, Integer>>() {
-                            @Override
-                            public void invoke(Tuple2<String, Integer> value, Context context)
-                                    throws Exception {
-                                testResults.add(value.toString());
-                            }
-                        });
-    }
-
-    @Test
-    public void testIntervalJoinSideOutputLeftLateData() throws Exception {
-
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        DataStream<Tuple2<String, Integer>> streamOne =
-                buildSourceStream(
-                        env,
-                        (ctx) -> {
-                            ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
-                            ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
-                            ctx.emitWatermark(new Watermark(3));
-                            ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L); // late data
-                        });
-
-        DataStream<Tuple2<String, Integer>> streamTwo =
-                buildSourceStream(
-                        env,
-                        (ctx) -> {
-                            ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
-                            ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
-                            ctx.emitWatermark(new Watermark(2));
-                            ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
-                        });
-
-        OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late") {};
-
-        SingleOutputStreamOperator<String> process =
-                streamOne
-                        .keyBy(new Tuple2KeyExtractor())
-                        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
-                        .between(Time.milliseconds(-1), Time.milliseconds(1))
-                        .sideOutputLeftLateData(late)
-                        .process(new CombineToStringJoinFunction());
-
-        addSinkToSideOutput(process, late);
-        env.execute();
-
-        expectInAnyOrder("(key,1)");
-    }
-
-    @Test
-    public void testIntervalJoinSideOutputRightLateData() throws Exception {
-
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        DataStream<Tuple2<String, Integer>> streamOne =
-                buildSourceStream(
-                        env,
-                        (ctx) -> {
-                            ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
-                            ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
-                            ctx.emitWatermark(new Watermark(3));
-                            ctx.collectWithTimestamp(Tuple2.of("key", 4), 4L);
-                        });
-
-        DataStream<Tuple2<String, Integer>> streamTwo =
-                buildSourceStream(
-                        env,
-                        (ctx) -> {
-                            ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
-                            ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
-                            ctx.emitWatermark(new Watermark(3));
-                            ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L); // late data
-                        });
-
-        OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late") {};
-
-        SingleOutputStreamOperator<String> process =
-                streamOne
-                        .keyBy(new Tuple2KeyExtractor())
-                        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
-                        .between(Time.milliseconds(-1), Time.milliseconds(1))
-                        .sideOutputRightLateData(late)
-                        .process(new CombineToStringJoinFunction());
-
-        addSinkToSideOutput(process, late);
-        env.execute();
-
-        expectInAnyOrder("(key,2)");
-    }
-
     @Test
     public void testBoundedUnorderedStreamsStillJoinCorrectly() throws Exception {