You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/06 18:18:38 UTC

[GitHub] [kafka] lbradstreet commented on a change in pull request #8807: KAFKA-10106: log time taken to handle LeaderAndIsr request

lbradstreet commented on a change in pull request #8807:
URL: https://github.com/apache/kafka/pull/8807#discussion_r436289337



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1242,131 +1243,138 @@ class ReplicaManager(val config: KafkaConfig,
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
 
-      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-          s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
-          s"Latest known controller epoch is $controllerEpoch")
-        leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
-      } else {
-        val responseMap = new mutable.HashMap[TopicPartition, Errors]
-        controllerEpoch = leaderAndIsrRequest.controllerEpoch
-
-        val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
-
-        // First create the partition if it doesn't exist already
-        requestPartitionStates.foreach { partitionState =>
-          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
-          val partitionOpt = getPartition(topicPartition) match {
-            case HostedPartition.Offline =>
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
-                "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-              None
+      val response = {
+        if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
+          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+            s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
+            s"Latest known controller epoch is $controllerEpoch")
+          leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
+        } else {
+          val responseMap = new mutable.HashMap[TopicPartition, Errors]
+          controllerEpoch = leaderAndIsrRequest.controllerEpoch
 
-            case HostedPartition.Online(partition) =>
-              Some(partition)
+          val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
 
-            case HostedPartition.None =>
-              val partition = Partition(topicPartition, time, this)
-              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
-              Some(partition)
-          }
+          // First create the partition if it doesn't exist already
+          requestPartitionStates.foreach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            val partitionOpt = getPartition(topicPartition) match {
+              case HostedPartition.Offline =>
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id $correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
+                  "partition is in an offline log directory")
+                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+                None
+
+              case HostedPartition.Online(partition) =>
+                Some(partition)
+
+              case HostedPartition.None =>
+                val partition = Partition(topicPartition, time, this)
+                allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+                Some(partition)
+            }
 
-          // Next check partition's leader epoch
-          partitionOpt.foreach { partition =>
-            val currentLeaderEpoch = partition.getLeaderEpoch
-            val requestLeaderEpoch = partitionState.leaderEpoch
-            if (requestLeaderEpoch > currentLeaderEpoch) {
-              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-              if (partitionState.replicas.contains(localBrokerId))
-                partitionStates.put(partition, partitionState)
-              else {
-                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                  s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                  s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            // Next check partition's leader epoch
+            partitionOpt.foreach { partition =>
+              val currentLeaderEpoch = partition.getLeaderEpoch
+              val requestLeaderEpoch = partitionState.leaderEpoch
+              if (requestLeaderEpoch > currentLeaderEpoch) {
+                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+                if (partitionState.replicas.contains(localBrokerId))
+                  partitionStates.put(partition, partitionState)
+                else {
+                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
+                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
+                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                }
+              } else if (requestLeaderEpoch < currentLeaderEpoch) {
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id $correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+                  s"leader epoch $requestLeaderEpoch is smaller than the current " +
+                  s"leader epoch $currentLeaderEpoch")
+                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+              } else {
+                stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id $correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+                  s"leader epoch $requestLeaderEpoch matches the current leader epoch")
+                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
               }
-            } else if (requestLeaderEpoch < currentLeaderEpoch) {
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-                s"leader epoch $requestLeaderEpoch is smaller than the current " +
-                s"leader epoch $currentLeaderEpoch")
-              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
-            } else {
-              stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-                s"leader epoch $requestLeaderEpoch matches the current leader epoch")
-              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
             }
           }
-        }
 
-        val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
-          partitionState.leader == localBrokerId
-        }
-        val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
+          val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
+            partitionState.leader == localBrokerId
+          }
+          val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
 
-        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
-        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
-          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
-            highWatermarkCheckpoints)
-        else
-          Set.empty[Partition]
-        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
-            highWatermarkCheckpoints)
-        else
-          Set.empty[Partition]
+          val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+          val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
+            makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
+              highWatermarkCheckpoints)
+          else
+            Set.empty[Partition]
+          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
+            makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
+              highWatermarkCheckpoints)
+          else
+            Set.empty[Partition]
 
-        /*
+          /*
          * KAFKA-8392
          * For topic partitions of which the broker is no longer a leader, delete metrics related to
          * those topics. Note that this means the broker stops being either a replica or a leader of
          * partitions of said topics
          */
-        val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
-        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
-        followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
+          val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
+          val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
+          followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
 
-        // remove metrics for brokers which are not followers of a topic
-        leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
+          // remove metrics for brokers which are not followers of a topic
+          leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
 
-        leaderAndIsrRequest.partitionStates.forEach { partitionState =>
-          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
-          /*
+          leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            /*
            * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
            * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-          if (localLog(topicPartition).isEmpty)
-            markPartitionOffline(topicPartition)
-        }
-
-        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
-        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
-        startHighWatermarkCheckPointThread()
-
-        maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints)
+            if (localLog(topicPartition).isEmpty)
+              markPartitionOffline(topicPartition)
+          }
 
-        replicaFetcherManager.shutdownIdleFetcherThreads()
-        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
-        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-        val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-          new LeaderAndIsrPartitionError()
-            .setTopicName(tp.topic)
-            .setPartitionIndex(tp.partition)
-            .setErrorCode(error.code)
-        }.toBuffer
-        new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-          .setErrorCode(Errors.NONE.code)
-          .setPartitionErrors(responsePartitions.asJava))
+          // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
+          // have been completely populated before starting the checkpointing there by avoiding weird race conditions
+          startHighWatermarkCheckPointThread()
+
+          maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints)
+
+          replicaFetcherManager.shutdownIdleFetcherThreads()
+          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+          onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
+            new LeaderAndIsrPartitionError()
+              .setTopicName(tp.topic)
+              .setPartitionIndex(tp.partition)
+              .setErrorCode(error.code)
+          }.toBuffer
+          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+            .setErrorCode(Errors.NONE.code)
+            .setPartitionErrors(responsePartitions.asJava))
+        }
       }
+      val endMs = time.milliseconds()

Review comment:
      For ease of review, it's worth mentioning in the PR body that, apart from re-indentation, the only lines that changed here are 1233 and 1373-1377, with the response being captured at line 1246.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org