You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2019/08/27 23:19:49 UTC
[flink] branch release-1.9 updated: [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new c905a4d [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
c905a4d is described below
commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Aug 26 15:02:40 2019 -0700
[hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
---
.../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 5 +++--
.../connectors/kinesis/util/JobManagerWatermarkTracker.java | 4 +++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index f38e6eb..80b724b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -715,7 +715,7 @@ public class KinesisDataFetcher<T> {
// ------------------------------------------------------------------------
/**
- * Atomic operation to collect a record and update state to the sequence number of the record.
+ * Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously.
* This method is called by {@link ShardConsumer}s.
*
* @param record the record to collect
@@ -752,7 +752,8 @@ public class KinesisDataFetcher<T> {
}
/**
- * Actual record emission called from the record emitter.
+ * Atomic operation to collect a record and update state to the sequence number of the record.
+ * This method is called from the record emitter.
*
* <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
* the record, when a watermark assigner was configured.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index f150bb0..1581024 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
WatermarkState ws = e.getValue();
if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
// ignore outdated entry
- updateTimeoutCount++;
+ if (ws.watermark < Long.MAX_VALUE) {
+ updateTimeoutCount++;
+ }
continue;
}
globalWatermark = Math.min(ws.watermark, globalWatermark);