You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/09/14 06:41:59 UTC

[flink] branch release-1.16 updated: [FLINK-28975][connector/base] Add IdlenessManager for main and per-split output in source operator to mark idleness only if both outputs are idle

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new a5be641a9a9 [FLINK-28975][connector/base] Add IdlenessManager for main and per-split output in source operator to mark idleness only if both outputs are idle
a5be641a9a9 is described below

commit a5be641a9a95de631dc697f3d2906e46cb126ee5
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Mon Sep 5 18:13:06 2022 +0800

    [FLINK-28975][connector/base] Add IdlenessManager for main and per-split output in source operator to mark idleness only if both outputs are idle
---
 .../common/eventtime/WatermarksWithIdleness.java   |   8 +-
 .../source/ProgressiveTimestampsAndWatermarks.java |  70 +++++++++-
 .../source/SourceOperatorEventTimeTest.java        | 155 +++++++++++++++------
 3 files changed, 187 insertions(+), 46 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
index 8a4858b291f..3d61217ce04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
@@ -40,6 +40,8 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
 
     private final IdlenessTimer idlenessTimer;
 
+    private boolean isIdleNow = false;
+
     /**
      * Creates a new WatermarksWithIdleness generator to the given generator idleness detection with
      * the given timeout.
@@ -65,12 +67,16 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
     public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
         watermarks.onEvent(event, eventTimestamp, output);
         idlenessTimer.activity();
+        isIdleNow = false;
     }
 
     @Override
     public void onPeriodicEmit(WatermarkOutput output) {
         if (idlenessTimer.checkIfIdle()) {
-            output.markIdle();
+            if (!isIdleNow) {
+                output.markIdle();
+                isIdleNow = true;
+            }
         } else {
             watermarks.onPeriodicEmit(output);
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index 6f9fa9eeb61..5693f180d6f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -100,13 +101,15 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
                 "already created a main output");
 
         final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(output, watermarkEmitted);
+        IdlenessManager idlenessManager = new IdlenessManager(watermarkOutput);
+
         final WatermarkGenerator<T> watermarkGenerator =
                 watermarksFactory.createWatermarkGenerator(watermarksContext);
 
         currentPerSplitOutputs =
                 new SplitLocalOutputs<>(
                         output,
-                        watermarkOutput,
+                        idlenessManager.getSplitLocalOutput(),
                         timestampAssigner,
                         watermarksFactory,
                         watermarksContext);
@@ -114,7 +117,7 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
         currentMainOutput =
                 new StreamingReaderOutput<>(
                         output,
-                        watermarkOutput,
+                        idlenessManager.getMainOutput(),
                         timestampAssigner,
                         watermarkGenerator,
                         currentPerSplitOutputs);
@@ -260,4 +263,67 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
             watermarkMultiplexer.onPeriodicEmit();
         }
     }
+
+    /**
+     * A helper class for managing idleness status of the underlying output.
+     *
+     * <p>This class tracks the idleness status of the main and split-local outputs, and only marks
+     * the underlying output as idle if both the main and the split-local outputs are idle.
+     *
+     * <p>This manager is needed because a source reader implementation might use only one of the
+     * main or split-local outputs to emit records and watermarks. Without it, the watermark
+     * generator on the unused output would keep marking the underlying output as idle.
+     */
+    private static class IdlenessManager {
+        private final WatermarkOutput underlyingOutput;
+        private final IdlenessAwareWatermarkOutput splitLocalOutput;
+        private final IdlenessAwareWatermarkOutput mainOutput;
+
+        IdlenessManager(WatermarkOutput underlyingOutput) {
+            this.underlyingOutput = underlyingOutput;
+            this.splitLocalOutput = new IdlenessAwareWatermarkOutput(underlyingOutput);
+            this.mainOutput = new IdlenessAwareWatermarkOutput(underlyingOutput);
+        }
+
+        IdlenessAwareWatermarkOutput getSplitLocalOutput() {
+            return splitLocalOutput;
+        }
+
+        IdlenessAwareWatermarkOutput getMainOutput() {
+            return mainOutput;
+        }
+
+        void maybeMarkUnderlyingOutputAsIdle() {
+            if (splitLocalOutput.isIdle && mainOutput.isIdle) {
+                underlyingOutput.markIdle();
+            }
+        }
+
+        private class IdlenessAwareWatermarkOutput implements WatermarkOutput {
+            private final WatermarkOutput underlyingOutput;
+            private boolean isIdle = true;
+
+            private IdlenessAwareWatermarkOutput(WatermarkOutput underlyingOutput) {
+                this.underlyingOutput = underlyingOutput;
+            }
+
+            @Override
+            public void emitWatermark(Watermark watermark) {
+                underlyingOutput.emitWatermark(watermark);
+                isIdle = false;
+            }
+
+            @Override
+            public void markIdle() {
+                isIdle = true;
+                maybeMarkUnderlyingOutputAsIdle();
+            }
+
+            @Override
+            public void markActive() {
+                isIdle = false;
+                underlyingOutput.markActive();
+            }
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 959bc82402f..9e295abc50f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -28,17 +28,15 @@ import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -46,30 +44,17 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests that validate correct handling of watermark generation in the {@link ReaderOutput} as
  * created by the {@link ProgressiveTimestampsAndWatermarks}.
  */
-@RunWith(Parameterized.class)
-public class SourceOperatorEventTimeTest {
-
-    @Parameterized.Parameters(name = "Emit progressive watermarks: {0}")
-    public static Collection<Boolean> parameters() {
-        return Arrays.asList(true, false);
-    }
-
-    private final boolean emitProgressiveWatermarks;
+class SourceOperatorEventTimeTest {
 
-    public SourceOperatorEventTimeTest(boolean emitProgressiveWatermarks) {
-        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
-    }
-
-    @Test
-    public void testMainOutputPeriodicWatermarks() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testMainOutputPeriodicWatermarks(boolean emitProgressiveWatermarks) throws Exception {
         final WatermarkStrategy<Integer> watermarkStrategy =
                 WatermarkStrategy.forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>());
 
@@ -81,11 +66,13 @@ public class SourceOperatorEventTimeTest {
                         (output) -> output.collect(0, 120L),
                         (output) -> output.collect(0, 110L));
 
-        assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(120L));
+        assertWatermarksOrEmpty(
+                emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(120L));
     }
 
-    @Test
-    public void testMainOutputEventWatermarks() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testMainOutputEventWatermarks(boolean emitProgressiveWatermarks) throws Exception {
         final WatermarkStrategy<Integer> watermarkStrategy =
                 WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
 
@@ -97,11 +84,13 @@ public class SourceOperatorEventTimeTest {
                         (output) -> output.collect(0, 120L),
                         (output) -> output.collect(0, 110L));
 
-        assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(120L));
+        assertWatermarksOrEmpty(
+                emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(120L));
     }
 
-    @Test
-    public void testPerSplitOutputPeriodicWatermarks() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testPerSplitOutputPeriodicWatermarks(boolean emitProgressiveWatermarks) throws Exception {
         final WatermarkStrategy<Integer> watermarkStrategy =
                 WatermarkStrategy.forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>());
 
@@ -120,11 +109,16 @@ public class SourceOperatorEventTimeTest {
                         (output) -> output.createOutputForSplit("B").collect(0, 200L));
 
         assertWatermarksOrEmpty(
-                result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
+                emitProgressiveWatermarks,
+                result,
+                new Watermark(100L),
+                new Watermark(150L),
+                new Watermark(200L));
     }
 
