You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2022/11/28 23:35:10 UTC

[flink] branch master updated: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 61834da8298 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
61834da8298 is described below

commit 61834da82984323484718e551c15e31ce023e026
Author: Seth Saperstein <99...@users.noreply.github.com>
AuthorDate: Mon Nov 28 15:35:01 2022 -0800

    [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
---
 .../connectors/kinesis/internals/KinesisDataFetcher.java   | 14 ++++++++++++--
 .../kinesis/util/JobManagerWatermarkTracker.java           |  5 +++++
 .../kinesis/util/JobManagerWatermarkTrackerTest.java       |  1 +
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 4fcc80a250e..d6c7b296da8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -1174,6 +1174,14 @@ public class KinesisDataFetcher<T> {
             }
         }
 
+        LOG.debug(
+                "WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}"
+                        + ", potential next watermark: {}",
+                indexOfThisConsumerSubtask,
+                lastWatermark,
+                potentialWatermark,
+                potentialNextWatermark);
+
         // advance watermark if possible (watermarks can only be ascending)
         if (potentialWatermark == Long.MAX_VALUE) {
             if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
@@ -1265,11 +1273,11 @@ public class KinesisDataFetcher<T> {
         public void onProcessingTime(long timestamp) {
             if (nextWatermark != Long.MIN_VALUE) {
                 long globalWatermark = lastGlobalWatermark;
-                // TODO: refresh watermark while idle
                 if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
                     globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
                     propagatedLocalWatermark = nextWatermark;
                 } else {
+                    globalWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE);
                     LOG.info(
                             "WatermarkSyncCallback subtask: {} is idle",
                             indexOfThisConsumerSubtask);
@@ -1279,12 +1287,14 @@ public class KinesisDataFetcher<T> {
                     lastLogged = System.currentTimeMillis();
                     LOG.info(
                             "WatermarkSyncCallback subtask: {} local watermark: {}"
-                                    + ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
+                                    + ", global watermark: {}, delta: {} timeouts: {}, idle: {}"
+                                    + ", emitter: {}",
                             indexOfThisConsumerSubtask,
                             nextWatermark,
                             globalWatermark,
                             nextWatermark - globalWatermark,
                             watermarkTracker.getUpdateTimeoutCount(),
+                            isIdle,
                             recordEmitter.printInfo());
 
                     // Following is for debugging non-reproducible issue with stalled watermark
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index b4c78438dca..51d055e2c96 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -129,6 +129,11 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
+            // no op to get global watermark without updating it
+            if (value.watermark == Long.MIN_VALUE) {
+                addCount--;
+                return accumulator;
+            }
             WatermarkState ws = accumulator.get(value.id);
             if (ws == null) {
                 accumulator.put(value.id, ws = new WatermarkState());
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
index c5ea32aa2f5..5af64295812 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
@@ -67,6 +67,7 @@ public class JobManagerWatermarkTrackerTest {
         public void run(SourceContext<Integer> ctx) {
             assertThat(tracker.updateWatermark(998)).isEqualTo(998);
             assertThat(tracker.updateWatermark(999)).isEqualTo(999);
+            assertThat(tracker.updateWatermark(Long.MIN_VALUE)).isEqualTo(999);
         }
 
         @Override