Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/15 00:51:53 UTC

kafka git commit: KAFKA-5822; Consistent log formatting of topic partitions

Repository: kafka
Updated Branches:
  refs/heads/trunk 5d2422258 -> f6f56a645


KAFKA-5822; Consistent log formatting of topic partitions

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3778 from hachikuji/KAFKA-5822
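
For readers skimming the diff below: the patch standardizes log messages on the "topic-partition" rendering produced by TopicPartition.toString (and, after this change, TopicAndPartition.toString) instead of the older "[topic,partition]" pattern, and it prefers the Scala s interpolator over String.format. A minimal standalone sketch of the convention, using made-up topic, partition and broker values that are not part of the patch:

    import org.apache.kafka.common.TopicPartition

    val topicPartition = new TopicPartition("my-topic", 0)
    val localBrokerId = 1

    // Old convention: printf-style formatting with explicit brackets
    val oldPrefix = "Partition [%s,%d] on broker %d: ".format(
      topicPartition.topic, topicPartition.partition, localBrokerId)
    // oldPrefix == "Partition [my-topic,0] on broker 1: "

    // New convention: s-interpolation; a TopicPartition renders as "topic-partition"
    val newPrefix = s"[Partition $topicPartition broker=$localBrokerId] "
    // newPrefix == "[Partition my-topic-0 broker=1] "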


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6f56a64
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6f56a64
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6f56a64

Branch: refs/heads/trunk
Commit: f6f56a645bb1c5ec6810c024ba517e43bf77056c
Parents: 5d24222
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Sep 15 01:50:20 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 15 01:51:35 2017 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |   6 +-
 .../scala/kafka/common/TopicAndPartition.scala  |   2 +-
 .../controller/ControllerChannelManager.scala   |   6 +-
 .../kafka/controller/KafkaController.scala      |  34 ++---
 .../controller/PartitionStateMachine.scala      |   7 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  15 +-
 .../kafka/server/AbstractFetcherThread.scala    |  30 ++--
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../main/scala/kafka/server/MetadataCache.scala |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     | 149 ++++++++++---------
 .../tools/ReplicaVerificationToolTest.scala     |   2 +-
 11 files changed, 129 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9a55200..3b5fee0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -69,7 +69,7 @@ class Partition(val topic: String,
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
-  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
   private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
   val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
@@ -145,7 +145,7 @@ class Partition(val topic: String,
                                          AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
         val log = logManager.getOrCreateLog(topicPartition, config, isNew)
         val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
-        val offsetMap = checkpoint.read
+        val offsetMap = checkpoint.read()
         if (!offsetMap.contains(topicPartition))
           info(s"No checkpointed highwatermark is found for partition $topicPartition")
         val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
@@ -589,7 +589,7 @@ class Partition(val topic: String,
   override def hashCode: Int =
     31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0)
 
-  override def toString: String = {
+  override def toString(): String = {
     val partitionString = new StringBuilder
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 07a2292..4a8e65d 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -37,5 +37,5 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def asTopicPartition = new TopicPartition(topic, partition)
 
-  override def toString = "[%s,%d]".format(topic, partition)
+  override def toString: String = s"$topic-$partition"
 }
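
The toString change above is the core of the patch: TopicAndPartition now renders the same way as org.apache.kafka.common.TopicPartition. A quick sketch of the observable difference, with hypothetical values:

    import kafka.common.TopicAndPartition

    val tap = TopicAndPartition("my-topic", 3)

    // Before this commit: tap.toString == "[my-topic,3]"
    // After this commit:  tap.toString == "my-topic-3"
    // which matches:      tap.asTopicPartition.toString == "my-topic-3"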

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b34284f..693f297 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -405,10 +405,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           val typeOfRequest =
             if (broker == state.basePartitionState.leader) "become-leader"
             else "become-follower"
-          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
-                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
-                                                                   state, broker,
-                                                                   topicPartition.topic, topicPartition.partition))
+          stateChangeLogger.trace(s"Controller $controllerId epoch $controllerEpoch sending " +
+            s"$typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
         }
         val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4b07751..5ff47bf 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -22,7 +22,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{TopicAndPartition, _}
+import kafka.common._
 import kafka.log.LogConfig
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
@@ -88,7 +88,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
     brokerIds.flatMap { brokerId =>
       partitionReplicaAssignment.collect {
         case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
-          new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
+          PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
       }
     }.toSet
   }
@@ -98,7 +98,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
       .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
       .flatMap { case (topicAndPartition, replicas) =>
         replicas.map { r =>
-          new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
+          PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
         }
       }.toSet
   }
@@ -115,7 +115,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
   def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
     partitions.flatMap { p =>
       val replicas = partitionReplicaAssignment(p)
-      replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
+      replicas.map(r => PartitionAndReplica(p.topic, p.partition, r))
     }
   }
 
@@ -129,7 +129,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
 
 
 object KafkaController extends Logging {
-  val stateChangeLogger = new StateChangeLogger("state.change.logger")
+  val stateChangeLogger = StateChangeLogger("state.change.logger")
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
 
@@ -561,7 +561,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
       //5. replicas in RAR -> OnlineReplica
       reassignedReplicas.foreach { replica =>
-        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+        replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
           replica)), OnlineReplica)
       }
       //6. Set AR to RAR in memory.
@@ -768,9 +768,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     partitionsToReassign ++= partitionsBeingReassigned
     partitionsToReassign --= reassignedPartitions
     controllerContext.partitionsBeingReassigned ++= partitionsToReassign
-    info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
-    info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
-    info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
+    info(s"Partitions being reassigned: $partitionsBeingReassigned")
+    info(s"Partitions already reassigned: $reassignedPartitions")
+    info(s"Resuming reassignment of partitions: $partitionsToReassign")
   }
 
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
@@ -803,9 +803,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
-    zkUtils.getLeaderAndIsrForPartition(topic, partition).map { leaderAndIsr =>
+    zkUtils.getLeaderAndIsrForPartition(topic, partition).exists { leaderAndIsr =>
       replicas.forall(leaderAndIsr.isr.contains)
-    }.getOrElse(false)
+    }
   }
 
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
@@ -1168,7 +1168,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
       val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
         val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
-        leadershipInfo.map(_.leaderAndIsr.leader != leaderBroker).getOrElse(false)
+        leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
       }
       debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
 
@@ -1263,7 +1263,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
           .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
       else {
         if (partitionsToBeAdded.nonEmpty) {
-          info("New partitions to be added %s".format(partitionsToBeAdded))
+          info(s"New partitions to be added $partitionsToBeAdded")
           controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
           onNewPartitionCreation(partitionsToBeAdded.keySet)
         }
@@ -1277,16 +1277,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
     override def process(): Unit = {
       if (!isActive) return
-      debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
+      debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
       val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
       if (nonExistentTopics.nonEmpty) {
-        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+        warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
         nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
       }
       topicsToBeDeleted --= nonExistentTopics
       if (config.deleteTopicEnable) {
         if (topicsToBeDeleted.nonEmpty) {
-          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
+          info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
           // mark topic ineligible for deletion if other state changes are in progress
           topicsToBeDeleted.foreach { topic =>
             val partitionReassignmentInProgress =
@@ -1300,7 +1300,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       } else {
         // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
         for (topic <- topicsToBeDeleted) {
-          info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
+          info(s"Removing ${getDeleteTopicPath(topic)} since delete topic is disabled")
           zkUtils.deletePath(getDeleteTopicPath(topic))
         }
       }
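
Beyond the message formatting, several hunks above (and the MetadataCache change further down) replace the Option.map(...).getOrElse(false) pattern with Option.exists(...). For a Boolean predicate the two are equivalent; a small standalone sketch with made-up values, not taken from the patch:

    val leaderIsrOpt: Option[Seq[Int]] = Some(Seq(1, 2, 3))
    val replicas = Seq(1, 2)

    // Pattern removed by the patch
    val before = leaderIsrOpt.map(isr => replicas.forall(isr.contains)).getOrElse(false)

    // Pattern introduced by the patch: None => false, Some(isr) => predicate(isr)
    val after = leaderIsrOpt.exists(isr => replicas.forall(isr.contains))

    assert(before == after) // both true here, both false when leaderIsrOpt is None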

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5024c02..7257501 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -298,10 +298,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
         val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
         if (controllerEpoch > controller.epoch) {
-          val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
-                         "already written by another controller. This probably means that the current controller %d went through " +
-                         "a soft failure and another controller was elected with epoch %d.")
-                           .format(topic, partition, controllerId, controllerEpoch)
+          val failMsg = s"aborted leader election for partition $topicAndPartition since the LeaderAndIsr path was " +
+            s"already written by another controller. This probably means that the current controller $controllerId went through " +
+            s"a soft failure and another controller was elected with epoch $controllerEpoch."
           stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
           throw new StateChangeFailedException(failMsg)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 87c53b5..6b7adfe 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -145,8 +145,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
-                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
-                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
+                throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +
+                  s"be moved to NewReplica state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                   topic, partition, leaderIsrAndControllerEpoch,
                                                                   replicaAssignment, isNew = true)
@@ -229,21 +229,22 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             }
           if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))
             throw new StateChangeFailedException(
-              "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
-              .format(replicaId, topicAndPartition))
+              s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +
+                s"and isr path in zookeeper is empty")
+
       }
     }
     catch {
       case t: Throwable =>
-        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
-                                  .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
+        stateChangeLogger.error(s"Controller $controllerId epoch ${controller.epoch} initiated state change of " +
+          s"replica $replicaId for partition $topicAndPartition from $currState to $targetState failed", t)
     }
   }
 
   def areAllReplicasForTopicDeleted(topic: String): Boolean = {
     val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
     val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
-    debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
+    debug(s"Are all replicas for topic $topic deleted $replicaStatesForTopic")
     replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
   }
 

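One subtle fix in the NewReplica hunk above: the old code concatenated a formatted literal with a plain one and dropped the space in between, so the rendered message read "...moved to NewReplicastate as...". When a long message is split across source lines with the interpolator, every fragment needs its own s prefix and the separating space has to sit at one of the fragment boundaries. An illustrative sketch with hypothetical values:

    val replicaId = 1
    val topicAndPartition = "my-topic-0" // stand-in for a TopicAndPartition

    // Old style: the missing space between the two literals yields "NewReplicastate"
    val oldMsg = "Replica %d for partition %s cannot be moved to NewReplica".format(replicaId, topicAndPartition) +
      "state as it is being requested to become leader"

    // New style: one logical message split across lines, space kept at the boundary
    val newMsg = s"Replica $replicaId for partition $topicAndPartition cannot " +
      s"be moved to NewReplica state as it is being requested to become leader"
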
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 9b01043..fd26e11 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -56,7 +56,7 @@ abstract class AbstractFetcherThread(name: String,
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
 
-  private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
+  private val metricId = ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
 
@@ -121,7 +121,7 @@ abstract class AbstractFetcherThread(name: String,
   def maybeTruncate(): Unit = {
     val epochRequests = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
 
-    if (!epochRequests.isEmpty) {
+    if (epochRequests.nonEmpty) {
       val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
       //Ensure we hold a lock during truncation.
       inLock(partitionMapLock) {
@@ -149,7 +149,7 @@ abstract class AbstractFetcherThread(name: String,
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn(s"Error in fetch to broker ${sourceBroker.id}, request ${fetchRequest}", t)
+          warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
             // there is an error occurred while fetching partitions, sleep a while
@@ -194,31 +194,29 @@ abstract class AbstractFetcherThread(name: String,
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       // should get fixed in the subsequent fetches
-                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset  + " error " + ime.getMessage)
+                      logger.error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset} error ${ime.getMessage}")
                       updatePartitionsWithError(topicPartition)
                     case e: KafkaStorageException =>
                       logger.error(s"Error while processing data for partition $topicPartition", e)
                       updatePartitionsWithError(topicPartition)
                     case e: Throwable =>
-                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                        .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)
+                      throw new KafkaException(s"Error processing data for partition $topicPartition " +
+                        s"offset ${currentPartitionFetchState.fetchOffset}", e)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   try {
                     val newOffset = handleOffsetOutOfRange(topicPartition)
                     partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
-                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                      .format(currentPartitionFetchState.fetchOffset, topic, partitionId, newOffset))
+                    error(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of range; reset offset to $newOffset")
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
-                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                      error(s"Error getting offset for partition $topicPartition to broker ${sourceBroker.id}", e)
                       updatePartitionsWithError(topicPartition)
                   }
                 case _ =>
                   if (isRunning.get) {
-                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
-                      partitionData.exception.get))
+                    error(s"Error for partition $topicPartition to broker ${sourceBroker.id}:${partitionData.exception.get}")
                     updatePartitionsWithError(topicPartition)
                   }
               }
@@ -261,7 +259,7 @@ abstract class AbstractFetcherThread(name: String,
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
       .map { state =>
         val maybeTruncationComplete = partitions.get(state.topicPartition()) match {
-          case Some(offset) => new PartitionFetchState(offset, state.value.delay, truncatingLog = false)
+          case Some(offset) => PartitionFetchState(offset, state.value.delay, truncatingLog = false)
           case None => state.value()
         }
         (state.topicPartition(), maybeTruncationComplete)
@@ -275,7 +273,7 @@ abstract class AbstractFetcherThread(name: String,
       for (partition <- partitions) {
         Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
           if (!currentPartitionFetchState.isDelayed)
-            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay), currentPartitionFetchState.truncatingLog))
+            partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay), currentPartitionFetchState.truncatingLog))
         )
       }
       partitionMapCond.signalAll()
@@ -353,11 +351,11 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
   val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getAndMaybePut(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    stats.getAndMaybePut(ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
 
   def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
-    val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    val fetcherLagMetrics = stats.get(ClientIdTopicPartition(metricId.clientId, topic, partitionId))
     if (fetcherLagMetrics != null)
       fetcherLagMetrics.lag <= 0
     else
@@ -365,7 +363,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
   }
 
   def unregister(topic: String, partitionId: Int) {
-    val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    val lagMetrics = stats.remove(ClientIdTopicPartition(metricId.clientId, topic, partitionId))
     if (lagMetrics != null) lagMetrics.unregister()
   }
 

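A couple of the AbstractFetcherThread hunks also drop the redundant new in front of case classes (ClientIdAndBroker, ClientIdTopicPartition, PartitionFetchState) and replace !xs.isEmpty with xs.nonEmpty. Both spellings behave identically; a tiny standalone sketch with a hypothetical case class named ExampleKey:

    case class ExampleKey(clientId: String, topic: String) // made-up stand-in

    // Case classes get a companion apply(), so new is redundant
    val a = new ExampleKey("client-1", "my-topic")
    val b = ExampleKey("client-1", "my-topic")
    assert(a == b)

    // nonEmpty is the positive spelling of the same check
    val epochRequests = Seq("epoch-request")
    assert((!epochRequests.isEmpty) == epochRequests.nonEmpty)
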
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 69c81e6..88b1d23 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -598,7 +598,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       } catch {
         case e : IOException =>
           offlineDirs += logDir
-          error(s"Fail to read ${brokerMetaPropsFile} under log directory ${logDir}", e)
+          error(s"Fail to read $brokerMetaPropsFile under log directory $logDir", e)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9464941..293b0dc 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -229,11 +229,11 @@ class MetadataCache(brokerId: Int) extends Logging {
   def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
 
   private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
-    cache.get(topic).map { infos =>
+    cache.get(topic).exists { infos =>
       infos.remove(partitionId)
       if (infos.isEmpty) cache.remove(topic)
       true
-    }.getOrElse(false)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7920efe..113dbcb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -181,7 +181,7 @@ class ReplicaManager(val config: KafkaConfig,
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
 
   private var hwThreadInitialized = false
-  this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
+  this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   val stateChangeLogger = KafkaController.stateChangeLogger
   private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
@@ -976,16 +976,17 @@ class ReplicaManager(val config: KafkaConfig,
                              leaderAndISRRequest: LeaderAndIsrRequest,
                              onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
-      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
-                                .format(localBrokerId, stateInfo, correlationId,
-                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
+      stateChangeLogger.trace(s"Broker $localBrokerId received LeaderAndIsr request $stateInfo " +
+        s"correlation id $correlationId from controller ${leaderAndISRRequest.controllerId} " +
+        s"epoch ${leaderAndISRRequest.controllerEpoch} for partition $topicPartition")
     }
     replicaStateChangeLock synchronized {
       val responseMap = new mutable.HashMap[TopicPartition, Errors]
       if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
-          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
-          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
+        stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+          s"controller ${leaderAndISRRequest.controllerId} with correlation id $correlationId since " +
+          s"its controller epoch ${leaderAndISRRequest.controllerEpoch} is old. Latest known controller " +
+          s"epoch is $controllerEpoch")
         BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
@@ -997,9 +998,10 @@ class ReplicaManager(val config: KafkaConfig,
           val partition = getOrCreatePartition(topicPartition)
           val partitionLeaderEpoch = partition.getLeaderEpoch
           if (partition eq ReplicaManager.OfflinePartition) {
-            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
-              "epoch %d for partition [%s,%d] as the local replica for the partition is in an offline log directory")
-              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
+            stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+              s"controller $controllerId with correlation id $correlationId " +
+              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
+              "partition is in an offline log directory")
             responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
           } else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) {
             // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
@@ -1007,18 +1009,19 @@ class ReplicaManager(val config: KafkaConfig,
             if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
               partitionState.put(partition, stateInfo)
             else {
-              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
-                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
-                .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
-                  topicPartition.topic, topicPartition.partition, stateInfo.basePartitionState.replicas.asScala.mkString(",")))
+              stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+                s"controller $controllerId with correlation id $correlationId " +
+                s"epoch $controllerEpoch for partition $topicPartition as itself is not in assigned " +
+                s"replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
               responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
           } else {
             // Otherwise record the error code in response
-            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
-              "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
-              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
-                topicPartition.topic, topicPartition.partition, stateInfo.basePartitionState.leaderEpoch, partitionLeaderEpoch))
+            stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+              s"controller $controllerId with correlation id $correlationId " +
+              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+              s"leader epoch ${stateInfo.basePartitionState.leaderEpoch} is not higher than the current " +
+              s"leader epoch $partitionLeaderEpoch")
             responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
           }
         }
@@ -1080,9 +1083,9 @@ class ReplicaManager(val config: KafkaConfig,
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-        "starting the become-leader transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+      stateChangeLogger.trace(s"Broker $localBrokerId handling LeaderAndIsr request correlationId $correlationId from " +
+        s"controller $controllerId epoch $epoch starting the become-leader transition for " +
+        s"partition ${partition.topicPartition}")
     }
 
     for (partition <- partitionState.keys)
@@ -1098,39 +1101,40 @@ class ReplicaManager(val config: KafkaConfig,
         try {
           if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
             partitionsToMakeLeaders += partition
-            stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
-              "%d epoch %d with correlation id %d for partition %s")
-              .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+            stateChangeLogger.trace(s"Broker $localBrokerId stopped fetchers as part of become-leader request from " +
+              s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
+              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
           } else
-            stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
-              "controller %d epoch %d for partition %s since it is already the leader for the partition.")
-              .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+            stateChangeLogger.info(s"Broker $localBrokerId skipped the become-leader state change after marking its " +
+              s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+              s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
+              s"since it is already the leader for the partition.")
         } catch {
           case e: KafkaStorageException =>
-            stateChangeLogger.error(("Broker %d skipped the become-leader state change with correlation id %d from " +
-              "controller %d epoch %d for partition %s since the replica for the partition is offline due to disk error %s.")
-              .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition, e))
-            val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+            stateChangeLogger.error(s"Broker $localBrokerId skipped the become-leader state change with " +
+              s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
+              s"the replica for the partition is offline due to disk error $e")
+            val dirOpt = getLogDir(partition.topicPartition)
             error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
-            responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
+            responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
         }
       }
 
     } catch {
       case e: Throwable =>
         partitionState.keys.foreach { partition =>
-          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
-            " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
-          stateChangeLogger.error(errorMsg, e)
+          stateChangeLogger.error(s"Error on broker $localBrokerId while processing LeaderAndIsr request " +
+            s"correlationId $correlationId received from controller $controllerId epoch $epoch for " +
+            s"partition ${partition.topicPartition}", e)
         }
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-        "for the become-leader transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+      stateChangeLogger.trace(s"Broker $localBrokerId completed LeaderAndIsr request correlationId $correlationId from " +
+        s"controller $controllerId epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
     }
 
     partitionsToMakeLeaders
@@ -1160,9 +1164,9 @@ class ReplicaManager(val config: KafkaConfig,
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-        "starting the become-follower transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+      stateChangeLogger.trace(s"Broker $localBrokerId handling LeaderAndIsr request correlationId $correlationId from " +
+        s"controller $controllerId epoch $epoch starting the become-follower transition for " +
+        s"partition ${partition.topicPartition}")
     }
 
     for (partition <- partitionState.keys)
@@ -1182,37 +1186,39 @@ class ReplicaManager(val config: KafkaConfig,
               if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
                 partitionsToMakeFollower += partition
               else
-                stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
-                  "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
-                  .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch,
-                    partition.topicPartition, newLeaderBrokerId))
+                stateChangeLogger.info(s"Broker $localBrokerId skipped the become-follower state change after marking " +
+                  s"its partition as follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
+                  s"for partition ${partition.topicPartition} (last update " +
+                  s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
+                  s"since the new leader $newLeaderBrokerId is the same as the old leader")
             case None =>
               // The leader broker should always be present in the metadata cache.
               // If not, we should record the error message and abort the transition process for this partition
-              stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
-                " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
-                .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch,
-                  partition.topicPartition, newLeaderBrokerId))
+              stateChangeLogger.error(s"Broker $localBrokerId received LeaderAndIsrRequest with " +
+                s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+                s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
+                s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
               // Create the local replica even if the leader is unavailable. This is required to ensure that we include
               // the partition's high watermark in the checkpoint file (see KAFKA-1647)
               partition.getOrCreateReplica(isNew = partitionStateInfo.isNew)
           }
         } catch {
           case e: KafkaStorageException =>
-            stateChangeLogger.error(("Broker %d skipped the become-follower state change with correlation id %d from " +
-              "controller %d epoch %d for partition [%s,%d] since the replica for the partition is offline due to disk error %s")
-              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch, partition.topic, partition.partitionId, e))
-            val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+            stateChangeLogger.error(s"Broker $localBrokerId skipped the become-follower state change with " +
+              s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since the " +
+              s"replica for the partition is offline due to disk error $e")
+            val dirOpt = getLogDir(partition.topicPartition)
             error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e)
-            responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
+            responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
         }
       }
 
       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
       partitionsToMakeFollower.foreach { partition =>
-        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
-          "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+        stateChangeLogger.trace(s"Broker $localBrokerId stopped fetchers as part of become-follower request from " +
+          s"controller $controllerId epoch $epoch with correlation id $correlationId for " +
+          s"partition ${partition.topicPartition}")
       }
 
       partitionsToMakeFollower.foreach { partition =>
@@ -1222,16 +1228,16 @@ class ReplicaManager(val config: KafkaConfig,
       }
 
       partitionsToMakeFollower.foreach { partition =>
-        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
-          "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
-          partition.topicPartition, correlationId, controllerId, epoch))
+        stateChangeLogger.trace(s"Broker $localBrokerId truncated logs and checkpointed recovery boundaries for " +
+          s"partition ${partition.topicPartition} as part of become-follower request with " +
+          s"correlation id $correlationId from controller $controllerId epoch $epoch")
       }
 
       if (isShuttingDown.get()) {
         partitionsToMakeFollower.foreach { partition =>
-          stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
-            "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
-            controllerId, epoch, partition.topicPartition))
+          stateChangeLogger.trace(s"Broker $localBrokerId skipped the adding-fetcher step of the become-follower state " +
+            s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+            s"partition ${partition.topicPartition} since it is shutting down")
         }
       }
       else {
@@ -1243,24 +1249,23 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 
         partitionsToMakeFollower.foreach { partition =>
-          stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
-            "%d epoch %d with correlation id %d for partition %s")
-            .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+          stateChangeLogger.trace(s"Broker $localBrokerId started fetcher to new leader as part of become-follower " +
+            s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
+            s"partition ${partition.topicPartition}")
         }
       }
     } catch {
       case e: Throwable =>
-        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
-          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
-        stateChangeLogger.error(errorMsg, e)
+        stateChangeLogger.error(s"Error on broker $localBrokerId while processing LeaderAndIsr request with " +
+          s"correlationId $correlationId received from controller $controllerId epoch $epoch", e)
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-        "for the become-follower transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+      stateChangeLogger.trace(s"Broker $localBrokerId completed LeaderAndIsr request correlationId $correlationId from " +
+        s"controller $controllerId epoch $epoch for the become-follower transition for " +
+        s"partition ${partition.topicPartition}")
     }
 
     partitionsToMakeFollower
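
The logIdent switch near the top of this file follows the same "[Component key=value] " shape as the Partition change earlier. Assuming, as in the kafka.utils.Logging trait of this era, that logIdent is simply prepended to every message the class logs, the effect is roughly the following (illustrative sketch only, not part of the patch):

    // Minimal stand-in for the prefixing behaviour of kafka.utils.Logging
    trait SimpleLogging {
      var logIdent: String = ""
      def info(msg: String): Unit = println(logIdent + msg)
    }

    class ExampleReplicaManager(localBrokerId: Int) extends SimpleLogging {
      this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
    }

    new ExampleReplicaManager(0).info("Shut down completely")
    // prints: [ReplicaManager broker=0] Shut down completely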

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index c0f385f..eb6142a 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -54,7 +54,7 @@ class ReplicaVerificationToolTest {
     val output = sb.toString.trim
 
     assertTrue(s"Max lag information should be in output: `$output`",
-      output.endsWith(": max lag is 10 for partition [a,0] at offset 10 among 3 partitions"))
+      output.endsWith(": max lag is 10 for partition a-0 at offset 10 among 3 partitions"))
   }
 
 }