You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 07:49:15 UTC

[GitHub] [kafka] rite2nikhil opened a new pull request #8807: KAFKA-10106: log time taken to handle LeaderAndIsr request

rite2nikhil opened a new pull request #8807:
URL: https://github.com/apache/kafka/pull/8807


   ReplicaManager!becomeLeaderOrFollower handles the LeaderAndIsr request, StateChangeLogger logs when this request is handled, however it can be useful to log when this calls ends and record the time taken, can help operationally. 
   
   Proposal is to ReplicaManager!becomeLeaderOrFollower start measuring the time before the `replicaStateChangeLock` is acquired and log before the response is returned. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #8807: KAFKA-10106: log time taken to handle LeaderAndIsr request

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8807:
URL: https://github.com/apache/kafka/pull/8807#issuecomment-640157773


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao merged pull request #8807: KAFKA-10106: log time taken to handle LeaderAndIsr request

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #8807:
URL: https://github.com/apache/kafka/pull/8807


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #8807: KAFKA-10106: log time taken to handle LeaderAndIsr request

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #8807:
URL: https://github.com/apache/kafka/pull/8807#discussion_r436289337



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1242,131 +1243,138 @@ class ReplicaManager(val config: KafkaConfig,
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
 
-      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-          s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
-          s"Latest known controller epoch is $controllerEpoch")
-        leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
-      } else {
-        val responseMap = new mutable.HashMap[TopicPartition, Errors]
-        controllerEpoch = leaderAndIsrRequest.controllerEpoch
-
-        val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
-
-        // First create the partition if it doesn't exist already
-        requestPartitionStates.foreach { partitionState =>
-          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
-          val partitionOpt = getPartition(topicPartition) match {
-            case HostedPartition.Offline =>
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
-                "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-              None
+      val response = {
+        if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
+          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+            s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
+            s"Latest known controller epoch is $controllerEpoch")
+          leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
+        } else {
+          val responseMap = new mutable.HashMap[TopicPartition, Errors]
+          controllerEpoch = leaderAndIsrRequest.controllerEpoch
 
-            case HostedPartition.Online(partition) =>
-              Some(partition)
+          val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
 
-            case HostedPartition.None =>
-              val partition = Partition(topicPartition, time, this)
-              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
-              Some(partition)
-          }
+          // First create the partition if it doesn't exist already
+          requestPartitionStates.foreach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            val partitionOpt = getPartition(topicPartition) match {
+              case HostedPartition.Offline =>
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id $correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
+                  "partition is in an offline log directory")
+                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+                None
+
+              case HostedPartition.Online(partition) =>
+                Some(partition)
+
+              case HostedPartition.None =>
+                val partition = Partition(topicPartition, time, this)
+                allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+                Some(partition)
+            }
 
-          // Next check partition's leader epoch
-          partitionOpt.foreach { partition =>
-            val currentLeaderEpoch = partition.getLeaderEpoch
-            val requestLeaderEpoch = partitionState.leaderEpoch
-            if (requestLeaderEpoch > currentLeaderEpoch) {
-              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-              if (partitionState.replicas.contains(localBrokerId))
-                partitionStates.put(partition, partitionState)
-              else {
-                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                  s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                  s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            // Next check partition's leader epoch
+            partitionOpt.foreach { partition =>
+              val currentLeaderEpoch = partition.getLeaderEpoch
+              val requestLeaderEpoch = partitionState.leaderEpoch
+              if (requestLeaderEpoch > currentLeaderEpoch) {
+                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+                if (partitionState.replicas.contains(localBrokerId))
+                  partitionStates.put(partition, partitionState)
+                else {
+                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
+                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
+                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                }
+              } else if (requestLeaderEpoch < currentLeaderEpoch) {
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id $correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+                  s"leader epoch $requestLeaderEpoch is smaller than the current " +
+                  s"leader epoch $currentLeaderEpoch")
+                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+              } else {
+                stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id $correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+                  s"leader epoch $requestLeaderEpoch matches the current leader epoch")
+                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
               }
-            } else if (requestLeaderEpoch < currentLeaderEpoch) {
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-                s"leader epoch $requestLeaderEpoch is smaller than the current " +
-                s"leader epoch $currentLeaderEpoch")
-              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
-            } else {
-              stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-                s"leader epoch $requestLeaderEpoch matches the current leader epoch")
-              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
             }
           }
-        }
 
-        val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
-          partitionState.leader == localBrokerId
-        }
-        val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
+          val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
+            partitionState.leader == localBrokerId
+          }
+          val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
 
-        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
-        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
-          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
-            highWatermarkCheckpoints)
-        else
-          Set.empty[Partition]
-        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
-            highWatermarkCheckpoints)
-        else
-          Set.empty[Partition]
+          val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+          val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
+            makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
+              highWatermarkCheckpoints)
+          else
+            Set.empty[Partition]
+          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
+            makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
+              highWatermarkCheckpoints)
+          else
+            Set.empty[Partition]
 
-        /*
+          /*
          * KAFKA-8392
          * For topic partitions of which the broker is no longer a leader, delete metrics related to
          * those topics. Note that this means the broker stops being either a replica or a leader of
          * partitions of said topics
          */
-        val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
-        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
-        followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
+          val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
+          val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
+          followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
 
-        // remove metrics for brokers which are not followers of a topic
-        leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
+          // remove metrics for brokers which are not followers of a topic
+          leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
 
-        leaderAndIsrRequest.partitionStates.forEach { partitionState =>
-          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
-          /*
+          leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            /*
            * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
            * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-          if (localLog(topicPartition).isEmpty)
-            markPartitionOffline(topicPartition)
-        }
-
-        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
-        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
-        startHighWatermarkCheckPointThread()
-
-        maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints)
+            if (localLog(topicPartition).isEmpty)
+              markPartitionOffline(topicPartition)
+          }
 
-        replicaFetcherManager.shutdownIdleFetcherThreads()
-        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
-        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-        val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-          new LeaderAndIsrPartitionError()
-            .setTopicName(tp.topic)
-            .setPartitionIndex(tp.partition)
-            .setErrorCode(error.code)
-        }.toBuffer
-        new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-          .setErrorCode(Errors.NONE.code)
-          .setPartitionErrors(responsePartitions.asJava))
+          // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
+          // have been completely populated before starting the checkpointing there by avoiding weird race conditions
+          startHighWatermarkCheckPointThread()
+
+          maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints)
+
+          replicaFetcherManager.shutdownIdleFetcherThreads()
+          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+          onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
+            new LeaderAndIsrPartitionError()
+              .setTopicName(tp.topic)
+              .setPartitionIndex(tp.partition)
+              .setErrorCode(error.code)
+          }.toBuffer
+          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+            .setErrorCode(Errors.NONE.code)
+            .setPartitionErrors(responsePartitions.asJava))
+        }
       }
+      val endMs = time.milliseconds()

Review comment:
       For ease of review, it's worth mentioning in the PR body that the only lines that changed here are 1373-1377 and 1233, with the response being captured in 1246.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org