You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/09/02 20:46:18 UTC
[kafka] branch trunk updated: KAFKA-13225: Controller skips sending
UpdateMetadataRequest when no change in partition state. (#11255)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c0b89d KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)
3c0b89d is described below
commit 3c0b89d9df7a46cceeaa08a6efd25bd2e49dcaaf
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Thu Sep 2 15:44:12 2021 -0500
KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)
The controller can skip sending updateMetadataRequest during the broker failure callback if there are offline partitions and the deleted brokers don't host any partitions.
Reviewers: Jun Rao <ju...@gmail.com>
---
.../scala/kafka/controller/KafkaController.scala | 9 ++++---
.../kafka/controller/PartitionStateMachine.scala | 4 +--
.../controller/ControllerIntegrationTest.scala | 29 ++++++++++++++++++++++
3 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1d36f52..e99bf75 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -616,7 +616,7 @@ class KafkaController(val config: KafkaConfig,
// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions
- partitionStateMachine.triggerOnlinePartitionStateChange()
+ val onlineStateChangeResults = partitionStateMachine.triggerOnlinePartitionStateChange()
// trigger OfflineReplica state change for those newly offline replicas
replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
@@ -628,9 +628,10 @@ class KafkaController(val config: KafkaConfig,
topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
}
- // If replica failure did not require leader re-election, inform brokers of the offline brokers
- // Note that during leader re-election, brokers update their metadata
- if (partitionsWithOfflineLeader.isEmpty) {
+ // If no partition has changed leader or ISR, no UpdateMetadataRequest is sent through PartitionStateMachine
+ // and ReplicaStateMachine. In that case, we want to send an UpdateMetadataRequest explicitly to
+ // propagate the information about the new offline brokers.
+ if (newOfflineReplicasNotForDeletion.isEmpty && onlineStateChangeResults.values.forall(_.isLeft)) {
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
}
}
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e812a14..894d0e4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -54,7 +54,7 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
* This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
* state. This is called on a successful controller election and on broker changes
*/
- def triggerOnlinePartitionStateChange(): Unit = {
+ def triggerOnlinePartitionStateChange(): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
triggerOnlineStateChangeForPartitions(partitions)
}
@@ -64,7 +64,7 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
triggerOnlineStateChangeForPartitions(partitions)
}
- private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = {
+ private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
// that belong to topics to be deleted
val partitionsToTrigger = partitions.filter { partition =>
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 815d0c6..bdc06a7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -221,6 +221,35 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
}
@Test
+ def testMetadataPropagationOnBrokerShutdownWithNoReplicas(): Unit = {
+ servers = makeServers(3)
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val replicaBroker = servers.filter(e => e.config.brokerId != controllerId).head
+
+ val controllerBroker = servers.filter(e => e.config.brokerId == controllerId).head
+ val otherBroker = servers.filter(e => e.config.brokerId != controllerId &&
+ e.config.brokerId != replicaBroker.config.brokerId).head
+
+ val topic = "topic1"
+ val assignment = Map(0 -> Seq(replicaBroker.config.brokerId))
+
+ // Create topic
+ TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+ // Shutdown the broker with replica
+ replicaBroker.shutdown()
+ replicaBroker.awaitShutdown()
+
+ // Shutdown the other broker
+ otherBroker.shutdown()
+ otherBroker.awaitShutdown()
+
+ // The controller should be the only alive broker
+ TestUtils.waitUntilBrokerMetadataIsPropagated(Seq(controllerBroker))
+ }
+
+ @Test
def testTopicCreation(): Unit = {
servers = makeServers(1)
val tp = new TopicPartition("t", 0)