You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/08/07 22:34:41 UTC

[kafka] branch trunk updated: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads (#9140)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4a3d70  KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads (#9140)
a4a3d70 is described below

commit a4a3d7064e16d4964a5b64a114579512c22ae6d2
Author: David Jacot <dj...@confluent.io>
AuthorDate: Sat Aug 8 00:30:00 2020 +0200

    KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads (#9140)
    
    The patch https://github.com/apache/kafka/pull/8672 introduced a bug leading to crashing the replica fetcher threads. The issue is that https://github.com/apache/kafka/pull/8672 deletes the Partitions prior to stopping the replica fetchers. As the replica fetchers relies access the Partition in the ReplicaManager, they crash with a NotLeaderOrFollowerException that is not handled.
    
    This PR reverts the code to the original ordering to avoid this issue.
    
    The regression was caught and validated by our system test: `kafkatest.tests.core.reassign_partitions_test`.
    
    Reviewers: Vikas Singh <vi...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 60 ++++++++++++----------
 1 file changed, 34 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index be73bf2..32c9aa4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -366,8 +366,7 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         this.controllerEpoch = controllerEpoch
 
-        val stoppedPartitions = mutable.Set.empty[TopicPartition]
-        val deletedPartitions = mutable.Set.empty[TopicPartition]
+        val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
         partitionStates.foreach { case (topicPartition, partitionState) =>
           val deletePartition = partitionState.deletePartition
 
@@ -379,7 +378,7 @@ class ReplicaManager(val config: KafkaConfig,
                 "partition is in an offline log directory")
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
 
-            case hostedPartition @ HostedPartition.Online(partition) =>
+            case HostedPartition.Online(partition) =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
               // When a topic is deleted, the leader epoch is not incremented. To circumvent this,
@@ -389,22 +388,7 @@ class ReplicaManager(val config: KafkaConfig,
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
                   requestLeaderEpoch > currentLeaderEpoch) {
-                stoppedPartitions += topicPartition
-
-                if (deletePartition) {
-                  if (allPartitions.remove(topicPartition, hostedPartition)) {
-                    maybeRemoveTopicMetrics(topicPartition.topic)
-                    // Logs are not deleted here. They are deleted in a single batch later on.
-                    // This is done to avoid having to checkpoint for every deletions.
-                    partition.delete()
-                    deletedPartitions += topicPartition
-                  }
-                }
-
-                // If we were the leader, we may have some operations still waiting for completion.
-                // We force completion to prevent them from timing out.
-                completeDelayedFetchOrProduceRequests(topicPartition)
-
+                stoppedPartitions += topicPartition -> partitionState
                 // Assume that everything will go right. It is overwritten in case of an error.
                 responseMap.put(topicPartition, Errors.NONE)
               } else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -425,18 +409,42 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
-              stoppedPartitions += topicPartition
-              if (deletePartition)
-                deletedPartitions += topicPartition
+              stoppedPartitions += topicPartition -> partitionState
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        // First stop fetchers for all partitions, then stop the corresponding replicas
-        replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
+        // First stop fetchers for all partitions.
+        val partitions = stoppedPartitions.keySet
+        replicaFetcherManager.removeFetcherForPartitions(partitions)
+        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+        // Second remove deleted partitions from the partition map. Fetchers rely on the
+        // ReplicaManager to get Partition's information so they must be stopped first.
+        val deletedPartitions = mutable.Set.empty[TopicPartition]
+        stoppedPartitions.foreach { case (topicPartition, partitionState) =>
+          if (partitionState.deletePartition) {
+            getPartition(topicPartition) match {
+              case hostedPartition@HostedPartition.Online(partition) =>
+                if (allPartitions.remove(topicPartition, hostedPartition)) {
+                  maybeRemoveTopicMetrics(topicPartition.topic)
+                  // Logs are not deleted here. They are deleted in a single batch later on.
+                  // This is done to avoid having to checkpoint for every deletions.
+                  partition.delete()
+                }
+
+              case _ =>
+            }
+
+            deletedPartitions += topicPartition
+          }
+
+          // If we were the leader, we may have some operations still waiting for completion.
+          // We force completion to prevent them from timing out.
+          completeDelayedFetchOrProduceRequests(topicPartition)
+        }
 
-        // Delete the logs and checkpoint
+        // Third delete the logs and checkpoint.
         logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
           exception match {
             case e: KafkaStorageException =>