You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/23 02:20:13 UTC

kafka git commit: KAFKA-3447; partitionState in UpdateMetadataRequest not logged properly state-change log

Repository: kafka
Updated Branches:
  refs/heads/trunk 983e9486e -> adca4d7df


KAFKA-3447; partitionState in UpdateMetadataRequest not logged properly state-change log

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #1117 from ijuma/kafka-3447-metadata-cache-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/adca4d7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/adca4d7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/adca4d7d

Branch: refs/heads/trunk
Commit: adca4d7df3155ac29ae510c237adf06c9521d221
Parents: 983e948
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Mar 22 18:20:07 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 22 18:20:07 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/MetadataCache.scala | 26 +++++++++-----------
 1 file changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/adca4d7d/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4b68f70..06fae42 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -44,7 +44,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
   private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
-  this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
+  this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
 
   private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = {
     val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
@@ -73,7 +73,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
 
         maybeLeader match {
           case None =>
-            debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
+            debug(s"Error while fetching metadata for $topicPartition: leader not available")
             new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
               replicaInfo.asJava, java.util.Collections.emptyList())
 
@@ -82,14 +82,14 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
             val isrInfo = getAliveEndpoints(isr, protocol)
 
             if (replicaInfo.size < replicas.size) {
-              debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
-                .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")))
+              debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
+                s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
 
               new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
                 replicaInfo.asJava, isrInfo.asJava)
             } else if (isrInfo.size < isr.size) {
-              debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s"
-                .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(",")))
+              debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
+                s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
               new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
                 replicaInfo.asJava, isrInfo.asJava)
             } else {
@@ -167,19 +167,17 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
       }
 
       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+        val controllerId = updateMetadataRequest.controllerId
+        val controllerEpoch = updateMetadataRequest.controllerEpoch
         if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, tp, updateMetadataRequest.controllerId,
-              updateMetadataRequest.controllerEpoch, correlationId))
+          stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
+            s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
         } else {
           val partitionInfo = partitionStateToPartitionStateInfo(info)
           addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
-              updateMetadataRequest.controllerEpoch, correlationId))
+          stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
+            s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
         }
       }
     }