You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/23 16:40:12 UTC
kafka git commit: MINOR: Small cleanups/refactoring in
kafka.controller
Repository: kafka
Updated Branches:
refs/heads/trunk 062c5568e -> ac17ab4f0
MINOR: Small cleanups/refactoring in kafka.controller
- Updated logging to use string interpolation
- Minor refactors
- Fixed a few typos
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #4231 from mimaison/controller_refactor
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ac17ab4f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ac17ab4f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ac17ab4f
Branch: refs/heads/trunk
Commit: ac17ab4f09082579d0a239b3b7aac6e2ce342d84
Parents: 062c556
Author: Mickael Maison <mi...@gmail.com>
Authored: Thu Nov 23 16:34:07 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 23 16:39:59 2017 +0000
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 39 +++---
.../kafka/controller/ControllerState.scala | 2 +-
.../kafka/controller/KafkaController.scala | 140 +++++++++----------
.../controller/PartitionStateMachine.scala | 12 +-
.../kafka/controller/ReplicaStateMachine.scala | 11 +-
.../kafka/controller/TopicDeletionManager.scala | 67 +++++----
6 files changed, 132 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index d5bd4e6..a072978 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -85,7 +85,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
case Some(stateInfo) =>
stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
case None =>
- warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
+ warn(s"Not sending request $request to broker $brokerId, since it is offline.")
}
}
}
@@ -93,7 +93,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
def addBroker(broker: Broker) {
// be careful here. Maybe the startup() API has already started the request send thread
brokerLock synchronized {
- if(!brokerStateInfo.contains(broker.id)) {
+ if (!brokerStateInfo.contains(broker.id)) {
addNewBroker(broker)
startRequestSendThread(broker.id)
}
@@ -108,7 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[QueueItem]
- debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
+ debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
val brokerNode = broker.getNode(config.interBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val networkClient = {
@@ -148,8 +148,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
)
}
val threadName = threadNamePrefix match {
- case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
- case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
+ case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
+ case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
}
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
@@ -188,7 +188,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
protected def startRequestSendThread(brokerId: Int) {
val requestThread = brokerStateInfo(brokerId).requestSendThread
- if(requestThread.getState == Thread.State.NEW)
+ if (requestThread.getState == Thread.State.NEW)
requestThread.start()
}
}
@@ -233,9 +233,8 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
- warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
- "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
- requestBuilder.toString, brokerNode.toString), e)
+ warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " +
+ s"to broker $brokerNode. Reconnecting to broker.", e)
networkClient.close(brokerNode.idString)
isSendSuccessful = false
backoff()
@@ -258,7 +257,7 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable =>
- error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString), e)
+ error(s"Controller $controllerId fails to send a request to broker $brokerNode", e)
// If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
networkClient.close(brokerNode.idString)
}
@@ -270,13 +269,13 @@ class RequestSendThread(val controllerId: Int,
if (!NetworkClientUtils.awaitReady(networkClient, brokerNode, time, socketTimeoutMs))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
- info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString))
+ info(s"Controller $controllerId connected to $brokerNode for sending state change requests")
}
true
} catch {
case e: Throwable =>
- warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString), e)
+ warn(s"Controller $controllerId's connection to broker $brokerNode was unsuccessful", e)
networkClient.close(brokerNode.idString)
false
}
@@ -296,14 +295,14 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
// raise error if the previous batch is not empty
if (leaderAndIsrRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
- "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
+ s"a new one. Some LeaderAndIsr state changes $leaderAndIsrRequestMap might be lost ")
if (stopReplicaRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
- "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
+ s"new one. Some StopReplica state changes $stopReplicaRequestMap might be lost ")
if (updateMetadataRequestBrokerSet.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
- "new one. Some UpdateMetadata state changes to brokers %s with partition info %s might be lost ".format(
- updateMetadataRequestBrokerSet.toString(), updateMetadataRequestPartitionInfoMap.toString()))
+ s"new one. Some UpdateMetadata state changes to brokers $updateMetadataRequestBrokerSet with partition info " +
+ s"updateMetadataRequestPartitionInfoMap might be lost ")
}
def clear() {
@@ -369,7 +368,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
updateMetadataRequestPartitionInfoMap.put(partition, partitionStateInfo)
case None =>
- info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
+ info(s"Leader not yet assigned for partition $partition. Skip sending UpdateMetadataRequest.")
}
}
@@ -459,10 +458,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
- debug("The stop replica request (delete = true) sent to broker %d is %s"
- .format(broker, stopReplicaWithDelete.mkString(",")))
- debug("The stop replica request (delete = false) sent to broker %d is %s"
- .format(broker, stopReplicaWithoutDelete.mkString(",")))
+ debug(s"The stop replica request (delete = true) sent to broker $broker is ${stopReplicaWithDelete.mkString(",")}")
+ debug(s"The stop replica request (delete = false) sent to broker $broker is ${stopReplicaWithoutDelete.mkString(",")}")
val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/ControllerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 74029b1..2bb63e8 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f0ca9e7..3615b7d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -179,7 +179,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
/**
* This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
* It does the following things on the become-controller state change -
- * 1. Register controller epoch changed listener
+ * 1. Registers controller epoch changed listener
* 2. Increments the controller epoch
* 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
* leaders for all existing partitions.
@@ -295,7 +295,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* brokers as input. It does the following -
* 1. Sends update metadata request to all live and shutting down brokers
* 2. Triggers the OnlinePartition state change for all new/offline partitions
- * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
+ * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
* so, it performs the reassignment logic for each topic/partition.
*
* Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
@@ -305,7 +305,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* every broker that it is still valid. Brokers check the leader epoch to determine validity of the request.
*/
private def onBrokerStartup(newBrokers: Seq[Int]) {
- info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+ info(s"New broker startup callback for ${newBrokers.mkString(",")}")
newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
val newBrokersSet = newBrokers.toSet
// send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
@@ -329,9 +329,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// on the newly restarted brokers, there is a chance that topic deletion can resume
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
if (replicasForTopicsToBeDeleted.nonEmpty) {
- info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
- "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
- topicDeletionManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
+ info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
+ s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
+ s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
}
}
@@ -341,23 +341,23 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* as input. It will call onReplicaBecomeOffline(...) with the list of replicas on those failed brokers as input.
*/
private def onBrokerFailure(deadBrokers: Seq[Int]) {
- info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+ info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
val deadBrokersThatWereShuttingDown =
deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
- info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
+ info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
onReplicasBecomeOffline(allReplicasOnDeadBrokers)
}
/**
* This method marks the given replicas as offline. It does the following -
- * 1. Mark the given partitions as offline
+ * 1. Marks the given partitions as offline
* 2. Triggers the OnlinePartition state change for all new/offline partitions
* 3. Invokes the OfflineReplica state change on the input list of newly offline replicas
* 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
*
- * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
+ * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
* the partition state machine will refresh our cache for us when performing leader election for all new/offline
* partitions coming online.
*/
@@ -376,7 +376,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// trigger OfflineReplica state change for those newly offline replicas
replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
- // fail deletion of topics that affected by the offline replicas
+ // fail deletion of topics that are affected by the offline replicas
if (newOfflineReplicasForDeletion.nonEmpty) {
// it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
// deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state indefinitely
@@ -398,7 +398,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
*/
private def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
- info("New partition creation callback for %s".format(newPartitions.mkString(",")))
+ info(s"New partition creation callback for ${newPartitions.mkString(",")}")
partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
@@ -528,11 +528,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
- info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
+ info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
try {
partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
} catch {
- case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+ case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
}
@@ -580,9 +580,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// start the channel manager
startChannelManager()
initializePartitionReassignment()
- info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
- info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
- info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
+ info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
+ info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
+ info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
}
private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
@@ -592,16 +592,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
val topicDeleted = replicasOpt.isEmpty
val successful =
- if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
+ if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
successful || topicDeleted
}
val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
- info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
- info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
- info("Skipping preferred replica election for partitions due to topic deletion: %s".format(pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")))
- info("Resuming preferred replica election for partitions: %s".format(pendingPreferredReplicaElections.mkString(",")))
+ info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")
+ info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")
+ info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")
+ info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")
pendingPreferredReplicaElections
}
@@ -645,8 +645,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}.keySet.map(_.topic)
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
- info("List of topics to be deleted: %s".format(topicsToBeDeleted.mkString(",")))
- info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
+ info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
+ info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
(topicsToBeDeleted, topicsIneligibleForDeletion)
}
@@ -683,21 +683,21 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// request to the current or new leader. This will prevent it from adding the old replicas to the ISR
val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas)
- if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
- info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
- "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
+ if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
+ info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+ s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
// move the leader to one of the alive and caught up new replicas
partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
} else {
// check if the leader is alive or not
if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
- info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
- "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+ info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+ s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive")
// shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
updateLeaderEpochAndSendRequest(topicPartition, oldAndNewReplicas, reassignedReplicas)
} else {
- info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
- "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
+ info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+ s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
}
}
@@ -721,14 +721,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
partitionsAndReplicasForThisTopic.put(partition, replicas)
val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
- if (setDataResponse.resultCode == Code.OK) {
- info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
- // update the assigned replica list after a successful zookeeper write
- controllerContext.partitionReplicaAssignment.put(partition, replicas)
- } else if (setDataResponse.resultCode == Code.NONODE) {
- throw new IllegalStateException("Topic %s doesn't exist".format(partition.topic))
- } else {
- throw new KafkaException(setDataResponse.resultException.get)
+ setDataResponse.resultCode match {
+ case Code.OK =>
+ info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}")
+ // update the assigned replica list after a successful zookeeper write
+ controllerContext.partitionReplicaAssignment.put(partition, replicas)
+ case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist")
+ case _ => throw new KafkaException(setDataResponse.resultException.get)
}
}
@@ -791,7 +790,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
epochAndStatOpt.foreach { case (epoch, stat) =>
controllerContext.epoch = epoch
controllerContext.epochZkVersion = stat.getVersion
- info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
+ info(s"Initialized controller epoch to ${controllerContext.epoch} and zk version ${controllerContext.epochZkVersion}")
}
}
@@ -827,9 +826,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
if (currentLeader == preferredReplica) {
- info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
+ info(s"Partition $partition completed preferred replica leader election. New leader is $preferredReplica")
} else {
- warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
+ warn(s"Partition $partition failed to complete preferred replica leader election. Leader is $currentLeader")
}
}
if (!isTriggeredByAutoRebalance) {
@@ -868,7 +867,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
*/
private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
- debug("Updating leader epoch for partition %s.".format(partition))
+ debug(s"Updating leader epoch for partition $partition")
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
@@ -878,9 +877,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
if (controllerEpoch > epoch)
- throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
- "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
- "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+ throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+ s"means the current controller with epoch $epoch went through a soft failure and another " +
+ s"controller was elected with epoch $controllerEpoch. Aborting state change by this controller")
// increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
// assigned replica list
val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
@@ -917,7 +916,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
}
- debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
+ debug(s"Topics not in preferred replica for broker $leaderBroker $topicsNotInPreferredReplica")
val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicPartitionsForBroker.size
trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio")
@@ -967,14 +966,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
}
- info("Shutting down broker " + id)
+ info(s"Shutting down broker $id")
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
- throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+ throw new BrokerNotAvailableException(s"Broker id $id does not exist.")
controllerContext.shuttingDownBrokerIds.add(id)
- debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
- debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+ debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
+ debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
controllerContext.partitionReplicaAssignment(partition).size > 1 && controllerContext.partitionLeadershipInfo.contains(partition)
@@ -998,7 +997,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
def replicatedPartitionsBrokerLeads() = {
- trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+ trace(s"All leaders = ${controllerContext.partitionLeadershipInfo.mkString(",")}")
controllerContext.partitionLeadershipInfo.filter {
case (topicPartition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicPartition).size > 1
@@ -1048,7 +1047,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
import JavaConverters._
if (!isActive) return
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
- debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+ debug(s"Delete topic callback invoked for $stopReplicaResponse")
val responseMap = stopReplicaResponse.responses.asScala
val partitionsInError =
if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
@@ -1127,13 +1126,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
*/
if (activeControllerId != -1) {
- debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
+ debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
try {
zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
- info(config.brokerId + " successfully elected as the controller")
+ info(s"${config.brokerId} successfully elected as the controller")
activeControllerId = config.brokerId
onControllerFailover()
} catch {
@@ -1142,12 +1141,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (activeControllerId != -1)
- debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
+ debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}")
else
warn("A controller has been elected but just resigned, this will result in another round of election")
case e2: Throwable =>
- error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
+ error(s"Error while electing or becoming controller on broker ${config.brokerId}", e2)
triggerControllerMove()
}
}
@@ -1167,8 +1166,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
- info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
- .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+ info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
+ s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
+
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
if (newBrokerIds.nonEmpty)
@@ -1193,8 +1193,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
- info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
- deletedTopics, addedPartitionReplicaAssignment))
+ info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
+ s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
@@ -1224,9 +1224,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
- if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
- error("Skipping adding partitions %s for topic %s since it is currently being deleted"
- .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+ if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+ error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
+ "since it is currently being deleted")
else {
if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
@@ -1317,8 +1317,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
s"partition $partition being reassigned. Replica(s) " +
s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
}
- case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
- .format(partition, reassignedReplicas.mkString(",")))
+ case None => error(s"Error handling reassignment of partition $partition to replicas " +
+ s"${reassignedReplicas.mkString(",")} as it was never created")
}
}
}
@@ -1344,7 +1344,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def processUpdateNotifications(partitions: Seq[TopicPartition]) {
val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
- debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicPartitions:" + partitions)
+ debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
}
}
@@ -1361,8 +1361,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val partitions = zkClient.getPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
if (partitionsForTopicsToBeDeleted.nonEmpty) {
- error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
- .format(partitionsForTopicsToBeDeleted))
+ error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " +
+ "respective topics are being deleted")
}
onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
}
@@ -1515,7 +1515,7 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>
state.rateAndTimeMetricName.map { metricName =>
- state -> new KafkaTimer(newTimer(s"$metricName", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
}.toMap
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 217c2b6..2e27272 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -116,7 +116,7 @@ class PartitionStateMachine(config: KafkaConfig,
doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
- case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
+ case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
}
}
}
@@ -417,9 +417,9 @@ class PartitionStateMachine(config: KafkaConfig,
private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
val currState = partitionState(partition)
- val e = new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
- .format(partition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
- .format(currState))
+ val e = new IllegalStateException(s"Partition $partition should be in one of " +
+ s"${targetState.validPreviousStates.mkString(",")} states before moving to $targetState state. Instead it is in " +
+ s"$currState state")
logFailedStateChange(partition, currState, targetState, e)
}
@@ -429,8 +429,8 @@ class PartitionStateMachine(config: KafkaConfig,
private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
- .error("Controller %d epoch %d failed to change state for partition %s from %s to %s"
- .format(controllerId, controllerContext.epoch, partition, currState, targetState), t)
+ .error(s"Controller $controllerId epoch ${controllerContext.epoch} failed to change state for partition $partition " +
+ s"from $currState to $targetState", t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2156a67..85af764 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -105,7 +105,7 @@ class ReplicaStateMachine(config: KafkaConfig,
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
- case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
+ case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
}
}
}
@@ -381,16 +381,15 @@ class ReplicaStateMachine(config: KafkaConfig,
private def logInvalidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Unit = {
val currState = replicaState(replica)
- val e = new IllegalStateException("Replica %s should be in the %s states before moving to %s state"
- .format(replica, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
- .format(currState))
+ val e = new IllegalStateException(s"Replica $replica should be in the ${targetState.validPreviousStates.mkString(",")} " +
+ s"states before moving to $targetState state. Instead it is in $currState state")
logFailedStateChange(replica, currState, targetState, e)
}
private def logFailedStateChange(replica: PartitionAndReplica, currState: ReplicaState, targetState: ReplicaState, t: Throwable): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
- .error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
- .format(controllerId, controllerContext.epoch, replica.replica, replica.topic, replica.partition, currState, targetState), t)
+ .error(s"Controller $controllerId epoch ${controllerContext.epoch} initiated state change of replica ${replica.replica} " +
+ s"for partition ${replica.topicPartition} from $currState to $targetState failed", t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac17ab4f/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index eaf6b09..b1d8394 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -58,7 +58,7 @@ import scala.collection.{Set, mutable}
class TopicDeletionManager(controller: KafkaController,
eventManager: ControllerEventManager,
zkClient: KafkaZkClient) extends Logging {
- this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
+ this.logIdent = s"[Topic Deletion Manager ${controller.config.brokerId}], "
val controllerContext = controller.controllerContext
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
val topicsToBeDeleted = mutable.Set.empty[String]
@@ -72,7 +72,7 @@ class TopicDeletionManager(controller: KafkaController,
topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & topicsToBeDeleted
} else {
// if delete topic is disabled clean the topic entries under /admin/delete_topics
- info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
+ info(s"Removing $initialTopicsToBeDeleted since delete topic is disabled")
zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
}
}
@@ -101,7 +101,7 @@ class TopicDeletionManager(controller: KafkaController,
* @param topics Topics that should be deleted
*/
def enqueueTopicsForDeletion(topics: Set[String]) {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
topicsToBeDeleted ++= topics
partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
resumeDeletions()
@@ -115,9 +115,9 @@ class TopicDeletionManager(controller: KafkaController,
* @param topics Topics for which deletion can be resumed
*/
def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
val topicsToResumeDeletion = topics & topicsToBeDeleted
- if(topicsToResumeDeletion.nonEmpty) {
+ if (topicsToResumeDeletion.nonEmpty) {
topicsIneligibleForDeletion --= topicsToResumeDeletion
resumeDeletions()
}
@@ -132,12 +132,11 @@ class TopicDeletionManager(controller: KafkaController,
* @param replicas Replicas for which deletion has failed
*/
def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
- if(replicasThatFailedToDelete.nonEmpty) {
+ if (replicasThatFailedToDelete.nonEmpty) {
val topics = replicasThatFailedToDelete.map(_.topic)
- debug("Deletion failed for replicas %s. Halting deletion for topics %s"
- .format(replicasThatFailedToDelete.mkString(","), topics))
+ debug(s"Deletion failed for replicas ${replicasThatFailedToDelete.mkString(",")}. Halting deletion for topics $topics")
controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete.toSeq, ReplicaDeletionIneligible)
markTopicIneligibleForDeletion(topics)
resumeDeletions()
@@ -152,37 +151,37 @@ class TopicDeletionManager(controller: KafkaController,
* @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
*/
def markTopicIneligibleForDeletion(topics: Set[String]) {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
val newTopicsToHaltDeletion = topicsToBeDeleted & topics
topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
- if(newTopicsToHaltDeletion.nonEmpty)
- info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+ if (newTopicsToHaltDeletion.nonEmpty)
+ info(s"Halted deletion of topics ${newTopicsToHaltDeletion.mkString(",")}")
}
}
private def isTopicIneligibleForDeletion(topic: String): Boolean = {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
topicsIneligibleForDeletion.contains(topic)
} else
true
}
private def isTopicDeletionInProgress(topic: String): Boolean = {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
} else
false
}
def isPartitionToBeDeleted(topicAndPartition: TopicPartition) = {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
partitionsToBeDeleted.contains(topicAndPartition)
} else
false
}
def isTopicQueuedUpForDeletion(topic: String): Boolean = {
- if(isDeleteTopicEnabled) {
+ if (isDeleteTopicEnabled) {
topicsToBeDeleted.contains(topic)
} else
false
@@ -196,7 +195,7 @@ class TopicDeletionManager(controller: KafkaController,
*/
def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
- debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
+ debug(s"Deletion successfully completed for replicas ${successfullyDeletedReplicas.mkString(",")}")
controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas.toSeq, ReplicaDeletionSuccessful)
resumeDeletions()
}
@@ -221,8 +220,7 @@ class TopicDeletionManager(controller: KafkaController,
private def markTopicForDeletionRetry(topic: String) {
// reset replica states from ReplicaDeletionIneligible to OfflineReplica
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
- info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
- .format(topic, failedReplicas.mkString(",")))
+ info(s"Retrying delete topic for topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
controller.replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
}
@@ -253,7 +251,7 @@ class TopicDeletionManager(controller: KafkaController,
* removed from their caches.
*/
private def onTopicDeletion(topics: Set[String]) {
- info("Topic deletion callback for %s".format(topics.mkString(",")))
+ info(s"Topic deletion callback for ${topics.mkString(",")}")
// send update metadata so that brokers stop serving data for topics to be deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
@@ -289,12 +287,12 @@ class TopicDeletionManager(controller: KafkaController,
controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible)
// send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
- debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
+ debug(s"Deletion started for replicas ${replicasForDeletionRetry.mkString(",")}")
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted,
new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj, replicaId) =>
eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))))
if (deadReplicasForTopic.nonEmpty) {
- debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
+ debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic")
markTopicIneligibleForDeletion(Set(topic))
}
}
@@ -312,7 +310,7 @@ class TopicDeletionManager(controller: KafkaController,
* will delete all persistent data from all replicas of the respective partitions
*/
private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]) {
- info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
+ info(s"Partition deletion callback for ${partitionsToBeDeleted.mkString(",")}")
val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
startReplicaDeletion(replicasPerPartition)
}
@@ -320,40 +318,39 @@ class TopicDeletionManager(controller: KafkaController,
private def resumeDeletions(): Unit = {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
- if(topicsQueuedForDeletion.nonEmpty)
- info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
+ if (topicsQueuedForDeletion.nonEmpty)
+ info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
- if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
+ if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// clear up all state for this topic from controller cache and zookeeper
completeDeleteTopic(topic)
- info("Deletion of topic %s successfully completed".format(topic))
+ info(s"Deletion of topic $topic successfully completed")
} else {
- if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
+ if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
// ignore since topic deletion is in progress
val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
val replicaIds = replicasInDeletionStartedState.map(_.replica)
val partitions = replicasInDeletionStartedState.map(_.topicPartition)
- info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
- partitions.mkString(","), topic))
+ info(s"Deletion for replicas ${replicaIds.mkString(",")} for partition ${partitions.mkString(",")} of topic $topic in progress")
} else {
// if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
// TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
// or there is at least one failed replica (which means topic deletion should be retried).
- if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
+ if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
// mark topic for deletion retry
markTopicForDeletionRetry(topic)
}
}
}
// Try delete topic if it is eligible for deletion.
- if(isTopicEligibleForDeletion(topic)) {
- info("Deletion of topic %s (re)started".format(topic))
+ if (isTopicEligibleForDeletion(topic)) {
+ info(s"Deletion of topic $topic (re)started")
// topic deletion will be kicked off
onTopicDeletion(Set(topic))
- } else if(isTopicIneligibleForDeletion(topic)) {
- info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
+ } else if (isTopicIneligibleForDeletion(topic)) {
+ info(s"Not retrying deletion of topic $topic at this time since it is marked ineligible for deletion")
}
}
}