You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/14 16:20:49 UTC

kafka git commit: HOTFIX: Introduce max wait time for retry-and-backoff while creating tasks

Repository: kafka
Updated Branches:
  refs/heads/trunk 160933bc0 -> 65e36895f


HOTFIX: Introduce max wait time for retry-and-backoff while creating tasks

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #3327 from mjsax/hotfix-backoff-retry


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65e36895
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65e36895
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65e36895

Branch: refs/heads/trunk
Commit: 65e36895f5cf91afcf785857ebff4c0fce8d1d9c
Parents: 160933b
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Jun 14 09:20:48 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 14 09:20:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65e36895/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 39369c0..372f314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -238,6 +238,7 @@ public class StreamThread extends Thread {
     }
 
     abstract class AbstractTaskCreator {
+        final static long MAX_BACKOFF_TIME_MS = 1000L;
         void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
             long backoffTimeMs = 50L;
             final Set<TaskId> retryingTasks = new HashSet<>();
@@ -272,6 +273,7 @@ public class StreamThread extends Thread {
                 try {
                     Thread.sleep(backoffTimeMs);
                     backoffTimeMs <<= 1;
+                    backoffTimeMs = Math.min(backoffTimeMs, MAX_BACKOFF_TIME_MS);
                 } catch (final InterruptedException e) {
                     // ignore
                 }