Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/21 04:56:19 UTC

kafka git commit: KAFKA-4848: Fix retryWithBackoff deadlock issue

Repository: kafka
Updated Branches:
  refs/heads/trunk 5a2fcdd6d -> 197a5d5a6


KAFKA-4848: Fix retryWithBackoff deadlock issue

Fixes related to the handling of MAX_POLL_INTERVAL_MS_CONFIG: the retryWithBackoff loop is now bounded by the rebalance timeout, avoiding the deadlock and the CommitFailedException that followed once partitions were revoked.

Author: Sachin Mittal <sj...@gmail.com>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #2642 from sjmittal/trunk
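
For context, the sketch below is a minimal, self-contained illustration of the bounded
retry-with-backoff pattern this patch applies to AbstractTaskCreator.retryWithBackoff():
the loop now gives up once the rebalance timeout (derived from max.poll.interval.ms) is
exceeded, so a thread that cannot acquire a task's state lock no longer spins past the
poll interval and gets kicked out of the group. The class, the TaskFactory interface and
the timeout values here are illustrative stand-ins, not the actual Kafka Streams code.

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class BoundedRetryExample {

        // Illustrative stand-in for the per-task creation step; throws while the
        // task's state-directory lock is still held by another thread.
        interface TaskFactory {
            void createTask(String taskId);
        }

        static void retryWithBackoff(final Map<String, String> tasksToBeCreated,
                                     final TaskFactory factory,
                                     final long startMs,
                                     final long rebalanceTimeoutMs) throws InterruptedException {
            long backoffTimeMs = 50L;
            while (true) {
                final Iterator<Map.Entry<String, String>> it = tasksToBeCreated.entrySet().iterator();
                while (it.hasNext()) {
                    final String taskId = it.next().getKey();
                    try {
                        factory.createTask(taskId);
                        it.remove();
                        backoffTimeMs = 50L;   // reset the backoff after a successful creation
                    } catch (final IllegalStateException e) {
                        System.out.println("Could not create task " + taskId + ". Will retry.");
                    }
                }
                // Key change: stop retrying once the rebalance timeout has elapsed instead
                // of looping forever while another thread still holds the lock.
                if (tasksToBeCreated.isEmpty()
                        || System.currentTimeMillis() - startMs > rebalanceTimeoutMs) {
                    break;
                }
                Thread.sleep(backoffTimeMs);
                backoffTimeMs <<= 1;
            }
        }

        public static void main(String[] args) throws InterruptedException {
            final Map<String, String> tasks = new LinkedHashMap<>();
            tasks.put("0_0", "some-partition");
            // Simulate a lock that is never released: the loop now gives up shortly after
            // the (here 1 second) timeout instead of blocking past max.poll.interval.ms.
            retryWithBackoff(tasks, id -> { throw new IllegalStateException("lock held"); },
                             System.currentTimeMillis(), 1000L);
            System.out.println("Gave up on " + tasks.keySet() + " after the timeout.");
        }
    }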


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/197a5d5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/197a5d5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/197a5d5a

Branch: refs/heads/trunk
Commit: 197a5d5a6d160e9f3f1642caf85bad968a35f109
Parents: 5a2fcdd
Author: Sachin Mittal <sj...@gmail.com>
Authored: Mon Mar 20 21:56:15 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 20 21:56:15 2017 -0700

----------------------------------------------------------------------
 clients/.gitignore                              |  1 +
 connect/api/.gitignore                          |  1 +
 connect/json/.gitignore                         |  1 +
 core/.gitignore                                 |  3 +++
 streams/.gitignore                              |  1 +
 .../org/apache/kafka/streams/StreamsConfig.java |  1 +
 .../processor/internals/StreamThread.java       | 22 +++++++++++++-------
 7 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/clients/.gitignore
----------------------------------------------------------------------
diff --git a/clients/.gitignore b/clients/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/clients/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/connect/api/.gitignore
----------------------------------------------------------------------
diff --git a/connect/api/.gitignore b/connect/api/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/connect/api/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/connect/json/.gitignore
----------------------------------------------------------------------
diff --git a/connect/json/.gitignore b/connect/json/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/connect/json/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/core/.gitignore
----------------------------------------------------------------------
diff --git a/core/.gitignore b/core/.gitignore
new file mode 100644
index 0000000..0d7e8b0
--- /dev/null
+++ b/core/.gitignore
@@ -0,0 +1,3 @@
+.cache-main
+.cache-tests
+/bin/

http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/streams/.gitignore
----------------------------------------------------------------------
diff --git a/streams/.gitignore b/streams/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/streams/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0eb3f7b..d2ba063 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -401,6 +401,7 @@ public class StreamsConfig extends AbstractConfig {
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
 
         CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }

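As a side note, the override above pins the embedded consumer's max.poll.interval.ms to
300000 ms by default. A hedged sketch of how an application could raise it for long
restores is shown below; exactly how consumer settings are forwarded to the embedded
consumer differs between versions (later releases use a "consumer." prefix), so treat
the property wiring, the application id and the broker address here as assumptions for
illustration only.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsPollIntervalExample {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // hypothetical id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker
            // Allow more time between poll() calls before the consumer is evicted from the
            // group, e.g. when state-store restoration or task creation is slow.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
            // Building the config validates the properties; passing it to KafkaStreams is
            // unchanged from the usual setup.
            final StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.originals().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG));
        }
    }
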
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6a6b508..7cd4b93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -197,6 +199,7 @@ public class StreamThread extends Thread {
     private final Map<TaskId, StreamTask> suspendedTasks;
     private final Map<TaskId, StandbyTask> suspendedStandbyTasks;
     private final Time time;
+    private final int rebalanceTimeoutMs;
     private final long pollTimeMs;
     private final long cleanTimeMs;
     private final long commitTimeMs;
@@ -290,6 +293,8 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+        final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.rebalanceTimeoutMs =  (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -855,7 +860,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    private void addStreamTasks(Collection<TopicPartition> assignment) {
+    private void addStreamTasks(Collection<TopicPartition> assignment, final long start) {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
@@ -893,7 +898,7 @@ public class StreamThread extends Thread {
 
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
-        taskCreator.retryWithBackoff(newTasks);
+        taskCreator.retryWithBackoff(newTasks, start);
     }
 
     StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
@@ -910,7 +915,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    private void addStandbyTasks() {
+    private void addStandbyTasks(final long start) {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen.");
 
@@ -937,7 +942,7 @@ public class StreamThread extends Thread {
 
         // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
-        new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks);
+        new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
 
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
 
@@ -1126,7 +1131,7 @@ public class StreamThread extends Thread {
     }
 
     abstract class AbstractTaskCreator {
-        void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+        void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
             long backoffTimeMs = 50L;
             while (true) {
                 final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
@@ -1138,13 +1143,14 @@ public class StreamThread extends Thread {
                     try {
                         createTask(taskId, partitions);
                         it.remove();
+                        backoffTimeMs = 50L;
                     } catch (final LockException e) {
                         // ignore and retry
                         log.warn("Could not create task {}. Will retry.", taskId, e);
                     }
                 }
 
-                if (tasksToBeCreated.isEmpty()) {
+                if (tasksToBeCreated.isEmpty() || time.milliseconds() - start > rebalanceTimeoutMs) {
                     break;
                 }
 
@@ -1207,9 +1213,9 @@ public class StreamThread extends Thread {
                 // will become active or vice versa
                 closeNonAssignedSuspendedStandbyTasks();
                 closeNonAssignedSuspendedTasks();
-                addStreamTasks(assignment);
+                addStreamTasks(assignment, start);
                 storeChangelogReader.restore();
-                addStandbyTasks();
+                addStandbyTasks(start);
                 streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
                 lastCleanMs = time.milliseconds(); // start the cleaning cycle
                 setStateWhenNotInPendingShutdown(State.RUNNING);