Posted to commits@kafka.apache.org by jg...@apache.org on 2020/08/07 22:34:41 UTC
[kafka] branch trunk updated: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads (#9140)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4a3d70 KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads (#9140)
a4a3d70 is described below
commit a4a3d7064e16d4964a5b64a114579512c22ae6d2
Author: David Jacot <dj...@confluent.io>
AuthorDate: Sat Aug 8 00:30:00 2020 +0200
KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads (#9140)
The patch https://github.com/apache/kafka/pull/8672 introduced a bug that crashes the replica fetcher threads. The issue is that https://github.com/apache/kafka/pull/8672 deletes the Partitions before stopping the replica fetchers. As the replica fetchers rely on accessing the Partition through the ReplicaManager, they crash with a NotLeaderOrFollowerException that is not handled.
This PR reverts the code to the original ordering to avoid this issue.
The regression was caught, and the fix validated, by our system test: `kafkatest.tests.core.reassign_partitions_test`.
Reviewers: Vikas Singh <vi...@confluent.io>, Jason Gustafson <ja...@confluent.io>
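To make the failure mode concrete: a fetcher thread dereferences its Partition through the ReplicaManager on every fetch pass, so removing the Partition from the manager while the fetcher is still running turns the next lookup into an unhandled NotLeaderOrFollowerException that kills the thread. A minimal, self-contained sketch of that mechanism (all names here, such as OrderingBugSketch and fetchOnce, are hypothetical stand-ins, not Kafka's actual ReplicaManager or ReplicaFetcherThread API):

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins illustrating the ordering bug; not Kafka's real API.
object OrderingBugSketch {
  final class NotLeaderOrFollowerException(msg: String) extends RuntimeException(msg)

  private val partitions = new ConcurrentHashMap[String, AnyRef]()

  // A fetcher pass assumes the Partition is still registered in the
  // manager for as long as the fetcher thread is running.
  private def fetchOnce(tp: String): Unit =
    if (partitions.get(tp) == null)
      throw new NotLeaderOrFollowerException(s"$tp is no longer hosted")

  def main(args: Array[String]): Unit = {
    partitions.put("topic-0", new Object)

    val fetcher = new Thread(() =>
      try {
        while (true) { fetchOnce("topic-0"); Thread.sleep(10) }
      } catch {
        case e: NotLeaderOrFollowerException =>
          // In the broker this exception was not handled, so the
          // ReplicaFetcherThread died at this point.
          println(s"fetcher crashed: ${e.getMessage}")
        case _: InterruptedException => () // normal shutdown path
      }
    )
    fetcher.start()

    // Buggy order: remove the Partition while the fetcher is still running...
    Thread.sleep(30)
    partitions.remove("topic-0")

    // ...and only then stop the fetcher; by now it has already crashed.
    fetcher.interrupt()
    fetcher.join()
  }
}
```

The fix below simply restores the safe order: stop the fetchers first, then remove the partitions.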
---
.../main/scala/kafka/server/ReplicaManager.scala | 60 ++++++++++++----------
1 file changed, 34 insertions(+), 26 deletions(-)
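The diff that follows restores this ordering inside ReplicaManager.stopReplicas. Condensed into a self-contained sketch, the three phases look roughly like this (TopicPartition, stopFetchers, asyncDeleteLogs and the other helpers are simplified stand-ins, not the real kafka.server.ReplicaManager members):

```scala
import scala.collection.concurrent.TrieMap

// Condensed sketch of the three-phase ordering the diff below restores.
object StopReplicasOrdering {
  final case class TopicPartition(topic: String, partition: Int)
  final case class StopReplicaPartitionState(deletePartition: Boolean)

  private val allPartitions = TrieMap.empty[TopicPartition, AnyRef]

  private def stopFetchers(tps: Set[TopicPartition]): Unit =
    println(s"phase 1: fetchers stopped for $tps")

  private def asyncDeleteLogs(tps: Set[TopicPartition]): Unit =
    println(s"phase 3: logs deleted in one batch: $tps")

  def stopReplicas(stopped: Map[TopicPartition, StopReplicaPartitionState]): Unit = {
    // Phase 1: stop fetchers first, so no fetcher thread can still look
    // up a Partition that is about to disappear from the map.
    stopFetchers(stopped.keySet)

    // Phase 2: only now remove deleted partitions from the partition map.
    val deleted = stopped.collect {
      case (tp, state) if state.deletePartition => tp
    }.toSet
    deleted.foreach(tp => allPartitions.remove(tp))

    // Phase 3: delete the logs and checkpoint, batched so there is no
    // checkpoint per deleted partition.
    asyncDeleteLogs(deleted)
  }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("topic", 0)
    allPartitions.put(tp, new Object)
    stopReplicas(Map(tp -> StopReplicaPartitionState(deletePartition = true)))
  }
}
```

Stopping the fetchers before touching the partition map guarantees that no fetcher pass can observe a half-removed Partition, which is exactly the invariant the buggy ordering violated.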
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index be73bf2..32c9aa4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -366,8 +366,7 @@ class ReplicaManager(val config: KafkaConfig,
} else {
this.controllerEpoch = controllerEpoch
- val stoppedPartitions = mutable.Set.empty[TopicPartition]
- val deletedPartitions = mutable.Set.empty[TopicPartition]
+ val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
partitionStates.foreach { case (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition
@@ -379,7 +378,7 @@ class ReplicaManager(val config: KafkaConfig,
"partition is in an offline log directory")
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
- case hostedPartition @ HostedPartition.Online(partition) =>
+ case HostedPartition.Online(partition) =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
// When a topic is deleted, the leader epoch is not incremented. To circumvent this,
@@ -389,22 +388,7 @@ class ReplicaManager(val config: KafkaConfig,
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
requestLeaderEpoch > currentLeaderEpoch) {
- stoppedPartitions += topicPartition
-
- if (deletePartition) {
- if (allPartitions.remove(topicPartition, hostedPartition)) {
- maybeRemoveTopicMetrics(topicPartition.topic)
- // Logs are not deleted here. They are deleted in a single batch later on.
- // This is done to avoid having to checkpoint for every deletion.
- partition.delete()
- deletedPartitions += topicPartition
- }
- }
-
- // If we were the leader, we may have some operations still waiting for completion.
- // We force completion to prevent them from timing out.
- completeDelayedFetchOrProduceRequests(topicPartition)
-
+ stoppedPartitions += topicPartition -> partitionState
// Assume that everything will go right. It is overwritten in case of an error.
responseMap.put(topicPartition, Errors.NONE)
} else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -425,18 +409,42 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
- stoppedPartitions += topicPartition
- if (deletePartition)
- deletedPartitions += topicPartition
+ stoppedPartitions += topicPartition -> partitionState
responseMap.put(topicPartition, Errors.NONE)
}
}
- // First stop fetchers for all partitions, then stop the corresponding replicas
- replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
- replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
+ // First stop fetchers for all partitions.
+ val partitions = stoppedPartitions.keySet
+ replicaFetcherManager.removeFetcherForPartitions(partitions)
+ replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+ // Second remove deleted partitions from the partition map. Fetchers rely on the
+ // ReplicaManager to get Partition's information so they must be stopped first.
+ val deletedPartitions = mutable.Set.empty[TopicPartition]
+ stoppedPartitions.foreach { case (topicPartition, partitionState) =>
+ if (partitionState.deletePartition) {
+ getPartition(topicPartition) match {
+ case hostedPartition@HostedPartition.Online(partition) =>
+ if (allPartitions.remove(topicPartition, hostedPartition)) {
+ maybeRemoveTopicMetrics(topicPartition.topic)
+ // Logs are not deleted here. They are deleted in a single batch later on.
+ // This is done to avoid having to checkpoint for every deletion.
+ partition.delete()
+ }
+
+ case _ =>
+ }
+
+ deletedPartitions += topicPartition
+ }
+
+ // If we were the leader, we may have some operations still waiting for completion.
+ // We force completion to prevent them from timing out.
+ completeDelayedFetchOrProduceRequests(topicPartition)
+ }
- // Delete the logs and checkpoint
+ // Third delete the logs and checkpoint.
logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
exception match {
case e: KafkaStorageException =>