You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/15 00:51:53 UTC
kafka git commit: KAFKA-5822;
Consistent log formatting of topic partitions
Repository: kafka
Updated Branches:
refs/heads/trunk 5d2422258 -> f6f56a645
KAFKA-5822; Consistent log formatting of topic partitions
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3778 from hachikuji/KAFKA-5822
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6f56a64
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6f56a64
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6f56a64
Branch: refs/heads/trunk
Commit: f6f56a645bb1c5ec6810c024ba517e43bf77056c
Parents: 5d24222
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Sep 15 01:50:20 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 15 01:51:35 2017 +0100
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 6 +-
.../scala/kafka/common/TopicAndPartition.scala | 2 +-
.../controller/ControllerChannelManager.scala | 6 +-
.../kafka/controller/KafkaController.scala | 34 ++---
.../controller/PartitionStateMachine.scala | 7 +-
.../kafka/controller/ReplicaStateMachine.scala | 15 +-
.../kafka/server/AbstractFetcherThread.scala | 30 ++--
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../main/scala/kafka/server/MetadataCache.scala | 4 +-
.../scala/kafka/server/ReplicaManager.scala | 149 ++++++++++---------
.../tools/ReplicaVerificationToolTest.scala | 2 +-
11 files changed, 129 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9a55200..3b5fee0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -69,7 +69,7 @@ class Partition(val topic: String,
* In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
- this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
+ this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
@@ -145,7 +145,7 @@ class Partition(val topic: String,
AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
val log = logManager.getOrCreateLog(topicPartition, config, isNew)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
- val offsetMap = checkpoint.read
+ val offsetMap = checkpoint.read()
if (!offsetMap.contains(topicPartition))
info(s"No checkpointed highwatermark is found for partition $topicPartition")
val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
@@ -589,7 +589,7 @@ class Partition(val topic: String,
override def hashCode: Int =
31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0)
- override def toString: String = {
+ override def toString(): String = {
val partitionString = new StringBuilder
partitionString.append("Topic: " + topic)
partitionString.append("; Partition: " + partitionId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 07a2292..4a8e65d 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -37,5 +37,5 @@ case class TopicAndPartition(topic: String, partition: Int) {
def asTopicPartition = new TopicPartition(topic, partition)
- override def toString = "[%s,%d]".format(topic, partition)
+ override def toString: String = s"$topic-$partition"
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b34284f..693f297 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -405,10 +405,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val typeOfRequest =
if (broker == state.basePartitionState.leader) "become-leader"
else "become-follower"
- stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
- "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
- state, broker,
- topicPartition.topic, topicPartition.partition))
+ stateChangeLogger.trace(s"Controller $controllerId epoch $controllerEpoch sending " +
+ s"$typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
}
val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4b07751..5ff47bf 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -22,7 +22,7 @@ import com.yammer.metrics.core.Gauge
import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
import kafka.api._
import kafka.cluster.Broker
-import kafka.common.{TopicAndPartition, _}
+import kafka.common._
import kafka.log.LogConfig
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
@@ -88,7 +88,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
brokerIds.flatMap { brokerId =>
partitionReplicaAssignment.collect {
case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
- new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
+ PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
}
}.toSet
}
@@ -98,7 +98,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
.filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
.flatMap { case (topicAndPartition, replicas) =>
replicas.map { r =>
- new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
+ PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
}
}.toSet
}
@@ -115,7 +115,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
partitions.flatMap { p =>
val replicas = partitionReplicaAssignment(p)
- replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
+ replicas.map(r => PartitionAndReplica(p.topic, p.partition, r))
}
}
@@ -129,7 +129,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
object KafkaController extends Logging {
- val stateChangeLogger = new StateChangeLogger("state.change.logger")
+ val stateChangeLogger = StateChangeLogger("state.change.logger")
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
@@ -561,7 +561,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
//5. replicas in RAR -> OnlineReplica
reassignedReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+ replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
//6. Set AR to RAR in memory.
@@ -768,9 +768,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
partitionsToReassign ++= partitionsBeingReassigned
partitionsToReassign --= reassignedPartitions
controllerContext.partitionsBeingReassigned ++= partitionsToReassign
- info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
- info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
- info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
+ info(s"Partitions being reassigned: $partitionsBeingReassigned")
+ info(s"Partitions already reassigned: $reassignedPartitions")
+ info(s"Resuming reassignment of partitions: $partitionsToReassign")
}
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
@@ -803,9 +803,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
- zkUtils.getLeaderAndIsrForPartition(topic, partition).map { leaderAndIsr =>
+ zkUtils.getLeaderAndIsrForPartition(topic, partition).exists { leaderAndIsr =>
replicas.forall(leaderAndIsr.isr.contains)
- }.getOrElse(false)
+ }
}
private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
@@ -1168,7 +1168,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
- leadershipInfo.map(_.leaderAndIsr.leader != leaderBroker).getOrElse(false)
+ leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
}
debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
@@ -1263,7 +1263,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
.format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
else {
if (partitionsToBeAdded.nonEmpty) {
- info("New partitions to be added %s".format(partitionsToBeAdded))
+ info(s"New partitions to be added $partitionsToBeAdded")
controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
onNewPartitionCreation(partitionsToBeAdded.keySet)
}
@@ -1277,16 +1277,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
override def process(): Unit = {
if (!isActive) return
- debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
+ debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
- warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+ warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
topicsToBeDeleted --= nonExistentTopics
if (config.deleteTopicEnable) {
if (topicsToBeDeleted.nonEmpty) {
- info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
+ info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted.foreach { topic =>
val partitionReassignmentInProgress =
@@ -1300,7 +1300,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
} else {
// If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
for (topic <- topicsToBeDeleted) {
- info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
+ info(s"Removing ${getDeleteTopicPath(topic)} since delete topic is disabled")
zkUtils.deletePath(getDeleteTopicPath(topic))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5024c02..7257501 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -298,10 +298,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
if (controllerEpoch > controller.epoch) {
- val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
- "already written by another controller. This probably means that the current controller %d went through " +
- "a soft failure and another controller was elected with epoch %d.")
- .format(topic, partition, controllerId, controllerEpoch)
+ val failMsg = s"aborted leader election for partition $topicAndPartition since the LeaderAndIsr path was " +
+ s"already written by another controller. This probably means that the current controller $controllerId went through " +
+ s"a soft failure and another controller was elected with epoch $controllerEpoch."
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 87c53b5..6b7adfe 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -145,8 +145,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
- throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
- .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
+ throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +
+ s"be moved to NewReplica state as it is being requested to become leader")
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment, isNew = true)
@@ -229,21 +229,22 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))
throw new StateChangeFailedException(
- "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
- .format(replicaId, topicAndPartition))
+ s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +
+ s"and isr path in zookeeper is empty")
+
}
}
catch {
case t: Throwable =>
- stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
- .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
+ stateChangeLogger.error(s"Controller $controllerId epoch ${controller.epoch} initiated state change of " +
+ s"replica $replicaId for partition $topicAndPartition from $currState to $targetState failed", t)
}
}
def areAllReplicasForTopicDeleted(topic: String): Boolean = {
val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
- debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
+ debug(s"Are all replicas for topic $topic deleted $replicaStatesForTopic")
replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 9b01043..fd26e11 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -56,7 +56,7 @@ abstract class AbstractFetcherThread(name: String,
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
- private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
+ private val metricId = ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId)
@@ -121,7 +121,7 @@ abstract class AbstractFetcherThread(name: String,
def maybeTruncate(): Unit = {
val epochRequests = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
- if (!epochRequests.isEmpty) {
+ if (epochRequests.nonEmpty) {
val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
//Ensure we hold a lock during truncation.
inLock(partitionMapLock) {
@@ -149,7 +149,7 @@ abstract class AbstractFetcherThread(name: String,
} catch {
case t: Throwable =>
if (isRunning.get) {
- warn(s"Error in fetch to broker ${sourceBroker.id}, request ${fetchRequest}", t)
+ warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
inLock(partitionMapLock) {
partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
// there is an error occurred while fetching partitions, sleep a while
@@ -194,31 +194,29 @@ abstract class AbstractFetcherThread(name: String,
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
- logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset + " error " + ime.getMessage)
+ logger.error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset} error ${ime.getMessage}")
updatePartitionsWithError(topicPartition)
case e: KafkaStorageException =>
logger.error(s"Error while processing data for partition $topicPartition", e)
updatePartitionsWithError(topicPartition)
case e: Throwable =>
- throw new KafkaException("error processing data for partition [%s,%d] offset %d"
- .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)
+ throw new KafkaException(s"Error processing data for partition $topicPartition " +
+ s"offset ${currentPartitionFetchState.fetchOffset}", e)
}
case Errors.OFFSET_OUT_OF_RANGE =>
try {
val newOffset = handleOffsetOutOfRange(topicPartition)
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
- error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
- .format(currentPartitionFetchState.fetchOffset, topic, partitionId, newOffset))
+ error(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of range; reset offset to $newOffset")
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
- error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+ error(s"Error getting offset for partition $topicPartition to broker ${sourceBroker.id}", e)
updatePartitionsWithError(topicPartition)
}
case _ =>
if (isRunning.get) {
- error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
- partitionData.exception.get))
+ error(s"Error for partition $topicPartition to broker %${sourceBroker.id}:${partitionData.exception.get}")
updatePartitionsWithError(topicPartition)
}
}
@@ -261,7 +259,7 @@ abstract class AbstractFetcherThread(name: String,
val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
.map { state =>
val maybeTruncationComplete = partitions.get(state.topicPartition()) match {
- case Some(offset) => new PartitionFetchState(offset, state.value.delay, truncatingLog = false)
+ case Some(offset) => PartitionFetchState(offset, state.value.delay, truncatingLog = false)
case None => state.value()
}
(state.topicPartition(), maybeTruncationComplete)
@@ -275,7 +273,7 @@ abstract class AbstractFetcherThread(name: String,
for (partition <- partitions) {
Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
if (!currentPartitionFetchState.isDelayed)
- partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay), currentPartitionFetchState.truncatingLog))
+ partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay), currentPartitionFetchState.truncatingLog))
)
}
partitionMapCond.signalAll()
@@ -353,11 +351,11 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
def getAndMaybePut(topic: String, partitionId: Int): FetcherLagMetrics = {
- stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+ stats.getAndMaybePut(ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
- val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+ val fetcherLagMetrics = stats.get(ClientIdTopicPartition(metricId.clientId, topic, partitionId))
if (fetcherLagMetrics != null)
fetcherLagMetrics.lag <= 0
else
@@ -365,7 +363,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
}
def unregister(topic: String, partitionId: Int) {
- val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+ val lagMetrics = stats.remove(ClientIdTopicPartition(metricId.clientId, topic, partitionId))
if (lagMetrics != null) lagMetrics.unregister()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 69c81e6..88b1d23 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -598,7 +598,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
} catch {
case e : IOException =>
offlineDirs += logDir
- error(s"Fail to read ${brokerMetaPropsFile} under log directory ${logDir}", e)
+ error(s"Fail to read $brokerMetaPropsFile under log directory $logDir", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9464941..293b0dc 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -229,11 +229,11 @@ class MetadataCache(brokerId: Int) extends Logging {
def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
- cache.get(topic).map { infos =>
+ cache.get(topic).exists { infos =>
infos.remove(partitionId)
if (infos.isEmpty) cache.remove(topic)
true
- }.getOrElse(false)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7920efe..113dbcb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -181,7 +181,7 @@ class ReplicaManager(val config: KafkaConfig,
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
private var hwThreadInitialized = false
- this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
+ this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
val stateChangeLogger = KafkaController.stateChangeLogger
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
@@ -976,16 +976,17 @@ class ReplicaManager(val config: KafkaConfig,
leaderAndISRRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
- stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
- .format(localBrokerId, stateInfo, correlationId,
- leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
+ stateChangeLogger.trace(s"Broker $localBrokerId received LeaderAndIsr request $stateInfo " +
+ s"correlation id $correlationId from controller ${leaderAndISRRequest.controllerId} " +
+ s"epoch ${leaderAndISRRequest.controllerEpoch} for partition $topicPartition")
}
replicaStateChangeLock synchronized {
val responseMap = new mutable.HashMap[TopicPartition, Errors]
if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
- "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
- correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
+ stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+ s"controller ${leaderAndISRRequest.controllerId} with correlation id $correlationId since " +
+ s"its controller epoch ${leaderAndISRRequest.controllerEpoch} is old. Latest known controller " +
+ s"epoch is $controllerEpoch")
BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH)
} else {
val controllerId = leaderAndISRRequest.controllerId
@@ -997,9 +998,10 @@ class ReplicaManager(val config: KafkaConfig,
val partition = getOrCreatePartition(topicPartition)
val partitionLeaderEpoch = partition.getLeaderEpoch
if (partition eq ReplicaManager.OfflinePartition) {
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
- "epoch %d for partition [%s,%d] as the local replica for the partition is in an offline log directory")
- .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
+ stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
+ "partition is in an offline log directory")
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
} else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) {
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
@@ -1007,18 +1009,19 @@ class ReplicaManager(val config: KafkaConfig,
if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
partitionState.put(partition, stateInfo)
else {
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
- "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
- .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
- topicPartition.topic, topicPartition.partition, stateInfo.basePartitionState.replicas.asScala.mkString(",")))
+ stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition as itself is not in assigned " +
+ s"replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
} else {
// Otherwise record the error code in response
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
- "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
- .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
- topicPartition.topic, topicPartition.partition, stateInfo.basePartitionState.leaderEpoch, partitionLeaderEpoch))
+ stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+ s"leader epoch ${stateInfo.basePartitionState.leaderEpoch} is not higher than the current " +
+ s"leader epoch $partitionLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
@@ -1080,9 +1083,9 @@ class ReplicaManager(val config: KafkaConfig,
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
partitionState.keys.foreach { partition =>
- stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
- "starting the become-leader transition for partition %s")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId handling LeaderAndIsr request correlationId $correlationId from " +
+ s"controller $controllerId epoch $epoch starting the become-leader transition for " +
+ s"partition ${partition.topicPartition}")
}
for (partition <- partitionState.keys)
@@ -1098,39 +1101,40 @@ class ReplicaManager(val config: KafkaConfig,
try {
if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
partitionsToMakeLeaders += partition
- stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
- "%d epoch %d with correlation id %d for partition %s")
- .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId stopped fetchers as part of become-leader request from " +
+ s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
+ s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
} else
- stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
- "controller %d epoch %d for partition %s since it is already the leader for the partition.")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+ stateChangeLogger.info(s"Broker $localBrokerId skipped the become-leader state change after marking its " +
+ s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+ s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
+ s"since it is already the leader for the partition.")
} catch {
case e: KafkaStorageException =>
- stateChangeLogger.error(("Broker %d skipped the become-leader state change with correlation id %d from " +
- "controller %d epoch %d for partition %s since the replica for the partition is offline due to disk error %s.")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition, e))
- val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+ stateChangeLogger.error(s"Broker $localBrokerId skipped the become-leader state change with " +
+ s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+ s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
+ s"the replica for the partition is offline due to disk error $e")
+ val dirOpt = getLogDir(partition.topicPartition)
error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
- responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
+ responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
} catch {
case e: Throwable =>
partitionState.keys.foreach { partition =>
- val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
- " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
- stateChangeLogger.error(errorMsg, e)
+ stateChangeLogger.error(s"Error on broker $localBrokerId while processing LeaderAndIsr request " +
+ s"correlationId $correlationId received from controller $controllerId epoch $epoch for " +
+ s"partition ${partition.topicPartition}", e)
}
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionState.keys.foreach { partition =>
- stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
- "for the become-leader transition for partition %s")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId completed LeaderAndIsr request correlationId $correlationId from " +
+ s"controller $controllerId epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
}
partitionsToMakeLeaders
@@ -1160,9 +1164,9 @@ class ReplicaManager(val config: KafkaConfig,
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
partitionState.keys.foreach { partition =>
- stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
- "starting the become-follower transition for partition %s")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId handling LeaderAndIsr request correlationId $correlationId from " +
+ s"controller $controllerId epoch $epoch starting the become-follower transition for " +
+ s"partition ${partition.topicPartition}")
}
for (partition <- partitionState.keys)
@@ -1182,37 +1186,39 @@ class ReplicaManager(val config: KafkaConfig,
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
partitionsToMakeFollower += partition
else
- stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
- "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
- .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch,
- partition.topicPartition, newLeaderBrokerId))
+ stateChangeLogger.info(s"Broker $localBrokerId skipped the become-follower state change after marking " +
+ s"its partition as follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
+ s"for partition ${partition.topicPartition} (last update " +
+ s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
+ s"since the new leader $newLeaderBrokerId is the same as the old leader")
case None =>
// The leader broker should always be present in the metadata cache.
// If not, we should record the error message and abort the transition process for this partition
- stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
- " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
- .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch,
- partition.topicPartition, newLeaderBrokerId))
+ stateChangeLogger.error(s"Broker $localBrokerId received LeaderAndIsrRequest with " +
+ s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+ s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
+ s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
partition.getOrCreateReplica(isNew = partitionStateInfo.isNew)
}
} catch {
case e: KafkaStorageException =>
- stateChangeLogger.error(("Broker %d skipped the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition [%s,%d] since the replica for the partition is offline due to disk error %s")
- .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch, partition.topic, partition.partitionId, e))
- val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+ stateChangeLogger.error(s"Broker $localBrokerId skipped the become-follower state change with " +
+ s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+ s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since the " +
+ s"replica for the partition is offline due to disk error $e")
+ val dirOpt = getLogDir(partition.topicPartition)
error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e)
- responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
+ responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
- "%d epoch %d with correlation id %d for partition %s")
- .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId stopped fetchers as part of become-follower request from " +
+ s"controller $controllerId epoch $epoch with correlation id $correlationId for " +
+ s"partition ${partition.topicPartition}")
}
partitionsToMakeFollower.foreach { partition =>
@@ -1222,16 +1228,16 @@ class ReplicaManager(val config: KafkaConfig,
}
partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
- "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
- partition.topicPartition, correlationId, controllerId, epoch))
+ stateChangeLogger.trace(s"Broker $localBrokerId truncated logs and checkpointed recovery boundaries for " +
+ s"partition ${partition.topicPartition} as part of become-follower request with " +
+ s"correlation id $correlationId from controller $controllerId epoch $epoch")
}
if (isShuttingDown.get()) {
partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
- controllerId, epoch, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId skipped the adding-fetcher step of the become-follower state " +
+ s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+ s"partition ${partition.topicPartition} since it is shutting down")
}
}
else {
@@ -1243,24 +1249,23 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
- "%d epoch %d with correlation id %d for partition %s")
- .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId started fetcher to new leader as part of become-follower " +
+ s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
+ s"partition ${partition.topicPartition}")
}
}
} catch {
case e: Throwable =>
- val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
- "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
- stateChangeLogger.error(errorMsg, e)
+ stateChangeLogger.error(s"Error on broker $localBrokerId while processing LeaderAndIsr request with " +
+ s"correlationId $correlationId received from controller $controllerId epoch $epoch", e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionState.keys.foreach { partition =>
- stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
- "for the become-follower transition for partition %s")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+ stateChangeLogger.trace(s"Broker $localBrokerId completed LeaderAndIsr request correlationId $correlationId from " +
+ s"controller $controllerId epoch $epoch for the become-follower transition for " +
+ s"partition ${partition.topicPartition}")
}
partitionsToMakeFollower
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6f56a64/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index c0f385f..eb6142a 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -54,7 +54,7 @@ class ReplicaVerificationToolTest {
val output = sb.toString.trim
assertTrue(s"Max lag information should be in output: `$output`",
- output.endsWith(": max lag is 10 for partition [a,0] at offset 10 among 3 partitions"))
+ output.endsWith(": max lag is 10 for partition a-0 at offset 10 among 3 partitions"))
}
}