Posted to commits@kafka.apache.org by ju...@apache.org on 2020/04/17 21:22:58 UTC

[kafka] branch trunk updated: MINOR: reduce impact of trace logging in replica hot path (#8468)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 851b45c  MINOR: reduce impact of trace logging in replica hot path (#8468)
851b45c is described below

commit 851b45c842a9d35c8299f87725b52b1b6523271e
Author: Lucas Bradstreet <lu...@gmail.com>
AuthorDate: Fri Apr 17 14:22:30 2020 -0700

    MINOR: reduce impact of trace logging in replica hot path (#8468)
    
    The impact of trace logging is normally small, on the order of 40ns per getEffectiveLevel check; however, this adds up when trace is called multiple times per partition in the replica fetch hot path.
    
    This PR removes some trace logs that are not very useful and reduces the number of times the log level is re-checked during a single fetch request.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
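
The pattern is simple: read the log level once into a local val at the top of the hot method, then guard each trace call with that val, so the level check happens once per call rather than once per trace site, and the message strings are only built when tracing is actually enabled. A minimal sketch of the idea using plain SLF4J; the object and method names below are illustrative, not Kafka's actual code:

    import org.slf4j.{Logger, LoggerFactory}

    object TraceHotPathSketch {
      private val logger: Logger = LoggerFactory.getLogger(getClass)

      // Hypothetical stand-in for a per-fetch hot path such as processPartitionData.
      def processPartitions(partitions: Seq[String]): Unit = {
        // Check the level once per call instead of once per trace site per partition.
        val logTrace = logger.isTraceEnabled

        partitions.foreach { tp =>
          // ... real per-partition work would go here ...
          if (logTrace)
            logger.trace(s"Processed partition $tp") // string is only built when tracing is on
        }
      }

      def main(args: Array[String]): Unit =
        processPartitions(Seq("topic-0", "topic-1"))
    }

As a rough, hypothetical illustration of why this matters: at ~40ns per level check (the figure quoted in the commit message), three or four level checks per partition across a fetch spanning a couple of thousand partitions comes to a few hundred microseconds of pure level checking per request. The diffs below apply this hoisting in ReplicaFetcherThread.scala and in two methods of ReplicaManager.scala, and drop two trace calls from Replica.scala entirely.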
---
 core/src/main/scala/kafka/cluster/Replica.scala      |  2 --
 .../scala/kafka/server/ReplicaFetcherThread.scala    |  7 ++++---
 .../src/main/scala/kafka/server/ReplicaManager.scala | 20 +++++++++++++-------
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index f9de7ba..d1fc345 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -83,7 +83,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     lastFetchLeaderLogEndOffset = leaderEndOffset
     lastFetchTimeMs = followerFetchTimeMs
     updateLastSentHighWatermark(lastSentHighwatermark)
-    trace(s"Updated state of replica to $this")
   }
 
   /**
@@ -96,7 +95,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     */
   private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
     _lastSentHighWatermark = highWatermark
-    trace(s"Updated HW of replica to $highWatermark")
   }
 
   def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index a42f689..ec6d75c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -149,6 +149,7 @@ class ReplicaFetcherThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: FetchData): Option[LogAppendInfo] = {
+    val logTrace = isTraceEnabled
     val partition = replicaMgr.nonOfflinePartition(topicPartition).get
     val log = partition.localLogOrException
     val records = toMemoryRecords(partitionData.records)
@@ -159,14 +160,14 @@ class ReplicaFetcherThread(name: String,
       throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, log.logEndOffset))
 
-    if (isTraceEnabled)
+    if (logTrace)
       trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
         .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
     val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
-    if (isTraceEnabled)
+    if (logTrace)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
         .format(log.logEndOffset, records.sizeInBytes, topicPartition))
     val leaderLogStartOffset = partitionData.logStartOffset
@@ -175,7 +176,7 @@ class ReplicaFetcherThread(name: String,
     // These values will be computed upon becoming leader or handling a preferred read replica fetch.
     val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
     log.maybeIncrementLogStartOffset(leaderLogStartOffset)
-    if (isTraceEnabled)
+    if (logTrace)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
 
     // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9659172..4d917d5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -842,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
-
+    val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = getPartition(topicPartition) match {
         case HostedPartition.Online(partition) => partition.logStartOffset
@@ -855,7 +855,9 @@ class ReplicaManager(val config: KafkaConfig,
       logStartOffset
     }
 
-    trace(s"Append [$entriesPerPartition] to local log")
+    if (traceEnabled)
+      trace(s"Append [$entriesPerPartition] to local log")
+
     entriesPerPartition.map { case (topicPartition, records) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
@@ -877,8 +879,10 @@ class ReplicaManager(val config: KafkaConfig,
           brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
           brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
 
-          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
-            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
+          if (traceEnabled)
+            trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
+              s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
+
           (topicPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
@@ -1024,6 +1028,7 @@ class ReplicaManager(val config: KafkaConfig,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                        quota: ReplicaQuota,
                        clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
+    val traceEnabled = isTraceEnabled
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
@@ -1035,9 +1040,10 @@ class ReplicaManager(val config: KafkaConfig,
 
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
-        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
-          s"remaining response limit $limitBytes" +
-          (if (minOneMessage) s", ignoring response/partition size limits" else ""))
+        if (traceEnabled)
+          trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
+            s"remaining response limit $limitBytes" +
+            (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
         val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
         val fetchTimeMs = time.milliseconds