You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2023/07/07 08:49:09 UTC

[flink] 02/02: [FLINK-32496][connectors/common] Fix the bug where the source cannot resume after watermark alignment and idleness are enabled

This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e69a7b7d7684be4fe797ad4ff2d203f7da3f577
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sat Jul 1 13:49:22 2023 +0800

    [FLINK-32496][connectors/common] Fix the bug where the source cannot resume after watermark alignment and idleness are enabled
---
 .../streaming/api/operators/SourceOperator.java      | 18 +++++++++++++-----
 .../operators/source/TimestampsAndWatermarks.java    | 12 +++++++-----
 .../api/operators/source/WatermarkToDataOutput.java  |  7 +++++--
 .../api/operators/SourceOperatorAlignmentTest.java   | 20 +++++++++++++++-----
 4 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 192e2decee6..c4f624e443c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -146,7 +146,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private DataOutput<OUT> lastInvokedOutput;
 
-    private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();
+    private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+    private boolean idle = false;
 
     /** The state that holds the currently assigned splits. */
     private ListState<SplitT> readerState;
@@ -503,11 +505,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private void emitLatestWatermark(long time) {
         checkState(currentMainOutput != null);
-        if (lastEmittedWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
+        if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
             return;
         }
         operatorEventGateway.sendEventToCoordinator(
-                new ReportedWatermarkEvent(lastEmittedWatermark));
+                new ReportedWatermarkEvent(
+                        idle ? Watermark.MAX_WATERMARK.getTimestamp() : latestWatermark));
     }
 
     @Override
@@ -601,9 +604,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
+    @Override
+    public void updateIdle(boolean isIdle) {
+        this.idle = isIdle;
+    }
+
     @Override
     public void updateCurrentEffectiveWatermark(long watermark) {
-        lastEmittedWatermark = watermark;
+        latestWatermark = watermark;
         checkWatermarkAlignment();
     }
 
@@ -676,7 +684,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     }
 
     private boolean shouldWaitForAlignment() {
-        return currentMaxDesiredWatermark < lastEmittedWatermark;
+        return currentMaxDesiredWatermark < latestWatermark;
     }
 
     private void registerReader() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index 2448f6142ae..cd41ca9ecb4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import java.time.Duration;
 
@@ -46,11 +45,14 @@ public interface TimestampsAndWatermarks<T> {
     /** Lets the owner/creator of the output know about latest emitted watermark. */
     @Internal
     interface WatermarkUpdateListener {
+
+        /** Called whenever the output switches between the idle and active states. */
+        void updateIdle(boolean isIdle);
+
         /**
-         * Effective watermark covers the {@link WatermarkStatus}. If an output becomes idle, this
-         * method should be called with {@link Long#MAX_VALUE}, but what is more important, once it
-         * becomes active again it should call this method with the last emitted value of the
-         * watermark.
+         * Updates the effective watermark. If an output becomes idle, call {@link #updateIdle}
+         * instead of updating the watermark to {@link Long#MAX_VALUE}, because the output needs
+         * to distinguish between being idle and having a real watermark.
          */
         void updateCurrentEffectiveWatermark(long watermark);
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
index 040b7940e62..d4d81b64f45 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -45,6 +45,9 @@ public final class WatermarkToDataOutput implements WatermarkOutput {
         this(
                 output,
                 new TimestampsAndWatermarks.WatermarkUpdateListener() {
+                    @Override
+                    public void updateIdle(boolean isIdle) {}
+
                     @Override
                     public void updateCurrentEffectiveWatermark(long watermark) {}
 
@@ -92,7 +95,7 @@ public final class WatermarkToDataOutput implements WatermarkOutput {
 
         try {
             output.emitWatermarkStatus(WatermarkStatus.IDLE);
-            watermarkEmitted.updateCurrentEffectiveWatermark(Long.MAX_VALUE);
+            watermarkEmitted.updateIdle(true);
             isIdle = true;
         } catch (ExceptionInChainedOperatorException e) {
             throw e;
@@ -118,7 +121,7 @@ public final class WatermarkToDataOutput implements WatermarkOutput {
         }
 
         output.emitWatermarkStatus(WatermarkStatus.ACTIVE);
-        watermarkEmitted.updateCurrentEffectiveWatermark(maxWatermarkSoFar);
+        watermarkEmitted.updateIdle(false);
         isIdle = false;
         return false;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index ce7637ceb25..dc029c272d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
@@ -124,8 +126,9 @@ class SourceOperatorAlignmentTest {
         assertThat(operator.isAvailable()).isFalse();
     }
 
-    @Test
-    void testWatermarkAlignmentWithIdleness() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testWatermarkAlignmentWithIdleness(boolean allSubtasksIdle) throws Exception {
         // we use a separate context, because we need to enable idleness
         try (SourceOperatorTestContext context =
                 new SourceOperatorTestContext(
@@ -165,9 +168,16 @@ class SourceOperatorAlignmentTest {
                     .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
-            // If all source subtasks of the watermark group are idle,
-            // then the coordinator will report Long.MAX_VALUE
-            operator.handleOperatorEvent(new WatermarkAlignmentEvent(Long.MAX_VALUE));
+
+            if (allSubtasksIdle) {
+                // If all source subtasks of the watermark group are idle,
+                // then the coordinator will report Long.MAX_VALUE
+                operator.handleOperatorEvent(
+                        new WatermarkAlignmentEvent(Watermark.MAX_WATERMARK.getTimestamp()));
+            } else {
+                // Other subtasks are not idle, so the watermark is increasing.
+                operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 150));
+            }
 
             // it is easier to create a new split than add records the old one. The old one is
             // serialized, when sending the AddSplitEvent, so it is not as easy as