Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/23 16:40:12 UTC

kafka git commit: MINOR: Small cleanups/refactoring in kafka.controller

Repository: kafka
Updated Branches:
  refs/heads/trunk 062c5568e -> ac17ab4f0


MINOR: Small cleanups/refactoring in kafka.controller

- Updated logging to use string interpolation (see the sketch below)
- Minor refactors
- Fixed a few typos
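
As an illustration of the logging change, the diff mostly replaces java.util.Formatter-style
templates with Scala string interpolation. A minimal standalone sketch of the pattern follows;
the broker id and request string are made-up placeholders, not values taken from the diff:

object InterpolationExample extends App {
  val brokerId = 5
  val request = "LeaderAndIsrRequest(...)"

  // Old style: positional %s/%d specifiers resolved at runtime by .format
  val oldStyle = "Not sending request %s to broker %d, since it is offline.".format(request, brokerId)

  // New style: the s-interpolator embeds the identifiers directly in the template
  val newStyle = s"Not sending request $request to broker $brokerId, since it is offline."

  assert(oldStyle == newStyle)
  println(newStyle)
}

Besides being shorter, the interpolated form references identifiers by name, so a renamed or
missing argument is caught at compile time instead of surfacing as a runtime format exception.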

Author: Mickael Maison <mi...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #4231 from mimaison/controller_refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ac17ab4f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ac17ab4f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ac17ab4f

Branch: refs/heads/trunk
Commit: ac17ab4f09082579d0a239b3b7aac6e2ce342d84
Parents: 062c556
Author: Mickael Maison <mi...@gmail.com>
Authored: Thu Nov 23 16:34:07 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 23 16:39:59 2017 +0000

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   |  39 +++---
 .../kafka/controller/ControllerState.scala      |   2 +-
 .../kafka/controller/KafkaController.scala      | 140 +++++++++----------
 .../controller/PartitionStateMachine.scala      |  12 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  11 +-
 .../kafka/controller/TopicDeletionManager.scala |  67 +++++----
 6 files changed, 132 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index d5bd4e6..a072978 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -85,7 +85,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         case Some(stateInfo) =>
           stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
         case None =>
-          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
+          warn(s"Not sending request $request to broker $brokerId, since it is offline.")
       }
     }
   }
@@ -93,7 +93,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   def addBroker(broker: Broker) {
     // be careful here. Maybe the startup() API has already started the request send thread
     brokerLock synchronized {
-      if(!brokerStateInfo.contains(broker.id)) {
+      if (!brokerStateInfo.contains(broker.id)) {
         addNewBroker(broker)
         startRequestSendThread(broker.id)
       }
@@ -108,7 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
 
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
-    debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
+    debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
     val brokerNode = broker.getNode(config.interBrokerListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
@@ -148,8 +148,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       )
     }
     val threadName = threadNamePrefix match {
-      case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
-      case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
+      case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
+      case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
     }
 
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
@@ -188,7 +188,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
 
   protected def startRequestSendThread(brokerId: Int) {
     val requestThread = brokerStateInfo(brokerId).requestSendThread
-    if(requestThread.getState == Thread.State.NEW)
+    if (requestThread.getState == Thread.State.NEW)
       requestThread.start()
   }
 }
@@ -233,9 +233,8 @@ class RequestSendThread(val controllerId: Int,
           }
         } catch {
           case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-            warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
-              "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                requestBuilder.toString, brokerNode.toString), e)
+            warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " +
+              s"to broker $brokerNode. Reconnecting to broker.", e)
             networkClient.close(brokerNode.idString)
             isSendSuccessful = false
             backoff()
@@ -258,7 +257,7 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString), e)
+        error(s"Controller $controllerId fails to send a request to broker $brokerNode", e)
         // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
         networkClient.close(brokerNode.idString)
     }
@@ -270,13 +269,13 @@ class RequestSendThread(val controllerId: Int,
         if (!NetworkClientUtils.awaitReady(networkClient, brokerNode, time, socketTimeoutMs))
           throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
-        info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString))
+        info(s"Controller $controllerId connected to $brokerNode for sending state change requests")
       }
 
       true
     } catch {
       case e: Throwable =>
-        warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString), e)
+        warn(s"Controller $controllerId's connection to broker $brokerNode was unsuccessful", e)
         networkClient.close(brokerNode.idString)
         false
     }
@@ -296,14 +295,14 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
     // raise error if the previous batch is not empty
     if (leaderAndIsrRequestMap.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
-        "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
+        s"a new one. Some LeaderAndIsr state changes $leaderAndIsrRequestMap might be lost ")
     if (stopReplicaRequestMap.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
-        "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
+        s"new one. Some StopReplica state changes $stopReplicaRequestMap might be lost ")
     if (updateMetadataRequestBrokerSet.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
-        "new one. Some UpdateMetadata state changes to brokers %s with partition info %s might be lost ".format(
-          updateMetadataRequestBrokerSet.toString(), updateMetadataRequestPartitionInfoMap.toString()))
+        s"new one. Some UpdateMetadata state changes to brokers $updateMetadataRequestBrokerSet with partition info " +
+        s"updateMetadataRequestPartitionInfoMap might be lost ")
   }
 
   def clear() {
@@ -369,7 +368,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
           updateMetadataRequestPartitionInfoMap.put(partition, partitionStateInfo)
 
         case None =>
-          info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
+          info(s"Leader not yet assigned for partition $partition. Skip sending UpdateMetadataRequest.")
       }
     }
 
@@ -459,10 +458,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
         val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
         val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
-        debug("The stop replica request (delete = true) sent to broker %d is %s"
-          .format(broker, stopReplicaWithDelete.mkString(",")))
-        debug("The stop replica request (delete = false) sent to broker %d is %s"
-          .format(broker, stopReplicaWithoutDelete.mkString(",")))
+        debug(s"The stop replica request (delete = true) sent to broker $broker is ${stopReplicaWithDelete.mkString(",")}")
+        debug(s"The stop replica request (delete = false) sent to broker $broker is ${stopReplicaWithoutDelete.mkString(",")}")
 
         val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/ControllerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 74029b1..2bb63e8 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f0ca9e7..3615b7d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -179,7 +179,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
-   * 1. Register controller epoch changed listener
+   * 1. Registers controller epoch changed listener
    * 2. Increments the controller epoch
    * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
    *    leaders for all existing partitions.
@@ -295,7 +295,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * brokers as input. It does the following -
    * 1. Sends update metadata request to all live and shutting down brokers
    * 2. Triggers the OnlinePartition state change for all new/offline partitions
-   * 3. It checks whether there are reassigned replicas assigned to any newly started brokers.  If
+   * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
    *    so, it performs the reassignment logic for each topic/partition.
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
@@ -305,7 +305,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    *    every broker that it is still valid.  Brokers check the leader epoch to determine validity of the request.
    */
   private def onBrokerStartup(newBrokers: Seq[Int]) {
-    info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+    info(s"New broker startup callback for ${newBrokers.mkString(",")}")
     newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
     // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
@@ -329,9 +329,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // on the newly restarted brokers, there is a chance that topic deletion can resume
     val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
     if (replicasForTopicsToBeDeleted.nonEmpty) {
-      info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
-        "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
-        topicDeletionManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
+      info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
+        s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
+        s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
       topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
     }
   }
@@ -341,23 +341,23 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * as input. It will call onReplicaBecomeOffline(...) with the list of replicas on those failed brokers as input.
    */
   private def onBrokerFailure(deadBrokers: Seq[Int]) {
-    info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+    info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
     deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val deadBrokersThatWereShuttingDown =
       deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
-    info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
+    info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
     val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
     onReplicasBecomeOffline(allReplicasOnDeadBrokers)
   }
 
   /**
     * This method marks the given replicas as offline. It does the following -
-    * 1. Mark the given partitions as offline
+    * 1. Marks the given partitions as offline
     * 2. Triggers the OnlinePartition state change for all new/offline partitions
     * 3. Invokes the OfflineReplica state change on the input list of newly offline replicas
     * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
     *
-    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.  This is because
+    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
     * the partition state machine will refresh our cache for us when performing leader election for all new/offline
     * partitions coming online.
     */
@@ -376,7 +376,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // trigger OfflineReplica state change for those newly offline replicas
     replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
 
-    // fail deletion of topics that affected by the offline replicas
+    // fail deletion of topics that are affected by the offline replicas
     if (newOfflineReplicasForDeletion.nonEmpty) {
       // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
       // deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state indefinitely
@@ -398,7 +398,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
    */
   private def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
-    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
+    info(s"New partition creation callback for ${newPartitions.mkString(",")}")
     partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
     partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
@@ -528,11 +528,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }
 
   private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
-    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
+    info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
     try {
       partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
     } catch {
-      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+      case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e)
     } finally {
       removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
     }
@@ -580,9 +580,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // start the channel manager
     startChannelManager()
     initializePartitionReassignment()
-    info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
-    info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
-    info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
+    info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
+    info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
+    info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
   }
 
   private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
@@ -592,16 +592,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
       val topicDeleted = replicasOpt.isEmpty
       val successful =
-        if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
+        if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
       successful || topicDeleted
     }
     val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
     val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
     val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
-    info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
-    info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
-    info("Skipping preferred replica election for partitions due to topic deletion: %s".format(pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")))
-    info("Resuming preferred replica election for partitions: %s".format(pendingPreferredReplicaElections.mkString(",")))
+    info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")
+    info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")
+    info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")
+    info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")
     pendingPreferredReplicaElections
   }
 
@@ -645,8 +645,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }.keySet.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
     val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
-    info("List of topics to be deleted: %s".format(topicsToBeDeleted.mkString(",")))
-    info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
+    info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
+    info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
     (topicsToBeDeleted, topicsIneligibleForDeletion)
   }
 
@@ -683,21 +683,21 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
     val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
     controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas)
-    if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
-      info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
-        "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
+    if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
+      info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+        s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
       // move the leader to one of the alive and caught up new replicas
       partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
     } else {
       // check if the leader is alive or not
       if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
-        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
-          "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+        info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+          s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive")
         // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
         updateLeaderEpochAndSendRequest(topicPartition, oldAndNewReplicas, reassignedReplicas)
       } else {
-        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
-          "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
+        info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+          s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
         partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
       }
     }
@@ -721,14 +721,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
     partitionsAndReplicasForThisTopic.put(partition, replicas)
     val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
-    if (setDataResponse.resultCode == Code.OK) {
-      info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
-      // update the assigned replica list after a successful zookeeper write
-      controllerContext.partitionReplicaAssignment.put(partition, replicas)
-    } else if (setDataResponse.resultCode == Code.NONODE) {
-      throw new IllegalStateException("Topic %s doesn't exist".format(partition.topic))
-    } else {
-      throw new KafkaException(setDataResponse.resultException.get)
+    setDataResponse.resultCode match {
+      case Code.OK =>
+        info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}")
+        // update the assigned replica list after a successful zookeeper write
+        controllerContext.partitionReplicaAssignment.put(partition, replicas)
+      case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist")
+      case _ => throw new KafkaException(setDataResponse.resultException.get)
     }
   }
 
@@ -791,7 +790,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     epochAndStatOpt.foreach { case (epoch, stat) =>
       controllerContext.epoch = epoch
       controllerContext.epochZkVersion = stat.getVersion
-      info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
+      info(s"Initialized controller epoch to ${controllerContext.epoch} and zk version ${controllerContext.epochZkVersion}")
     }
   }
 
@@ -827,9 +826,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if (currentLeader == preferredReplica) {
-        info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
+        info(s"Partition $partition completed preferred replica leader election. New leader is $preferredReplica")
       } else {
-        warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
+        warn(s"Partition $partition failed to complete preferred replica leader election. Leader is $currentLeader")
       }
     }
     if (!isTriggeredByAutoRebalance) {
@@ -868,7 +867,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
    */
   private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
-    debug("Updating leader epoch for partition %s.".format(partition))
+    debug(s"Updating leader epoch for partition $partition")
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
@@ -878,9 +877,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
           val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
           if (controllerEpoch > epoch)
-            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
-              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
-              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              s"means the current controller with epoch $epoch went through a soft failure and another " +
+              s"controller was elected with epoch $controllerEpoch. Aborting state change by this controller")
           // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
           // assigned replica list
           val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
@@ -917,7 +916,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
         leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
       }
-      debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
+      debug(s"Topics not in preferred replica for broker $leaderBroker $topicsNotInPreferredReplica")
 
       val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicPartitionsForBroker.size
       trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio")
@@ -967,14 +966,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
       }
 
-      info("Shutting down broker " + id)
+      info(s"Shutting down broker $id")
 
       if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
-        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+        throw new BrokerNotAvailableException(s"Broker id $id does not exist.")
 
       controllerContext.shuttingDownBrokerIds.add(id)
-      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
-      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+      debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
+      debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
 
       val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
         controllerContext.partitionReplicaAssignment(partition).size > 1 && controllerContext.partitionLeadershipInfo.contains(partition)
@@ -998,7 +997,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
         PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
       def replicatedPartitionsBrokerLeads() = {
-        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+        trace(s"All leaders = ${controllerContext.partitionLeadershipInfo.mkString(",")}")
         controllerContext.partitionLeadershipInfo.filter {
           case (topicPartition, leaderIsrAndControllerEpoch) =>
             leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicPartition).size > 1
@@ -1048,7 +1047,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       import JavaConverters._
       if (!isActive) return
       val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
-      debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+      debug(s"Delete topic callback invoked for $stopReplicaResponse")
       val responseMap = stopReplicaResponse.responses.asScala
       val partitionsInError =
         if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
@@ -1127,13 +1126,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
      * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
      */
     if (activeControllerId != -1) {
-      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
+      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
       return
     }
 
     try {
       zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
-      info(config.brokerId + " successfully elected as the controller")
+      info(s"${config.brokerId} successfully elected as the controller")
       activeControllerId = config.brokerId
       onControllerFailover()
     } catch {
@@ -1142,12 +1141,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         activeControllerId = zkClient.getControllerId.getOrElse(-1)
 
         if (activeControllerId != -1)
-          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
+          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}")
         else
           warn("A controller has been elected but just resigned, this will result in another round of election")
 
       case e2: Throwable =>
-        error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
+        error(s"Error while electing or becoming controller on broker ${config.brokerId}", e2)
         triggerControllerMove()
     }
   }
@@ -1167,8 +1166,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
       val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
       val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
-      info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
-        .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+      info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
+        s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
+
       newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
       deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
       if (newBrokerIds.nonEmpty)
@@ -1193,8 +1193,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
         !deletedTopics.contains(p._1.topic))
       controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
-      info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
-        deletedTopics, addedPartitionReplicaAssignment))
+      info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
+        s"[$addedPartitionReplicaAssignment]")
       if (addedPartitionReplicaAssignment.nonEmpty)
         onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
     }
@@ -1224,9 +1224,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
       val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
         !controllerContext.partitionReplicaAssignment.contains(p._1))
-      if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+      if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+        error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
+          "since it is currently being deleted")
       else {
         if (partitionsToBeAdded.nonEmpty) {
           info(s"New partitions to be added $partitionsToBeAdded")
@@ -1317,8 +1317,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
                 s"partition $partition being reassigned. Replica(s) " +
                 s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
             }
-          case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
-            .format(partition, reassignedReplicas.mkString(",")))
+          case None => error(s"Error handling reassignment of partition $partition to replicas " +
+                         s"${reassignedReplicas.mkString(",")} as it was never created")
         }
       }
     }
@@ -1344,7 +1344,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
     private def processUpdateNotifications(partitions: Seq[TopicPartition]) {
       val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
-      debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicPartitions:" + partitions)
+      debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
       sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
     }
   }
@@ -1361,8 +1361,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         val partitions = zkClient.getPreferredReplicaElection
         val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
         if (partitionsForTopicsToBeDeleted.nonEmpty) {
-          error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
-            .format(partitionsForTopicsToBeDeleted))
+          error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " +
+            "respective topics are being deleted")
         }
         onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
       }
@@ -1515,7 +1515,7 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
 
   val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>
     state.rateAndTimeMetricName.map { metricName =>
-      state -> new KafkaTimer(newTimer(s"$metricName", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+      state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
     }
   }.toMap
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 217c2b6..2e27272 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -116,7 +116,7 @@ class PartitionStateMachine(config: KafkaConfig,
         doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
       } catch {
-        case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
+        case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
       }
     }
   }
@@ -417,9 +417,9 @@ class PartitionStateMachine(config: KafkaConfig,
 
   private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
     val currState = partitionState(partition)
-    val e = new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
-      .format(partition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
-      .format(currState))
+    val e = new IllegalStateException(s"Partition $partition should be in one of " +
+      s"${targetState.validPreviousStates.mkString(",")} states before moving to $targetState state. Instead it is in " +
+      s"$currState state")
     logFailedStateChange(partition, currState, targetState, e)
   }
 
@@ -429,8 +429,8 @@ class PartitionStateMachine(config: KafkaConfig,
 
   private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
     stateChangeLogger.withControllerEpoch(controllerContext.epoch)
-      .error("Controller %d epoch %d failed to change state for partition %s from %s to %s"
-      .format(controllerId, controllerContext.epoch, partition, currState, targetState), t)
+      .error(s"Controller $controllerId epoch ${controllerContext.epoch} failed to change state for partition $partition " +
+        s"from $currState to $targetState", t)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2156a67..85af764 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -105,7 +105,7 @@ class ReplicaStateMachine(config: KafkaConfig,
         }
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
       } catch {
-        case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
+        case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
       }
     }
   }
@@ -381,16 +381,15 @@ class ReplicaStateMachine(config: KafkaConfig,
 
   private def logInvalidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Unit = {
     val currState = replicaState(replica)
-    val e = new IllegalStateException("Replica %s should be in the %s states before moving to %s state"
-      .format(replica, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
-      .format(currState))
+    val e = new IllegalStateException(s"Replica $replica should be in the ${targetState.validPreviousStates.mkString(",")} " +
+      s"states before moving to $targetState state. Instead it is in $currState state")
     logFailedStateChange(replica, currState, targetState, e)
   }
 
   private def logFailedStateChange(replica: PartitionAndReplica, currState: ReplicaState, targetState: ReplicaState, t: Throwable): Unit = {
     stateChangeLogger.withControllerEpoch(controllerContext.epoch)
-      .error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
-      .format(controllerId, controllerContext.epoch, replica.replica, replica.topic, replica.partition, currState, targetState), t)
+      .error(s"Controller $controllerId epoch ${controllerContext.epoch} initiated state change of replica ${replica.replica} " +
+        s"for partition ${replica.topicPartition} from $currState to $targetState failed", t)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index eaf6b09..b1d8394 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -58,7 +58,7 @@ import scala.collection.{Set, mutable}
 class TopicDeletionManager(controller: KafkaController,
                            eventManager: ControllerEventManager,
                            zkClient: KafkaZkClient) extends Logging {
-  this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
+  this.logIdent = s"[Topic Deletion Manager ${controller.config.brokerId}], "
   val controllerContext = controller.controllerContext
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
   val topicsToBeDeleted = mutable.Set.empty[String]
@@ -72,7 +72,7 @@ class TopicDeletionManager(controller: KafkaController,
       topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & topicsToBeDeleted
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
-      info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
+      info(s"Removing $initialTopicsToBeDeleted since delete topic is disabled")
       zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
     }
   }
@@ -101,7 +101,7 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be deleted
    */
   def enqueueTopicsForDeletion(topics: Set[String]) {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       topicsToBeDeleted ++= topics
       partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
       resumeDeletions()
@@ -115,9 +115,9 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics for which deletion can be resumed
    */
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       val topicsToResumeDeletion = topics & topicsToBeDeleted
-      if(topicsToResumeDeletion.nonEmpty) {
+      if (topicsToResumeDeletion.nonEmpty) {
         topicsIneligibleForDeletion --= topicsToResumeDeletion
         resumeDeletions()
       }
@@ -132,12 +132,11 @@ class TopicDeletionManager(controller: KafkaController,
    * @param replicas Replicas for which deletion has failed
    */
   def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
-      if(replicasThatFailedToDelete.nonEmpty) {
+      if (replicasThatFailedToDelete.nonEmpty) {
         val topics = replicasThatFailedToDelete.map(_.topic)
-        debug("Deletion failed for replicas %s. Halting deletion for topics %s"
-          .format(replicasThatFailedToDelete.mkString(","), topics))
+        debug(s"Deletion failed for replicas ${replicasThatFailedToDelete.mkString(",")}. Halting deletion for topics $topics")
         controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete.toSeq, ReplicaDeletionIneligible)
         markTopicIneligibleForDeletion(topics)
         resumeDeletions()
@@ -152,37 +151,37 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
    */
   def markTopicIneligibleForDeletion(topics: Set[String]) {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       val newTopicsToHaltDeletion = topicsToBeDeleted & topics
       topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
-      if(newTopicsToHaltDeletion.nonEmpty)
-        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+      if (newTopicsToHaltDeletion.nonEmpty)
+        info(s"Halted deletion of topics ${newTopicsToHaltDeletion.mkString(",")}")
     }
   }
 
   private def isTopicIneligibleForDeletion(topic: String): Boolean = {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       topicsIneligibleForDeletion.contains(topic)
     } else
       true
   }
 
   private def isTopicDeletionInProgress(topic: String): Boolean = {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
     } else
       false
   }
 
   def isPartitionToBeDeleted(topicAndPartition: TopicPartition) = {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       partitionsToBeDeleted.contains(topicAndPartition)
     } else
       false
   }
 
   def isTopicQueuedUpForDeletion(topic: String): Boolean = {
-    if(isDeleteTopicEnabled) {
+    if (isDeleteTopicEnabled) {
       topicsToBeDeleted.contains(topic)
     } else
       false
@@ -196,7 +195,7 @@ class TopicDeletionManager(controller: KafkaController,
    */
   def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
     val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
-    debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
+    debug(s"Deletion successfully completed for replicas ${successfullyDeletedReplicas.mkString(",")}")
     controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas.toSeq, ReplicaDeletionSuccessful)
     resumeDeletions()
   }
@@ -221,8 +220,7 @@ class TopicDeletionManager(controller: KafkaController,
   private def markTopicForDeletionRetry(topic: String) {
     // reset replica states from ReplicaDeletionIneligible to OfflineReplica
     val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
-    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
-      .format(topic, failedReplicas.mkString(",")))
+    info(s"Retrying delete topic for topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
     controller.replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
   }
 
@@ -253,7 +251,7 @@ class TopicDeletionManager(controller: KafkaController,
    * removed from their caches.
    */
   private def onTopicDeletion(topics: Set[String]) {
-    info("Topic deletion callback for %s".format(topics.mkString(",")))
+    info(s"Topic deletion callback for ${topics.mkString(",")}")
     // send update metadata so that brokers stop serving data for topics to be deleted
     val partitions = topics.flatMap(controllerContext.partitionsForTopic)
     controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
@@ -289,12 +287,12 @@ class TopicDeletionManager(controller: KafkaController,
       controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible)
       // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
-      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
+      debug(s"Deletion started for replicas ${replicasForDeletionRetry.mkString(",")}")
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted,
         new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj, replicaId) =>
           eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))))
       if (deadReplicasForTopic.nonEmpty) {
-        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
+        debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic")
         markTopicIneligibleForDeletion(Set(topic))
       }
     }
@@ -312,7 +310,7 @@ class TopicDeletionManager(controller: KafkaController,
    *    will delete all persistent data from all replicas of the respective partitions
    */
   private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]) {
-    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
+    info(s"Partition deletion callback for ${partitionsToBeDeleted.mkString(",")}")
     val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
     startReplicaDeletion(replicasPerPartition)
   }
@@ -320,40 +318,39 @@ class TopicDeletionManager(controller: KafkaController,
   private def resumeDeletions(): Unit = {
     val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
 
-    if(topicsQueuedForDeletion.nonEmpty)
-      info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
+    if (topicsQueuedForDeletion.nonEmpty)
+      info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
 
     topicsQueuedForDeletion.foreach { topic =>
       // if all replicas are marked as deleted successfully, then topic deletion is done
-      if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
+      if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
         // clear up all state for this topic from controller cache and zookeeper
         completeDeleteTopic(topic)
-        info("Deletion of topic %s successfully completed".format(topic))
+        info(s"Deletion of topic $topic successfully completed")
       } else {
-        if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
+        if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
           // ignore since topic deletion is in progress
           val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
           val replicaIds = replicasInDeletionStartedState.map(_.replica)
           val partitions = replicasInDeletionStartedState.map(_.topicPartition)
-          info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
-            partitions.mkString(","), topic))
+          info(s"Deletion for replicas ${replicaIds.mkString(",")} for partition ${partitions.mkString(",")} of topic $topic in progress")
         } else {
           // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
           // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
           // or there is at least one failed replica (which means topic deletion should be retried).
-          if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
+          if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
             // mark topic for deletion retry
             markTopicForDeletionRetry(topic)
           }
         }
       }
       // Try delete topic if it is eligible for deletion.
-      if(isTopicEligibleForDeletion(topic)) {
-        info("Deletion of topic %s (re)started".format(topic))
+      if (isTopicEligibleForDeletion(topic)) {
+        info(s"Deletion of topic $topic (re)started")
         // topic deletion will be kicked off
         onTopicDeletion(Set(topic))
-      } else if(isTopicIneligibleForDeletion(topic)) {
-        info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
+      } else if (isTopicIneligibleForDeletion(topic)) {
+        info(s"Not retrying deletion of topic $topic at this time since it is marked ineligible for deletion")
       }
     }
   }