You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/09/02 20:46:18 UTC

[kafka] branch trunk updated: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c0b89d  KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)
3c0b89d is described below

commit 3c0b89d9df7a46cceeaa08a6efd25bd2e49dcaaf
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Thu Sep 2 15:44:12 2021 -0500

    KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)
    
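    Before this change, the controller could skip sending an UpdateMetadataRequest during the broker failure callback when there were offline partitions and the deleted brokers did not host any partitions. The controller now sends the request explicitly whenever no partition has changed leader or ISR, so that information about the newly offline brokers is still propagated.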
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../scala/kafka/controller/KafkaController.scala   |  9 ++++---
 .../kafka/controller/PartitionStateMachine.scala   |  4 +--
 .../controller/ControllerIntegrationTest.scala     | 29 ++++++++++++++++++++++
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1d36f52..e99bf75 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -616,7 +616,7 @@ class KafkaController(val config: KafkaConfig,
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
     partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
-    partitionStateMachine.triggerOnlinePartitionStateChange()
+    val onlineStateChangeResults = partitionStateMachine.triggerOnlinePartitionStateChange()
     // trigger OfflineReplica state change for those newly offline replicas
     replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
 
@@ -628,9 +628,10 @@ class KafkaController(val config: KafkaConfig,
       topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
     }
 
-    // If replica failure did not require leader re-election, inform brokers of the offline brokers
-    // Note that during leader re-election, brokers update their metadata
-    if (partitionsWithOfflineLeader.isEmpty) {
+    // If no partition has changed leader or ISR, no UpdateMetadataRequest is sent through PartitionStateMachine
+    // and ReplicaStateMachine. In that case, we want to send an UpdateMetadataRequest explicitly to
+    // propagate the information about the new offline brokers.
+    if (newOfflineReplicasNotForDeletion.isEmpty && onlineStateChangeResults.values.forall(_.isLeft)) {
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
     }
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e812a14..894d0e4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -54,7 +54,7 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
    * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
    * state. This is called on a successful controller election and on broker changes
    */
-  def triggerOnlinePartitionStateChange(): Unit = {
+  def triggerOnlinePartitionStateChange(): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
     triggerOnlineStateChangeForPartitions(partitions)
   }
@@ -64,7 +64,7 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
     triggerOnlineStateChangeForPartitions(partitions)
   }
 
-  private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = {
+  private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
     // that belong to topics to be deleted
     val partitionsToTrigger = partitions.filter { partition =>
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 815d0c6..bdc06a7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -221,6 +221,35 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testMetadataPropagationOnBrokerShutdownWithNoReplicas(): Unit = {
+    servers = makeServers(3)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val replicaBroker = servers.filter(e => e.config.brokerId != controllerId).head
+
+    val controllerBroker = servers.filter(e => e.config.brokerId == controllerId).head
+    val otherBroker = servers.filter(e => e.config.brokerId != controllerId &&
+      e.config.brokerId != replicaBroker.config.brokerId).head
+
+    val topic = "topic1"
+    val assignment = Map(0 -> Seq(replicaBroker.config.brokerId))
+
+    // Create topic
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+    // Shutdown the broker with replica
+    replicaBroker.shutdown()
+    replicaBroker.awaitShutdown()
+
+    // Shutdown the other broker
+    otherBroker.shutdown()
+    otherBroker.awaitShutdown()
+
+    // The controller should be the only alive broker
+    TestUtils.waitUntilBrokerMetadataIsPropagated(Seq(controllerBroker))
+  }
+
+  @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)
     val tp = new TopicPartition("t", 0)