You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/23 02:20:13 UTC
kafka git commit: KAFKA-3447;
partitionState in UpdateMetadataRequest not logged properly in
state-change log
Repository: kafka
Updated Branches:
refs/heads/trunk 983e9486e -> adca4d7df
KAFKA-3447; partitionState in UpdateMetadataRequest not logged properly in state-change log
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #1117 from ijuma/kafka-3447-metadata-cache-logging
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/adca4d7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/adca4d7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/adca4d7d
Branch: refs/heads/trunk
Commit: adca4d7df3155ac29ae510c237adf06c9521d221
Parents: 983e948
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Mar 22 18:20:07 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 22 18:20:07 2016 -0700
----------------------------------------------------------------------
.../main/scala/kafka/server/MetadataCache.scala | 26 +++++++++-----------
1 file changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/adca4d7d/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4b68f70..06fae42 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -44,7 +44,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
private val partitionMetadataLock = new ReentrantReadWriteLock()
- this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
+ this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = {
val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
@@ -73,7 +73,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
maybeLeader match {
case None =>
- debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
+ debug(s"Error while fetching metadata for $topicPartition: leader not available")
new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
replicaInfo.asJava, java.util.Collections.emptyList())
@@ -82,14 +82,14 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
val isrInfo = getAliveEndpoints(isr, protocol)
if (replicaInfo.size < replicas.size) {
- debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
- .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")))
+ debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
+ s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
replicaInfo.asJava, isrInfo.asJava)
} else if (isrInfo.size < isr.size) {
- debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s"
- .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(",")))
+ debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
+ s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
replicaInfo.asJava, isrInfo.asJava)
} else {
@@ -167,19 +167,17 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+ val controllerId = updateMetadataRequest.controllerId
+ val controllerEpoch = updateMetadataRequest.controllerEpoch
if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
removePartitionInfo(tp.topic, tp.partition)
- stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
- "sent by controller %d epoch %d with correlation id %d")
- .format(brokerId, tp, updateMetadataRequest.controllerId,
- updateMetadataRequest.controllerEpoch, correlationId))
+ stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
+ s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
} else {
val partitionInfo = partitionStateToPartitionStateInfo(info)
addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
- stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
- "sent by controller %d epoch %d with correlation id %d")
- .format(brokerId, info, tp, updateMetadataRequest.controllerId,
- updateMetadataRequest.controllerEpoch, correlationId))
+ stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
+ s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
}
}
}