-    @Test
-    public void testPerSplitOutputEventWatermarks() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testPerSplitOutputEventWatermarks(boolean emitProgressiveWatermarks) throws Exception {
         final WatermarkStrategy<Integer> watermarkStrategy =
                 WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
 
@@ -143,11 +137,17 @@ public class SourceOperatorEventTimeTest {
                         (output) -> output.createOutputForSplit("two").collect(0, 200L));
 
         assertWatermarksOrEmpty(
-                result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
+                emitProgressiveWatermarks,
+                result,
+                new Watermark(100L),
+                new Watermark(150L),
+                new Watermark(200L));
     }
 
-    @Test
-    public void testCreatingPerSplitOutputOnSplitAddition() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCreatingPerSplitOutputOnSplitAddition(boolean emitProgressiveWatermarks)
+            throws Exception {
         final WatermarkStrategy<Integer> watermarkStrategy =
                 WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
 
@@ -172,7 +172,76 @@ public class SourceOperatorEventTimeTest {
                         new MockSourceSplitSerializer()));
 
         final List<Watermark> result = testSequenceOfWatermarks(sourceOperator);
-        assertWatermarksOrEmpty(result, new Watermark(150L), new Watermark(300L));
+        assertWatermarksOrEmpty(
+                emitProgressiveWatermarks, result, new Watermark(150L), new Watermark(300L));
+    }
+
+    @Test
+    void testMainAndPerSplitWatermarkIdleness() throws Exception {
+        final WatermarkStrategy<Integer> watermarkStrategy =
+                WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
+
+        InterpretingSourceReader reader =
+                new InterpretingSourceReader(
+                        // Emit from main output
+                        output -> output.collect(0, 100L),
+                        // Mark main output as idle
+                        // Expect an IDLE event, as the per-split output has not been active yet
+                        ReaderOutput::markIdle,
+                        // Emit from per-split output
+                        output -> output.createOutputForSplit("1").collect(0, 150L),
+                        output -> output.createOutputForSplit("2").collect(0, 200L),
+                        // Mark output of split 1 as idle
+                        // This should not generate IDLE event as output of split 2 is active
+                        output -> output.createOutputForSplit("1").markIdle(),
+                        // Mark output of split 2 as idle
+                        // Expect to have an IDLE event as all outputs (main, split 1 and split 2)
+                        // are idle
+                        output -> output.createOutputForSplit("2").markIdle(),
+                        // Emit from main output
+                        // Expect to have an ACTIVE event
+                        output -> output.collect(0, 250L),
+                        // Mark output 1 and 2 as idle again
+                        // This should not generate IDLE event as main output is active
+                        output -> output.createOutputForSplit("1").markIdle(),
+                        output -> output.createOutputForSplit("2").markIdle(),
+                        // Mark main output as idle again to test active from per-split output
+                        // Expect to have an IDLE event
+                        ReaderOutput::markIdle,
+                        // Emit from per-split output
+                        // Expect to have an ACTIVE event
+                        output -> output.createOutputForSplit("1").collect(0, 300L));
+
+        SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(reader, watermarkStrategy, true);
+
+        List<Object> events = testSequenceOfEvents(sourceOperator);
+
+        assertThat(events)
+                .containsExactly(
+                        // Record and watermark from main output
+                        new StreamRecord<>(0, 100L),
+                        new Watermark(100L),
+                        new WatermarkStatus(WatermarkStatus.IDLE_STATUS),
+                        // Record and watermark from split 1
+                        new StreamRecord<>(0, 150L),
+                        new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS),
+                        new Watermark(150L),
+                        // Record and watermark from split 2
+                        new StreamRecord<>(0, 200L),
+                        new Watermark(200L),
+                        // IDLE event as output of main, split 1 and split 2 are idle
+                        new WatermarkStatus(WatermarkStatus.IDLE_STATUS),
+                        // Record and watermark from main output, together with an ACTIVE event
+                        new StreamRecord<>(0, 250L),
+                        new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS),
+                        new Watermark(250L),
+                        // IDLE event as output of main, split 1 and split 2 are idle
+                        new WatermarkStatus(WatermarkStatus.IDLE_STATUS),
+                        // Record and watermark from split 1, together with an ACTIVE event
+                        new StreamRecord<>(0, 300L),
+                        new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS),
+                        new Watermark(300));
     }
 
     // ------------------------------------------------------------------------
@@ -184,18 +253,18 @@ public class SourceOperatorEventTimeTest {
      * mode. Otherwise, asserts that the list of actual watermarks is empty in BATCH mode.
      */
     private void assertWatermarksOrEmpty(
-            List<Watermark> actualWatermarks, Watermark... expectedWatermarks) {
+            boolean emitProgressiveWatermarks,
+            List<Watermark> actualWatermarks,
+            Watermark... expectedWatermarks) {
         // We add the expected Long.MAX_VALUE watermark to the end. We expect that for both
         // "STREAMING" and "BATCH" mode.
         if (emitProgressiveWatermarks) {
-            ArrayList<Watermark> watermarks = Lists.newArrayList(expectedWatermarks);
-            assertThat(actualWatermarks, contains(watermarks.toArray()));
+            assertThat(actualWatermarks).containsExactly(expectedWatermarks);
         } else {
-            assertThat(actualWatermarks, hasSize(0));
+            assertThat(actualWatermarks).isEmpty();
         }
     }
 
-    @SuppressWarnings("FinalPrivateMethod")
     @SafeVarargs
     private final List<Watermark> testSequenceOfWatermarks(
             final boolean emitProgressiveWatermarks,