You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/21 04:56:19 UTC
kafka git commit: KAFKA-4848: Fix retryWithBackoff deadlock issue
Repository: kafka
Updated Branches:
refs/heads/trunk 5a2fcdd6d -> 197a5d5a6
KAFKA-4848: Fix retryWithBackoff deadlock issue
Fixes the handling of MAX_POLL_INTERVAL_MS_CONFIG during a deadlock, and the handling of CommitFailedException when partitions are revoked.
Author: Sachin Mittal <sj...@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #2642 from sjmittal/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/197a5d5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/197a5d5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/197a5d5a
Branch: refs/heads/trunk
Commit: 197a5d5a6d160e9f3f1642caf85bad968a35f109
Parents: 5a2fcdd
Author: Sachin Mittal <sj...@gmail.com>
Authored: Mon Mar 20 21:56:15 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 20 21:56:15 2017 -0700
----------------------------------------------------------------------
clients/.gitignore | 1 +
connect/api/.gitignore | 1 +
connect/json/.gitignore | 1 +
core/.gitignore | 3 +++
streams/.gitignore | 1 +
.../org/apache/kafka/streams/StreamsConfig.java | 1 +
.../processor/internals/StreamThread.java | 22 +++++++++++++-------
7 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/clients/.gitignore
----------------------------------------------------------------------
diff --git a/clients/.gitignore b/clients/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/clients/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/connect/api/.gitignore
----------------------------------------------------------------------
diff --git a/connect/api/.gitignore b/connect/api/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/connect/api/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/connect/json/.gitignore
----------------------------------------------------------------------
diff --git a/connect/json/.gitignore b/connect/json/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/connect/json/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/core/.gitignore
----------------------------------------------------------------------
diff --git a/core/.gitignore b/core/.gitignore
new file mode 100644
index 0000000..0d7e8b0
--- /dev/null
+++ b/core/.gitignore
@@ -0,0 +1,3 @@
+.cache-main
+.cache-tests
+/bin/
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/streams/.gitignore
----------------------------------------------------------------------
diff --git a/streams/.gitignore b/streams/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/streams/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0eb3f7b..d2ba063 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -401,6 +401,7 @@ public class StreamsConfig extends AbstractConfig {
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/197a5d5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6a6b508..7cd4b93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -197,6 +199,7 @@ public class StreamThread extends Thread {
private final Map<TaskId, StreamTask> suspendedTasks;
private final Map<TaskId, StandbyTask> suspendedStandbyTasks;
private final Time time;
+ private final int rebalanceTimeoutMs;
private final long pollTimeMs;
private final long cleanTimeMs;
private final long commitTimeMs;
@@ -290,6 +293,8 @@ public class StreamThread extends Thread {
this.standbyRecords = new HashMap<>();
this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+ final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+ this.rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -855,7 +860,7 @@ public class StreamThread extends Thread {
}
}
- private void addStreamTasks(Collection<TopicPartition> assignment) {
+ private void addStreamTasks(Collection<TopicPartition> assignment, final long start) {
if (partitionAssignor == null)
throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
@@ -893,7 +898,7 @@ public class StreamThread extends Thread {
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
- taskCreator.retryWithBackoff(newTasks);
+ taskCreator.retryWithBackoff(newTasks, start);
}
StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
@@ -910,7 +915,7 @@ public class StreamThread extends Thread {
}
}
- private void addStandbyTasks() {
+ private void addStandbyTasks(final long start) {
if (partitionAssignor == null)
throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen.");
@@ -937,7 +942,7 @@ public class StreamThread extends Thread {
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
- new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks);
+ new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
@@ -1126,7 +1131,7 @@ public class StreamThread extends Thread {
}
abstract class AbstractTaskCreator {
- void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+ void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
long backoffTimeMs = 50L;
while (true) {
final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
@@ -1138,13 +1143,14 @@ public class StreamThread extends Thread {
try {
createTask(taskId, partitions);
it.remove();
+ backoffTimeMs = 50L;
} catch (final LockException e) {
// ignore and retry
log.warn("Could not create task {}. Will retry.", taskId, e);
}
}
- if (tasksToBeCreated.isEmpty()) {
+ if (tasksToBeCreated.isEmpty() || time.milliseconds() - start > rebalanceTimeoutMs) {
break;
}
@@ -1207,9 +1213,9 @@ public class StreamThread extends Thread {
// will become active or vice versa
closeNonAssignedSuspendedStandbyTasks();
closeNonAssignedSuspendedTasks();
- addStreamTasks(assignment);
+ addStreamTasks(assignment, start);
storeChangelogReader.restore();
- addStandbyTasks();
+ addStandbyTasks(start);
streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
lastCleanMs = time.milliseconds(); // start the cleaning cycle
setStateWhenNotInPendingShutdown(State.RUNNING);