You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/11/06 23:29:07 UTC

[kafka] branch trunk updated: KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b5ffa0   KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)
7b5ffa0 is described below

commit 7b5ffa0a070065e5e8320f481bbd8a3a26378f91
Author: Zhanxiang (Patrick) Huang <hz...@hotmail.com>
AuthorDate: Tue Nov 6 15:28:53 2018 -0800

     KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)
    
    This PR avoids sending out a full UpdateMetadataRequest in the following scenarios:
    
    1. On broker startup, send out a full UpdateMetadataRequest to newly added brokers and only send out an UpdateMetadataRequest with empty partition states to existing brokers.
    2. On broker failure, if it doesn't require leader election, only include the states of partitions that are hosted by the dead broker(s) in the UpdateMetadataRequest instead of including all partition states.
    
    This PR also introduces a minor optimization in the MetadataCache update to avoid copying the previous partition states upon receiving UpdateMetadataRequest with no partition states.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../controller/ControllerChannelManager.scala      |  7 +--
 .../scala/kafka/controller/KafkaController.scala   | 21 +++++----
 .../main/scala/kafka/server/MetadataCache.scala    | 44 ++++++++++--------
 .../controller/ControllerIntegrationTest.scala     | 53 ++++++++++++++++++++++
 4 files changed, 90 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 85da8b8..a11f553 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       }
     }
 
-    val givenPartitions = if (partitions.isEmpty)
-      controllerContext.partitionLeadershipInfo.keySet
-    else
-      partitions
-
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
-    givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
+    partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
       beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 740ab7f..a52f3f0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
     // partitionStateMachine.startup().
     info("Sending update metadata request")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
 
     replicaStateMachine.startup()
     partitionStateMachine.startup()
@@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"New broker startup callback for ${newBrokers.mkString(",")}")
     newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
-    // broker via this update.
+    val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
+    // Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
+    // via this update. No need to include any partition states in the request since there are no partition state changes.
+    sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
+    // Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization.
     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
-    // common controlled shutdown case, the metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    // common controlled shutdown case, the metadata will reach the new brokers faster.
+    sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -421,7 +424,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def onBrokerUpdate(updatedBrokerId: Int) {
     info(s"Broker info update callback for $updatedBrokerId")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
   }
 
   /**
@@ -458,10 +461,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
     }
 
-    // If replica failure did not require leader re-election, inform brokers of the offline replica
+    // If replica failure did not require leader re-election, inform brokers of the offline brokers
     // Note that during leader re-election, brokers update their metadata
     if (partitionsWithoutLeader.isEmpty) {
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
     }
   }
 
@@ -887,7 +890,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    *
    * @param brokers The brokers that the update metadata request should be sent to
    */
-  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
+  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]) {
     try {
       brokerRequestBatch.newBatch()
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 3114663..3fefc7b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -214,13 +214,6 @@ class MetadataCache(brokerId: Int) extends Logging {
   def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
 
-      //since kafka may do partial metadata updates, we start by copying the previous state
-      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
-      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
-        val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
-        copy ++= oldPartitionStates
-        partitionStates += (topic -> copy)
-      }
       val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
       val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
       val controllerId = updateMetadataRequest.controllerId match {
@@ -248,21 +241,32 @@ class MetadataCache(brokerId: Int) extends Logging {
       }
 
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
-      updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
-        val controllerId = updateMetadataRequest.controllerId
-        val controllerEpoch = updateMetadataRequest.controllerEpoch
-        if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(partitionStates, tp.topic, tp.partition)
-          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
-            s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
-          deletedPartitions += tp
-        } else {
-          addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
-          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
-            s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+      if (updateMetadataRequest.partitionStates().isEmpty) {
+        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
+      } else {
+        //since kafka may do partial metadata updates, we start by copying the previous state
+        val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
+        metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
+          val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
+          copy ++= oldPartitionStates
+          partitionStates += (topic -> copy)
+        }
+        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+          val controllerId = updateMetadataRequest.controllerId
+          val controllerEpoch = updateMetadataRequest.controllerEpoch
+          if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
+            removePartitionInfo(partitionStates, tp.topic, tp.partition)
+            stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
+              s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+            deletedPartitions += tp
+          } else {
+            addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
+            stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
+              s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+          }
         }
+        metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       }
-      metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       deletedPartitions
     }
   }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index e2f50b9..c0eda3d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -83,6 +83,59 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
+  // This test case is used to ensure that there will be no correctness issue after we avoid sending out full
+  // UpdateMetadataRequest to all brokers in the cluster
+  @Test
+  def testMetadataPropagationOnBrokerChange(): Unit = {
+    servers = makeServers(3)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    // Need to make sure the broker we shut down and start up is not the controller. Otherwise we will send out
+    // full UpdateMetadataRequest to all brokers during controller failover.
+    val testBroker = servers.filter(e => e.config.brokerId != controllerId).head
+    val remainingBrokers = servers.filter(_.config.brokerId != testBroker.config.brokerId)
+    val topic = "topic1"
+    // Make sure shutting down the test broker will not require any leadership change, so that we test avoiding a full
+    // UpdateMetadataRequest on broker failure
+    val assignment = Map(
+      0 -> Seq(remainingBrokers(0).config.brokerId, testBroker.config.brokerId),
+      1 -> remainingBrokers.map(_.config.brokerId))
+
+    // Create topic
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+    // Shutdown the broker
+    testBroker.shutdown()
+    testBroker.awaitShutdown()
+    TestUtils.waitUntilBrokerMetadataIsPropagated(remainingBrokers)
+    remainingBrokers.foreach { server =>
+      val offlineReplicaPartitionInfo = server.metadataCache.getPartitionInfo(topic, 0).get
+      assertEquals(1, offlineReplicaPartitionInfo.offlineReplicas.size())
+      assertEquals(testBroker.config.brokerId, offlineReplicaPartitionInfo.offlineReplicas.get(0))
+      assertEquals(assignment(0).asJava, offlineReplicaPartitionInfo.basePartitionState.replicas)
+      assertEquals(Seq(remainingBrokers.head.config.brokerId).asJava, offlineReplicaPartitionInfo.basePartitionState.isr)
+      val onlinePartitionInfo = server.metadataCache.getPartitionInfo(topic, 1).get
+      assertEquals(assignment(1).asJava, onlinePartitionInfo.basePartitionState.replicas)
+      assertTrue(onlinePartitionInfo.offlineReplicas.isEmpty)
+    }
+
+    // Startup the broker
+    testBroker.startup()
+    TestUtils.waitUntilTrue( () => {
+      !servers.exists { server =>
+        assignment.exists { case (partitionId, replicas) =>
+          val partitionInfoOpt = server.metadataCache.getPartitionInfo(topic, partitionId)
+          if (partitionInfoOpt.isDefined) {
+            val partitionInfo = partitionInfoOpt.get
+            !partitionInfo.offlineReplicas.isEmpty || !partitionInfo.basePartitionState.replicas.asScala.equals(replicas)
+          } else {
+            true
+          }
+        }
+      }
+    }, "Inconsistent metadata after broker startup")
+  }
+
   @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)