You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/10 18:14:07 UTC
flink git commit: [FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.
Repository: flink
Updated Branches:
refs/heads/release-1.5 a0a810720 -> 62839e88e
[FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62839e88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62839e88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62839e88
Branch: refs/heads/release-1.5
Commit: 62839e88e15b338a8af9afcef698c38a194c592f
Parents: a0a8107
Author: Jamie Grier <jg...@lyft.com>
Authored: Mon Jul 9 14:20:47 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:15 2018 +0200
----------------------------------------------------------------------
.../connectors/kinesis/internals/ShardConsumer.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62839e88/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 0d730af..30f0016 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -199,6 +199,7 @@ public class ShardConsumer<T> implements Runnable {
}
}
+ long lastTimeNanos = 0;
while (isRunning()) {
if (nextShardItr == null) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -207,7 +208,12 @@ public class ShardConsumer<T> implements Runnable {
break;
} else {
if (fetchIntervalMillis != 0) {
- Thread.sleep(fetchIntervalMillis);
+ long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
+ long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
+ if (sleepTimeMillis > 0) {
+ Thread.sleep(sleepTimeMillis);
+ }
+ lastTimeNanos = System.nanoTime();
}
GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);