You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/10 18:14:07 UTC

flink git commit: [FLINK-9691] [kinesis] Modify the run loop to sleep only for the remainder of the fetch interval, so that getRecords() is called at a steady target frequency.

Repository: flink
Updated Branches:
  refs/heads/release-1.5 a0a810720 -> 62839e88e


[FLINK-9691] [kinesis] Modify the run loop to sleep only for the remainder of the fetch interval, so that getRecords() is called at a steady target frequency.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62839e88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62839e88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62839e88

Branch: refs/heads/release-1.5
Commit: 62839e88e15b338a8af9afcef698c38a194c592f
Parents: a0a8107
Author: Jamie Grier <jg...@lyft.com>
Authored: Mon Jul 9 14:20:47 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:15 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/internals/ShardConsumer.java          | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62839e88/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 0d730af..30f0016 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -199,6 +199,7 @@ public class ShardConsumer<T> implements Runnable {
 				}
 			}
 
+			long lastTimeNanos = 0;
 			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -207,7 +208,12 @@ public class ShardConsumer<T> implements Runnable {
 					break;
 				} else {
 					if (fetchIntervalMillis != 0) {
-						Thread.sleep(fetchIntervalMillis);
+						long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
+						long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
+						if (sleepTimeMillis > 0) {
+							Thread.sleep(sleepTimeMillis);
+						}
+						lastTimeNanos = System.nanoTime();
 					}
 
 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);