You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/14 16:20:49 UTC
kafka git commit: HOTFIX: Introduce max wait time for retry-and-backoff while creating tasks
Repository: kafka
Updated Branches:
refs/heads/trunk 160933bc0 -> 65e36895f
HOTFIX: Introduce max wait time for retry-and-backoff while creating tasks
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3327 from mjsax/hotfix-backoff-retry
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65e36895
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65e36895
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65e36895
Branch: refs/heads/trunk
Commit: 65e36895f5cf91afcf785857ebff4c0fce8d1d9c
Parents: 160933b
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Jun 14 09:20:48 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 14 09:20:48 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/65e36895/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 39369c0..372f314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -238,6 +238,7 @@ public class StreamThread extends Thread {
}
abstract class AbstractTaskCreator {
+ final static long MAX_BACKOFF_TIME_MS = 1000L;
void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
long backoffTimeMs = 50L;
final Set<TaskId> retryingTasks = new HashSet<>();
@@ -272,6 +273,7 @@ public class StreamThread extends Thread {
try {
Thread.sleep(backoffTimeMs);
backoffTimeMs <<= 1;
+ backoffTimeMs = Math.min(backoffTimeMs, MAX_BACKOFF_TIME_MS);
} catch (final InterruptedException e) {
// ignore
}