You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2023/07/07 08:49:09 UTC
[flink] 02/02: [FLINK-32496][connectors/common] Fix the bug that source cannot resume after enabling the watermark alignment and idleness
This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e69a7b7d7684be4fe797ad4ff2d203f7da3f577
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sat Jul 1 13:49:22 2023 +0800
[FLINK-32496][connectors/common] Fix the bug that source cannot resume after enabling the watermark alignment and idleness
---
.../streaming/api/operators/SourceOperator.java | 18 +++++++++++++-----
.../operators/source/TimestampsAndWatermarks.java | 12 +++++++-----
.../api/operators/source/WatermarkToDataOutput.java | 7 +++++--
.../api/operators/SourceOperatorAlignmentTest.java | 20 +++++++++++++++-----
4 files changed, 40 insertions(+), 17 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 192e2decee6..c4f624e443c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -146,7 +146,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private DataOutput<OUT> lastInvokedOutput;
- private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();
+ private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+ private boolean idle = false;
/** The state that holds the currently assigned splits. */
private ListState<SplitT> readerState;
@@ -503,11 +505,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private void emitLatestWatermark(long time) {
checkState(currentMainOutput != null);
- if (lastEmittedWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
+ if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
return;
}
operatorEventGateway.sendEventToCoordinator(
- new ReportedWatermarkEvent(lastEmittedWatermark));
+ new ReportedWatermarkEvent(
+ idle ? Watermark.MAX_WATERMARK.getTimestamp() : latestWatermark));
}
@Override
@@ -601,9 +604,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
+ @Override
+ public void updateIdle(boolean isIdle) {
+ this.idle = isIdle;
+ }
+
@Override
public void updateCurrentEffectiveWatermark(long watermark) {
- lastEmittedWatermark = watermark;
+ latestWatermark = watermark;
checkWatermarkAlignment();
}
@@ -676,7 +684,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
}
private boolean shouldWaitForAlignment() {
- return currentMaxDesiredWatermark < lastEmittedWatermark;
+ return currentMaxDesiredWatermark < latestWatermark;
}
private void registerReader() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index 2448f6142ae..cd41ca9ecb4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import java.time.Duration;
@@ -46,11 +45,14 @@ public interface TimestampsAndWatermarks<T> {
/** Lets the owner/creator of the output know about latest emitted watermark. */
@Internal
interface WatermarkUpdateListener {
+
+ /** Should be called whenever the idle state changes. */
+ void updateIdle(boolean isIdle);
+
/**
- * Effective watermark covers the {@link WatermarkStatus}. If an output becomes idle, this
- * method should be called with {@link Long#MAX_VALUE}, but what is more important, once it
- * becomes active again it should call this method with the last emitted value of the
- * watermark.
+ * Updates the effective watermark. If an output becomes idle, call {@link
+ * #updateIdle(boolean)} instead of updating the watermark to {@link Long#MAX_VALUE}, because
+ * the output needs to distinguish between idleness and a real watermark.
*/
void updateCurrentEffectiveWatermark(long watermark);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
index 040b7940e62..d4d81b64f45 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -45,6 +45,9 @@ public final class WatermarkToDataOutput implements WatermarkOutput {
this(
output,
new TimestampsAndWatermarks.WatermarkUpdateListener() {
+ @Override
+ public void updateIdle(boolean isIdle) {}
+
@Override
public void updateCurrentEffectiveWatermark(long watermark) {}
@@ -92,7 +95,7 @@ public final class WatermarkToDataOutput implements WatermarkOutput {
try {
output.emitWatermarkStatus(WatermarkStatus.IDLE);
- watermarkEmitted.updateCurrentEffectiveWatermark(Long.MAX_VALUE);
+ watermarkEmitted.updateIdle(true);
isIdle = true;
} catch (ExceptionInChainedOperatorException e) {
throw e;
@@ -118,7 +121,7 @@ public final class WatermarkToDataOutput implements WatermarkOutput {
}
output.emitWatermarkStatus(WatermarkStatus.ACTIVE);
- watermarkEmitted.updateCurrentEffectiveWatermark(maxWatermarkSoFar);
+ watermarkEmitted.updateIdle(false);
isIdle = false;
return false;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index ce7637ceb25..dc029c272d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
@@ -124,8 +126,9 @@ class SourceOperatorAlignmentTest {
assertThat(operator.isAvailable()).isFalse();
}
- @Test
- void testWatermarkAlignmentWithIdleness() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWatermarkAlignmentWithIdleness(boolean allSubtasksIdle) throws Exception {
// we use a separate context, because we need to enable idleness
try (SourceOperatorTestContext context =
new SourceOperatorTestContext(
@@ -165,9 +168,16 @@ class SourceOperatorAlignmentTest {
.isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
- // If all source subtasks of the watermark group are idle,
- // then the coordinator will report Long.MAX_VALUE
- operator.handleOperatorEvent(new WatermarkAlignmentEvent(Long.MAX_VALUE));
+
+ if (allSubtasksIdle) {
+ // If all source subtasks of the watermark group are idle,
+ // then the coordinator will report Long.MAX_VALUE
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(Watermark.MAX_WATERMARK.getTimestamp()));
+ } else {
+ // Other subtasks are not idle, so the watermark is increasing.
+ operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 150));
+ }
// it is easier to create a new split than add records the old one. The old one is
// serialized, when sending the AddSplitEvent, so it is not as easy as