You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2022/11/28 23:35:10 UTC
[flink] branch master updated: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 61834da8298 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
61834da8298 is described below
commit 61834da82984323484718e551c15e31ce023e026
Author: Seth Saperstein <99...@users.noreply.github.com>
AuthorDate: Mon Nov 28 15:35:01 2022 -0800
[FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
---
.../connectors/kinesis/internals/KinesisDataFetcher.java | 14 ++++++++++++--
.../kinesis/util/JobManagerWatermarkTracker.java | 5 +++++
.../kinesis/util/JobManagerWatermarkTrackerTest.java | 1 +
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 4fcc80a250e..d6c7b296da8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -1174,6 +1174,14 @@ public class KinesisDataFetcher<T> {
}
}
+ LOG.debug(
+ "WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}"
+ + ", potential next watermark: {}",
+ indexOfThisConsumerSubtask,
+ lastWatermark,
+ potentialWatermark,
+ potentialNextWatermark);
+
// advance watermark if possible (watermarks can only be ascending)
if (potentialWatermark == Long.MAX_VALUE) {
if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
@@ -1265,11 +1273,11 @@ public class KinesisDataFetcher<T> {
public void onProcessingTime(long timestamp) {
if (nextWatermark != Long.MIN_VALUE) {
long globalWatermark = lastGlobalWatermark;
- // TODO: refresh watermark while idle
if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
propagatedLocalWatermark = nextWatermark;
} else {
+ globalWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE);
LOG.info(
"WatermarkSyncCallback subtask: {} is idle",
indexOfThisConsumerSubtask);
@@ -1279,12 +1287,14 @@ public class KinesisDataFetcher<T> {
lastLogged = System.currentTimeMillis();
LOG.info(
"WatermarkSyncCallback subtask: {} local watermark: {}"
- + ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
+ + ", global watermark: {}, delta: {} timeouts: {}, idle: {}"
+ + ", emitter: {}",
indexOfThisConsumerSubtask,
nextWatermark,
globalWatermark,
nextWatermark - globalWatermark,
watermarkTracker.getUpdateTimeoutCount(),
+ isIdle,
recordEmitter.printInfo());
// Following is for debugging non-reproducible issue with stalled watermark
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index b4c78438dca..51d055e2c96 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -129,6 +129,11 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
} catch (Exception e) {
throw new RuntimeException(e);
}
+ // no op to get global watermark without updating it
+ if (value.watermark == Long.MIN_VALUE) {
+ addCount--;
+ return accumulator;
+ }
WatermarkState ws = accumulator.get(value.id);
if (ws == null) {
accumulator.put(value.id, ws = new WatermarkState());
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
index c5ea32aa2f5..5af64295812 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
@@ -67,6 +67,7 @@ public class JobManagerWatermarkTrackerTest {
public void run(SourceContext<Integer> ctx) {
assertThat(tracker.updateWatermark(998)).isEqualTo(998);
assertThat(tracker.updateWatermark(999)).isEqualTo(999);
+ assertThat(tracker.updateWatermark(Long.MIN_VALUE)).isEqualTo(999);
}
@Override