You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2019/08/27 23:19:49 UTC

[flink] branch release-1.9 updated: [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c905a4d  [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
c905a4d is described below

commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Aug 26 15:02:40 2019 -0700

    [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
---
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java   | 5 +++--
 .../connectors/kinesis/util/JobManagerWatermarkTracker.java          | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index f38e6eb..80b724b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -715,7 +715,7 @@ public class KinesisDataFetcher<T> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Atomic operation to collect a record and update state to the sequence number of the record.
+	 * Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously.
 	 * This method is called by {@link ShardConsumer}s.
 	 *
 	 * @param record the record to collect
@@ -752,7 +752,8 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
-	 * Actual record emission called from the record emitter.
+	 * Atomic operation to collect a record and update state to the sequence number of the record.
+	 * This method is called from the record emitter.
 	 *
 	 * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
 	 * the record, when a watermark assigner was configured.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index f150bb0..1581024 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
 				WatermarkState ws = e.getValue();
 				if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
 					// ignore outdated entry
-					updateTimeoutCount++;
+					if (ws.watermark < Long.MAX_VALUE) {
+						updateTimeoutCount++;
+					}
 					continue;
 				}
 				globalWatermark = Math.min(ws.watermark, globalWatermark);