Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/15 12:32:37 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709122840



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {

Review comment:
       nit: Should we prefix this method with `maybe` to indicate that it would set the topic id only if there is a state for the topic partition?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)

Review comment:
       Should we ensure that there is actually a state? It should always be there, but it might be better to be safe.
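
       For example, something along these lines (a sketch only; it assumes the `maybe` rename suggested earlier and that `PartitionStates.update` replaces the stored state):

           def maybeAddTopicIdsToThread(partitions: Set[TopicPartition],
                                        topicIds: String => Option[Uuid]): Unit = {
             partitionMapLock.lockInterruptibly()
             try {
               partitions.foreach { tp =>
                 // Guard against a missing state instead of assuming one exists.
                 Option(partitionStates.stateValue(tp)).foreach { currentState =>
                   partitionStates.update(tp, currentState.updateTopicId(topicIds(tp.topic)))
                 }
               }
             } finally partitionMapLock.unlock()
           }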

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String,
    */
   private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
     if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
-      currentState
+      if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) {
+        currentState.updateTopicId(initialFetchState.topicId)
+      } else {
+        currentState
+      }

Review comment:
       Is this change still necessary? 

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+          if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+            // Only change partition state when the leader is available
+            partitionsToUpdateFollower += partition
+          } else {
+            // The leader broker should always be present in the metadata cache.
+            // If not, we should record the error message and abort the transition process for this partition
+            stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+              s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+              s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+              s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+          }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition)))

Review comment:
       It looks like `addTopicIdsToFetcherThread` only needs the `leader`, the `topic-partition` (to compute the fetcher id), and the `topic id`. How about passing just those? I would also let the fetcher manager compute the fetcher id.
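
       Something along these lines, perhaps (illustrative only: the names and exact parameter shape are not final, and it assumes the manager's `getFetcherId` helper and `fetcherThreadMap`):

           // In AbstractFetcherManager: take the leader endpoint per partition and
           // derive the fetcher id internally instead of receiving it precomputed.
           def maybeAddTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, BrokerEndPoint)],
                                               topicIds: String => Option[Uuid]): Unit = {
             lock synchronized {
               val partitionsPerFetcher = partitionsToUpdate.groupMap { case (tp, leader) =>
                 BrokerIdAndFetcherId(leader.id, getFetcherId(tp))
               } { case (tp, _) => tp }

               for ((brokerIdAndFetcherId, partitions) <- partitionsPerFetcher) {
                 fetcherThreadMap.get(brokerIdAndFetcherId).foreach { thread =>
                   thread.maybeAddTopicIdsToThread(partitions, topicIds)
                 }
               }
             }
           }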

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+          if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+            // Only change partition state when the leader is available
+            partitionsToUpdateFollower += partition
+          } else {
+            // The leader broker should always be present in the metadata cache.
+            // If not, we should record the error message and abort the transition process for this partition
+            stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+              s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+              s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+              s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+          }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition)))
+        }
+        replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader, topicIds)

Review comment:
       I have the impression that we could almost remove everything in this method but this block of code. All the rest seems redundant and/or misleading to me. We could still keep the `isShuttingDown.get()` check, though.
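
       In other words, the method could shrink to roughly this (a sketch; the parameter list is illustrative and reuses the signature suggested for the fetcher manager above):

           private def updateTopicIdForFollowers(partitions: Set[Partition],
                                                 topicIds: String => Option[Uuid]): Unit = {
             // Keep the shutdown check, then hand straight off to the fetcher manager.
             if (!isShuttingDown.get()) {
               val partitionsWithLeader = partitions.flatMap { partition =>
                 partition.leaderReplicaIdOpt
                   .flatMap(metadataCache.getAliveBrokerNode(_, config.interBrokerListenerName))
                   .map(node => (partition.topicPartition,
                     new BrokerEndPoint(node.id, node.host, node.port)))
               }
               replicaFetcherManager.maybeAddTopicIdsToFetcherThread(partitionsWithLeader, topicIds)
             }
           }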

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1402,6 +1404,8 @@ class ReplicaManager(val config: KafkaConfig,
                     stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
                       s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
                       s"id $correlationId epoch $controllerEpoch")
+                    if (partitionState.leader != localBrokerId)
+                      topicIdUpdatePartitions.put(partition, partitionState)

Review comment:
       nit: Should we rename `topicIdUpdatePartitions` to indicate that it is only about the followers?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = {
+    lock synchronized {
+      val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1)
+
+      for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) {
+        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)

Review comment:
       It is a bit weird that you have to recreate `BrokerIdAndFetcherId` from `BrokerAndFetcherId` here.
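
       If the manager computed the fetcher id itself, as suggested on the `ReplicaManager` side, the round-trip would go away; the grouping key could be built directly from the leader endpoint, e.g. (a sketch):

           val partitionsPerFetcher = partitionsToUpdate.groupMap { case (tp, leader) =>
             BrokerIdAndFetcherId(leader.id, getFetcherId(tp))
           } { case (tp, _) => tp }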

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -741,7 +741,8 @@ class ReplicaManager(val config: KafkaConfig,
 
           // throw NotLeaderOrFollowerException if replica does not exist for the given partition
           val partition = getPartitionOrException(topicPartition)
-          partition.localLogOrException

Review comment:
       This is weird... Do you know why it was here?

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,8 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
 
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = request.data().topics().asScala.map { topic => (topic.topic(), topic.topicId()) }.toMap

Review comment:
       nit: We could remove some parentheses here, and we usually use `map` with parentheses rather than curly braces when inlined: `map { topic => ... }` -> `map(topic => ...)`.
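
       For example (same behaviour, just tidied):

           val topicIds = request.data.topics.asScala.map(topic => (topic.topic, topic.topicId)).toMap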




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org