Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/07 09:55:50 UTC
[1/2] kafka git commit: MINOR: Eliminate unnecessary Topic(And)Partition allocations in Controller
Repository: kafka
Updated Branches:
refs/heads/trunk 58138126c -> 3735a6ca8
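This patch replaces the Scala-only kafka.common.TopicAndPartition with the Java client's org.apache.kafka.common.TopicPartition throughout the controller, so a single object per partition flows from the controller context down to the request batch instead of being re-allocated at each layer. A minimal sketch of the two shapes (illustrative only; the topic name is made up):

    import org.apache.kafka.common.TopicPartition

    // Before: a Scala case class, one extra allocation per conversion
    // val tp = kafka.common.TopicAndPartition("payments", 0)

    // After: the Java client class is used end to end
    val tp = new TopicPartition("payments", 0)
    tp.topic      // "payments"
    tp.partition  // 0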
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e41007b..2156a67 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -17,12 +17,12 @@
package kafka.controller
import kafka.api.LeaderAndIsr
-import kafka.common.{StateChangeFailedException, TopicAndPartition}
-import kafka.controller.Callbacks.CallbackBuilder
+import kafka.common.StateChangeFailedException
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.Code
import scala.collection.mutable
@@ -82,7 +82,7 @@ class ReplicaStateMachine(config: KafkaConfig,
private def initializeReplicaState() {
controllerContext.partitionReplicaAssignment.foreach { case (partition, replicas) =>
replicas.foreach { replicaId =>
- val partitionAndReplica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+ val partitionAndReplica = PartitionAndReplica(partition, replicaId)
if (controllerContext.isReplicaOnline(replicaId, partition))
replicaState.put(partitionAndReplica, OnlineReplica)
else
@@ -95,12 +95,12 @@ class ReplicaStateMachine(config: KafkaConfig,
}
def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,
- callbacks: Callbacks = (new CallbackBuilder).build): Unit = {
+ callbacks: Callbacks = new Callbacks()): Unit = {
if (replicas.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
replicas.groupBy(_.replica).map { case (replicaId, replicas) =>
- val partitions = replicas.map(_.topicAndPartition)
+ val partitions = replicas.map(_.topicPartition)
doHandleStateChanges(replicaId, partitions, targetState, callbacks)
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@@ -145,16 +145,16 @@ class ReplicaStateMachine(config: KafkaConfig,
* @param partitions The partitions on this replica for which the state transition is invoked
* @param targetState The end state that the replica should be moved to
*/
- private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicAndPartition], targetState: ReplicaState,
+ private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,
callbacks: Callbacks): Unit = {
- val replicas = partitions.map(partition => PartitionAndReplica(partition.topic, partition.partition, replicaId))
+ val replicas = partitions.map(partition => PartitionAndReplica(partition, replicaId))
replicas.foreach(replica => replicaState.getOrElseUpdate(replica, NonExistentReplica))
val (validReplicas, invalidReplicas) = replicas.partition(replica => isValidTransition(replica, targetState))
invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
targetState match {
case NewReplica =>
validReplicas.foreach { replica =>
- val partition = replica.topicAndPartition
+ val partition = replica.topicPartition
controllerContext.partitionLeadershipInfo.get(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
@@ -162,10 +162,9 @@ class ReplicaStateMachine(config: KafkaConfig,
logFailedStateChange(replica, replicaState(replica), OfflineReplica, exception)
} else {
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
- replica.topic,
- replica.partition,
+ replica.topicPartition,
leaderIsrAndControllerEpoch,
- controllerContext.partitionReplicaAssignment(replica.topicAndPartition),
+ controllerContext.partitionReplicaAssignment(replica.topicPartition),
isNew = true)
logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)
replicaState.put(replica, NewReplica)
@@ -177,7 +176,7 @@ class ReplicaStateMachine(config: KafkaConfig,
}
case OnlineReplica =>
validReplicas.foreach { replica =>
- val partition = replica.topicAndPartition
+ val partition = replica.topicPartition
replicaState(replica) match {
case NewReplica =>
val assignment = controllerContext.partitionReplicaAssignment(partition)
@@ -188,8 +187,7 @@ class ReplicaStateMachine(config: KafkaConfig,
controllerContext.partitionLeadershipInfo.get(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
- replica.topic,
- replica.partition,
+ replica.topicPartition,
leaderIsrAndControllerEpoch,
controllerContext.partitionReplicaAssignment(partition), isNew = false)
case None =>
@@ -200,48 +198,47 @@ class ReplicaStateMachine(config: KafkaConfig,
}
case OfflineReplica =>
validReplicas.foreach { replica =>
- controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topic, replica.partition, deletePartition = false, null)
+ controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition,
+ deletePartition = false, (_, _) => ())
}
- val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicAndPartition))
- val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicAndPartition))
+ val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
+ val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
- partition.topic,
- partition.partition,
+ partition,
leaderIsrAndControllerEpoch,
controllerContext.partitionReplicaAssignment(partition), isNew = false)
}
- val replica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+ val replica = PartitionAndReplica(partition, replicaId)
logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
replicaState.put(replica, OfflineReplica)
}
case ReplicaDeletionStarted =>
validReplicas.foreach { replica =>
- logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionStarted)
+ logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted)
replicaState.put(replica, ReplicaDeletionStarted)
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId),
- replica.topic,
- replica.partition,
+ replica.topicPartition,
deletePartition = true,
callbacks.stopReplicaResponseCallback)
}
case ReplicaDeletionIneligible =>
validReplicas.foreach { replica =>
- logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionIneligible)
+ logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionIneligible)
replicaState.put(replica, ReplicaDeletionIneligible)
}
case ReplicaDeletionSuccessful =>
validReplicas.foreach { replica =>
- logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionSuccessful)
+ logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionSuccessful)
replicaState.put(replica, ReplicaDeletionSuccessful)
}
case NonExistentReplica =>
validReplicas.foreach { replica =>
- val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicAndPartition)
- controllerContext.partitionReplicaAssignment.put(replica.topicAndPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
- logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), NonExistentReplica)
+ val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition)
+ controllerContext.partitionReplicaAssignment.put(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
+ logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), NonExistentReplica)
replicaState.remove(replica)
}
}
@@ -254,16 +251,16 @@ class ReplicaStateMachine(config: KafkaConfig,
* @param partitions The partitions for which we're trying to remove the replica from the ISR
* @return The updated LeaderIsrAndControllerEpochs of all partitions for which the replica was successfully removed from the ISR.
*/
- private def removeReplicasFromIsr(replicaId: Int, partitions: Seq[TopicAndPartition]):
- Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
- var results = Map.empty[TopicAndPartition, LeaderIsrAndControllerEpoch]
+ private def removeReplicasFromIsr(replicaId: Int, partitions: Seq[TopicPartition]):
+ Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
+ var results = Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
var remaining = partitions
while (remaining.nonEmpty) {
val (successfulRemovals, removalsToRetry, failedRemovals) = doRemoveReplicasFromIsr(replicaId, remaining)
results ++= successfulRemovals
remaining = removalsToRetry
failedRemovals.foreach { case (partition, e) =>
- val replica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+ val replica = PartitionAndReplica(partition, replicaId)
logFailedStateChange(replica, replicaState(replica), OfflineReplica, e)
}
}
@@ -282,10 +279,10 @@ class ReplicaStateMachine(config: KafkaConfig,
* the partition leader updated partition state while the controller attempted to update partition state.
* 3. Exceptions corresponding to failed removals that should not be retried.
*/
- private def doRemoveReplicasFromIsr(replicaId: Int, partitions: Seq[TopicAndPartition]):
- (Map[TopicAndPartition, LeaderIsrAndControllerEpoch],
- Seq[TopicAndPartition],
- Map[TopicAndPartition, Exception]) = {
+ private def doRemoveReplicasFromIsr(replicaId: Int, partitions: Seq[TopicPartition]):
+ (Map[TopicPartition, LeaderIsrAndControllerEpoch],
+ Seq[TopicPartition],
+ Map[TopicPartition, Exception]) = {
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
@@ -318,13 +315,13 @@ class ReplicaStateMachine(config: KafkaConfig,
* didn't finish partition initialization.
* 3. Exceptions corresponding to failed zookeeper lookups or states whose controller epoch exceeds our current epoch.
*/
- private def getTopicPartitionStatesFromZk(partitions: Seq[TopicAndPartition]):
- (Map[TopicAndPartition, LeaderAndIsr],
- Seq[TopicAndPartition],
- Map[TopicAndPartition, Exception]) = {
- val leaderAndIsrs = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
- val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
- val failed = mutable.Map.empty[TopicAndPartition, Exception]
+ private def getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition]):
+ (Map[TopicPartition, LeaderAndIsr],
+ Seq[TopicPartition],
+ Map[TopicPartition, Exception]) = {
+ val leaderAndIsrs = mutable.Map.empty[TopicPartition, LeaderAndIsr]
+ val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
+ val failed = mutable.Map.empty[TopicPartition, Exception]
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
@@ -333,7 +330,7 @@ class ReplicaStateMachine(config: KafkaConfig,
return (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
}
getDataResponses.foreach { getDataResponse =>
- val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+ val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
if (getDataResponse.resultCode == Code.OK) {
val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
if (leaderIsrAndControllerEpochOpt.isEmpty) {
@@ -377,7 +374,7 @@ class ReplicaStateMachine(config: KafkaConfig,
private def isValidTransition(replica: PartitionAndReplica, targetState: ReplicaState) =
targetState.validPreviousStates.contains(replicaState(replica))
- private def logSuccessfulTransition(replicaId: Int, partition: TopicAndPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
+ private def logSuccessfulTransition(replicaId: Int, partition: TopicPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
.trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
}
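The one-argument PartitionAndReplica calls above imply the case class now wraps a TopicPartition directly instead of carrying separate topic and partition fields. A hedged sketch of the shape this diff relies on (the real definition lives elsewhere in the controller package):

    import org.apache.kafka.common.TopicPartition

    // Assumed shape, inferred from PartitionAndReplica(partition, replicaId)
    // and replica.topicPartition in the hunks above:
    case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
      def topic: String = topicPartition.topic
      def partition: Int = topicPartition.partition
    }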
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 2e93f9d..eaf6b09 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -16,10 +16,9 @@
*/
package kafka.controller
-
-import kafka.common.TopicAndPartition
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
import scala.collection.{Set, mutable}
@@ -63,7 +62,7 @@ class TopicDeletionManager(controller: KafkaController,
val controllerContext = controller.controllerContext
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
val topicsToBeDeleted = mutable.Set.empty[String]
- val partitionsToBeDeleted = mutable.Set.empty[TopicAndPartition]
+ val partitionsToBeDeleted = mutable.Set.empty[TopicPartition]
val topicsIneligibleForDeletion = mutable.Set.empty[String]
def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]): Unit = {
@@ -175,7 +174,7 @@ class TopicDeletionManager(controller: KafkaController,
false
}
- def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition) = {
+ def isPartitionToBeDeleted(topicAndPartition: TopicPartition) = {
if(isDeleteTopicEnabled) {
partitionsToBeDeleted.contains(topicAndPartition)
} else
@@ -292,8 +291,8 @@ class TopicDeletionManager(controller: KafkaController,
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted,
- new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
- eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))).build)
+ new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj, replicaId) =>
+ eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))))
if (deadReplicasForTopic.nonEmpty) {
debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
markTopicIneligibleForDeletion(Set(topic))
@@ -312,7 +311,7 @@ class TopicDeletionManager(controller: KafkaController,
* 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
* will delete all persistent data from all replicas of the respective partitions
*/
- private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
+ private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]) {
info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
startReplicaDeletion(replicasPerPartition)
@@ -335,7 +334,7 @@ class TopicDeletionManager(controller: KafkaController,
// ignore since topic deletion is in progress
val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
val replicaIds = replicasInDeletionStartedState.map(_.replica)
- val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
+ val partitions = replicasInDeletionStartedState.map(_.topicPartition)
info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
partitions.mkString(","), topic))
} else {
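Both files also drop the CallbackBuilder in favor of constructing Callbacks directly, with a no-op default for the stop-replica callback. A rough sketch of what the call sites above assume (the parameter type is an assumption; the actual class definition is not in this part of the commit):

    import org.apache.kafka.common.requests.AbstractResponse

    // Assumed shape: a defaulted constructor parameter replaces the builder.
    class Callbacks(val stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = (_, _) => ())

    // Default no-op, as in ReplicaStateMachine.handleStateChanges: new Callbacks()
    // Explicit callback, as in TopicDeletionManager above:
    //   new Callbacks(stopReplicaResponseCallback = (response, replicaId) => eventManager.put(...))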
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e79a6e3..da61077 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -28,7 +28,7 @@ import kafka.message.UncompressedCodec
import kafka.server.Defaults
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
+import kafka.utils.{Logging, Pool, Scheduler}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4637521..3f8ac49 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
-import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, OffsetMetadata}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.controller.{KafkaController}
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
@@ -37,7 +37,7 @@ import kafka.network.RequestChannel
import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
-import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
@@ -236,10 +236,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val controlledShutdownRequest = request.body[ControlledShutdownRequest]
authorizeClusterAction(request)
- def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicAndPartition]]): Unit = {
+ def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicPartition]]): Unit = {
val response = controlledShutdownResult match {
case Success(partitionsRemaining) =>
- new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.map(_.asTopicPartition).asJava)
+ new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.asJava)
case Failure(throwable) =>
controlledShutdownRequest.getErrorResponse(throwable)
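Since shutdownBroker now reports Set[TopicPartition] directly, building the response no longer copies the set element by element; only a Scala-to-Java view is needed. Roughly:

    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    val partitionsRemaining = Set(new TopicPartition("test", 0))
    // Before: partitionsRemaining.map(_.asTopicPartition).asJava -- one new object per element
    // After: the set already holds TopicPartition, so asJava merely wraps it
    val javaView: java.util.Set[TopicPartition] = partitionsRemaining.asJava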
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
index 8457ce5..669edf9 100644
--- a/core/src/main/scala/kafka/utils/LogDirUtils.scala
+++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala
@@ -17,7 +17,7 @@
package kafka.utils
-import kafka.controller.LogDirEventNotificationListener
+import kafka.controller.LogDirEventNotificationHandler
import scala.collection.Map
object LogDirUtils extends Logging {
@@ -32,7 +32,7 @@ object LogDirUtils extends Logging {
}
private def logDirFailureEventZkData(brokerId: Int): String = {
- Json.encode(Map("version" -> LogDirEventNotificationListener.version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
+ Json.encode(Map("version" -> LogDirEventNotificationHandler.Version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
}
def deleteLogDirEvents(zkUtils: ZkUtils) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index cc08055..1a0633c 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,7 +18,7 @@
package kafka.utils
import kafka.api.LeaderAndIsr
-import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
@@ -88,7 +88,7 @@ object ReplicationUtils extends Logging {
private def generateIsrChangeJson(isrChanges: Set[TopicPartition]): String = {
val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
- Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitions))
+ Json.encode(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitions))
}
}
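The IsrChangeNotificationListener to IsrChangeNotificationHandler rename tracks the controller's move from ZkClient listeners to handler classes, and version becomes Version per the usual Scala constant-naming convention; the payload format itself is unchanged. Assuming Version is still 1, generateIsrChangeJson produces data equivalent to:

    import org.apache.kafka.common.TopicPartition

    val isrChanges = Set(new TopicPartition("test", 0))
    val payload = Map(
      "version" -> 1,  // IsrChangeNotificationHandler.Version, assumed to remain 1
      "partitions" -> isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
    )
    // Json.encode(payload) => {"version":1,"partitions":[{"topic":"test","partition":0}]}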
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 90d53d4..3267a74 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,6 @@ import java.util.Properties
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.LogConfig
import kafka.server.ConfigType
@@ -29,7 +28,7 @@ import kafka.utils._
import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException}
import scala.collection.mutable
@@ -53,7 +52,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param partitions the partitions for which we want to get states.
* @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
*/
- def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
+ def getTopicPartitionStatesRaw(partitions: Seq[TopicPartition]): Seq[GetDataResponse] = {
val getDataRequests = partitions.map { partition =>
GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition))
}
@@ -65,7 +64,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
* @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
*/
- def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+ def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
val path = TopicPartitionStateZNode.path(partition)
val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
@@ -79,7 +78,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
* @return sequence of CreateResponse whose contexts are the partitions they are associated with.
*/
- def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
+ def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
@@ -118,10 +117,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param controllerEpoch The current controller epoch.
* @return UpdateLeaderAndIsrResult instance containing per partition results.
*/
- def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
- val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
- val updatesToRetry = mutable.Buffer.empty[TopicAndPartition]
- val failed = mutable.Map.empty[TopicAndPartition, Exception]
+ def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
+ val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr]
+ val updatesToRetry = mutable.Buffer.empty[TopicPartition]
+ val failed = mutable.Map.empty[TopicPartition, Exception]
val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
val setDataResponses = try {
setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
@@ -131,7 +130,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
}
setDataResponses.foreach { setDataResponse =>
- val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+ val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
if (setDataResponse.resultCode == Code.OK) {
val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
successfulUpdates.put(partition, updatedLeaderAndIsr)
@@ -227,7 +226,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param assignment the partition to replica mapping to set for the given topic
* @return SetDataResponse
*/
- def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+ def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.NoVersion)
retryRequestUntilConnected(setDataRequest)
}
@@ -296,7 +295,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param topics the topics whose partitions we wish to get the assignments for.
* @return the replica assignment for each partition from the given topics.
*/
- def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
+ def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]] = {
val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
getDataResponses.flatMap { getDataResponse =>
@@ -304,7 +303,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getDataResponse.resultCode == Code.OK) {
TopicZNode.decode(topic, getDataResponse.data)
} else if (getDataResponse.resultCode == Code.NONODE) {
- Map.empty[TopicAndPartition, Seq[Int]]
+ Map.empty[TopicPartition, Seq[Int]]
} else {
throw getDataResponse.resultException.get
}
@@ -418,13 +417,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Returns all reassignments.
* @return the reassignments for each partition.
*/
- def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
+ def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = {
val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
if (getDataResponse.resultCode == Code.OK) {
ReassignPartitionsZNode.decode(getDataResponse.data)
} else if (getDataResponse.resultCode == Code.NONODE) {
- Map.empty[TopicAndPartition, Seq[Int]]
+ Map.empty[TopicPartition, Seq[Int]]
} else {
throw getDataResponse.resultException.get
}
@@ -437,7 +436,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param reassignment the reassignment to set on the reassignment znode
* @throws KeeperException if there is an error while setting or creating the znode
*/
- def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicAndPartition, Seq[Int]]): Unit = {
+ def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit = {
def set(reassignmentData: Array[Byte]): SetDataResponse = {
val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData, ZkVersion.NoVersion)
@@ -473,10 +472,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param partitions the partitions for which we want to get states.
* @return map containing the LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
*/
- def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+ def getTopicPartitionStates(partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
val getDataResponses = getTopicPartitionStatesRaw(partitions)
getDataResponses.flatMap { getDataResponse =>
- val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+ val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
if (getDataResponse.resultCode == Code.OK) {
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
} else if (getDataResponse.resultCode == Code.NONODE) {
@@ -507,7 +506,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param sequenceNumbers the sequence numbers associated with the isr change notifications.
* @return partitions associated with the given isr change notifications.
*/
- def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
+ def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicPartition] = {
val getDataRequests = sequenceNumbers.map { sequenceNumber =>
GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber))
}
@@ -550,13 +549,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Gets the partitions marked for preferred replica election.
* @return sequence of partitions.
*/
- def getPreferredReplicaElection: Set[TopicAndPartition] = {
+ def getPreferredReplicaElection: Set[TopicPartition] = {
val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
if (getDataResponse.resultCode == Code.OK) {
PreferredReplicaElectionZNode.decode(getDataResponse.data)
} else if (getDataResponse.resultCode == Code.NONODE) {
- Set.empty[TopicAndPartition]
+ Set.empty[TopicPartition]
} else {
throw getDataResponse.resultException.get
}
@@ -775,7 +774,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
}
- private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
+ private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {
val createRequests = partitions.map { partition =>
val path = TopicPartitionZNode.path(partition)
CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition))
@@ -783,7 +782,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
retryRequestsUntilConnected(createRequests)
}
- private def createTopicPartitions(topics: Seq[String]) = {
+ private def createTopicPartitions(topics: Seq[String]): Seq[CreateResponse] = {
val createRequests = topics.map { topic =>
val path = TopicPartitionsZNode.path(topic)
CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic))
@@ -791,14 +790,14 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
retryRequestsUntilConnected(createRequests)
}
- private def getTopicConfigs(topics: Seq[String]) = {
+ private def getTopicConfigs(topics: Seq[String]): Seq[GetDataResponse] = {
val getDataRequests = topics.map { topic =>
GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
}
retryRequestsUntilConnected(getDataRequests)
}
- private def acls(path: String) = {
+ private def acls(path: String): Seq[ACL] = {
import scala.collection.JavaConverters._
ZkUtils.defaultAcls(isSecure, path).asScala
}
@@ -892,7 +891,7 @@ object KafkaZkClient {
* update partition state.
* @param failedPartitions Exceptions corresponding to failed partition state updates.
*/
- case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
- partitionsToRetry: Seq[TopicAndPartition],
- failedPartitions: Map[TopicAndPartition, Exception])
+ case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicPartition, LeaderAndIsr],
+ partitionsToRetry: Seq[TopicPartition],
+ failedPartitions: Map[TopicPartition, Exception])
}
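Several of these methods rely on the ZooKeeper request ctx field to carry the partition through the asynchronous round trip, which is why each response side casts ctx back with asInstanceOf: the cast must agree with whatever key type was attached on the request side. A self-contained illustration of the pattern (the Response type is a stand-in, not the kafka.zookeeper API):

    import org.apache.kafka.common.TopicPartition

    // Stand-in for a response that echoes an opaque request context back:
    case class Response(ctx: Option[Any])

    val attached = new TopicPartition("test", 0)
    val response = Response(ctx = Some(attached))
    // The cast must match the attached type, as in
    // getDataResponse.ctx.get.asInstanceOf[TopicPartition] above:
    val partition = response.ctx.get.asInstanceOf[TopicPartition]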
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index e46f438..a1ff559 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -21,9 +21,9 @@ import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
-import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
import kafka.utils.Json
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
import scala.collection.Seq
@@ -82,17 +82,17 @@ object TopicsZNode {
object TopicZNode {
def path(topic: String) = s"${TopicsZNode.path}/$topic"
- def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+ def encode(assignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson))
}
- def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
+ def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = {
Json.parseBytes(bytes).flatMap { js =>
val assignmentJson = js.asJsonObject
val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
partitionsJsonOpt.map { partitionsJson =>
partitionsJson.iterator.map { case (partition, replicas) =>
- TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
+ new TopicPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
}
}
}.map(_.toMap).getOrElse(Map.empty)
@@ -104,11 +104,11 @@ object TopicPartitionsZNode {
}
object TopicPartitionZNode {
- def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
+ def path(partition: TopicPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
}
object TopicPartitionStateZNode {
- def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
+ def path(partition: TopicPartition) = s"${TopicPartitionZNode.path(partition)}/state"
def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -155,19 +155,19 @@ object IsrChangeNotificationZNode {
object IsrChangeNotificationSequenceZNode {
val SequenceNumberPrefix = "isr_change_"
def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
- def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+ def encode(partitions: Set[TopicPartition]): Array[Byte] = {
val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
- Json.encodeAsBytes(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson))
+ Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson))
}
- def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
+ def decode(bytes: Array[Byte]): Set[TopicPartition] = {
Json.parseBytes(bytes).map { js =>
val partitionsJson = js.asJsonObject("partitions").asJsonArray
partitionsJson.iterator.map { partitionsJson =>
val partitionJson = partitionsJson.asJsonObject
val topic = partitionJson("topic").to[String]
val partition = partitionJson("partition").to[Int]
- TopicAndPartition(topic, partition)
+ new TopicPartition(topic, partition)
}
}
}.map(_.toSet).getOrElse(Set.empty)
@@ -204,13 +204,13 @@ object DeleteTopicsTopicZNode {
object ReassignPartitionsZNode {
def path = s"${AdminZNode.path}/reassign_partitions"
- def encode(reassignment: collection.Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
- val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
- Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
+ def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
+ val reassignmentJson = reassignment.map { case (tp, replicas) =>
+ Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas)
}
Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson))
}
- def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
+ def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
val reassignmentJson = js.asJsonObject
val partitionsJsonOpt = reassignmentJson.get("partitions")
partitionsJsonOpt.map { partitionsJson =>
@@ -219,7 +219,7 @@ object ReassignPartitionsZNode {
val topic = partitionFields("topic").to[String]
val partition = partitionFields("partition").to[Int]
val replicas = partitionFields("replicas").to[Seq[Int]]
- TopicAndPartition(topic, partition) -> replicas
+ new TopicPartition(topic, partition) -> replicas
}
}
}.map(_.toMap).getOrElse(Map.empty)
@@ -227,18 +227,18 @@ object ReassignPartitionsZNode {
object PreferredReplicaElectionZNode {
def path = s"${AdminZNode.path}/preferred_replica_election"
- def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+ def encode(partitions: Set[TopicPartition]): Array[Byte] = {
val jsonMap = Map("version" -> 1,
"partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))
Json.encodeAsBytes(jsonMap)
}
- def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseBytes(bytes).map { js =>
+ def decode(bytes: Array[Byte]): Set[TopicPartition] = Json.parseBytes(bytes).map { js =>
val partitionsJson = js.asJsonObject("partitions").asJsonArray
partitionsJson.iterator.map { partitionsJson =>
val partitionJson = partitionsJson.asJsonObject
val topic = partitionJson("topic").to[String]
val partition = partitionJson("partition").to[Int]
- TopicAndPartition(topic, partition)
+ new TopicPartition(topic, partition)
}
}.map(_.toSet).getOrElse(Set.empty)
}
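For reference, the reassignment JSON shape is unchanged by the key-type switch; only the Scala-side keys differ. A hypothetical round trip through the encode logic above:

    import org.apache.kafka.common.TopicPartition

    val reassignment = Map(new TopicPartition("test", 0) -> Seq(1, 2, 3))
    val partitionsJson = reassignment.map { case (tp, replicas) =>
      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas)
    }
    // Json.encodeAsBytes(Map("version" -> 1, "partitions" -> partitionsJson)) yields
    // {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,2,3]}]}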
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7f4eed7..2f520a3 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -369,8 +369,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val controllerId = zkUtils.getController()
val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
- val resultQueue = new LinkedBlockingQueue[Try[Set[TopicAndPartition]]]()
- val controlledShutdownCallback = (controlledShutdownResult: Try[Set[TopicAndPartition]]) => resultQueue.put(controlledShutdownResult)
+ val resultQueue = new LinkedBlockingQueue[Try[Set[TopicPartition]]]()
+ val controlledShutdownCallback = (controlledShutdownResult: Try[Set[TopicPartition]]) => resultQueue.put(controlledShutdownResult)
controller.shutdownBroker(2, controlledShutdownCallback)
var partitionsRemaining = resultQueue.take().get
var activeServers = servers.filter(s => s.config.brokerId != 2)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 06ddd66..78f022a 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -25,7 +25,7 @@ import org.junit.Assert._
import org.junit.{After, Test}
import java.util.Properties
-import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.common.{TopicAndPartition, TopicAlreadyMarkedForDeletionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
@@ -43,8 +43,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
@Test
def testDeleteTopicWithAllAliveReplicas() {
- val topicPartition = new TopicPartition("test", 0)
- val topic = topicPartition.topic
+ val topic = "test"
servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
@@ -128,7 +127,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// reassign partition 0
val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
val newReplicas = Seq(1, 2, 3)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(new TopicAndPartition(topicPartition) -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None,
+ Map(new TopicAndPartition(topicPartition) -> newReplicas))
assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
@@ -139,7 +139,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val controllerId = zkUtils.getController()
val controller = servers.filter(s => s.config.brokerId == controllerId).head
assertFalse("Partition reassignment should fail",
- controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(new TopicAndPartition(topicPartition)))
+ controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicPartition))
val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
follower.startup()
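Note the bridging in this test: ReassignPartitionsCommand still takes the old key type, so the test wraps the client-side TopicPartition via TopicAndPartition's conversion constructor, roughly:

    import org.apache.kafka.common.TopicPartition

    val topicPartition = new TopicPartition("test", 0)
    // The constructor taking a TopicPartition is what
    // `new TopicAndPartition(topicPartition)` above uses:
    val legacy = new kafka.common.TopicAndPartition(topicPartition)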
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 446d8ae..04f9bea 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -21,10 +21,10 @@ import java.util.Properties
import java.util.concurrent.CountDownLatch
import kafka.admin.AdminUtils
-import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils._
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.log4j.Logger
import org.junit.{After, Test}
@@ -61,7 +61,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
val initialEpoch = initialController.epoch
// Create topic with one partition
AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
- val topicPartition = TopicAndPartition("topic1", 0)
+ val topicPartition = new TopicPartition("topic1", 0)
TestUtils.waitUntilTrue(() =>
initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
s"Partition $topicPartition did not transition to online state")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 81df1e1..2e56c33 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -17,13 +17,13 @@
package kafka.controller
import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock
@@ -38,13 +38,13 @@ class PartitionStateMachineTest extends JUnitSuite {
private var mockZkClient: KafkaZkClient = null
private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
private var mockTopicDeletionManager: TopicDeletionManager = null
- private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = null
+ private var partitionState: mutable.Map[TopicPartition, PartitionState] = null
private var partitionStateMachine: PartitionStateMachine = null
private val brokerId = 5
private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
private val controllerEpoch = 50
- private val partition = TopicAndPartition("t", 0)
+ private val partition = new TopicPartition("t", 0)
private val partitions = Seq(partition)
@Before
@@ -54,7 +54,7 @@ class PartitionStateMachineTest extends JUnitSuite {
mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
- partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
+ partitionState = mutable.Map.empty[TopicPartition, PartitionState]
partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager,
mockZkClient, partitionState, mockControllerBrokerRequestBatch)
}
@@ -87,7 +87,7 @@ class PartitionStateMachineTest extends JUnitSuite {
EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
.andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
+ partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
@@ -161,8 +161,7 @@ class PartitionStateMachineTest extends JUnitSuite {
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
- Seq(brokerId), isNew = false))
+ partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
@@ -193,8 +192,8 @@ class PartitionStateMachineTest extends JUnitSuite {
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
- partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
- Seq(brokerId, otherBrokerId), isNew = false))
+ partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId),
+ isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
@@ -246,7 +245,7 @@ class PartitionStateMachineTest extends JUnitSuite {
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
+ partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 6363d41..5d24d79 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -17,12 +17,12 @@
package kafka.controller
import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zookeeper.GetDataResponse
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock
@@ -43,9 +43,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
private val brokerId = 5
private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
private val controllerEpoch = 50
- private val partition = TopicAndPartition("t", 0)
+ private val partition = new TopicPartition("t", 0)
private val partitions = Seq(partition)
- private val replica = PartitionAndReplica(partition.topic, partition.partition, brokerId)
+ private val replica = PartitionAndReplica(partition, brokerId)
private val replicas = Seq(replica)
@Before
@@ -113,8 +113,8 @@ class ReplicaStateMachineTest extends JUnitSuite {
def testNewReplicaToOfflineReplicaTransition(): Unit = {
replicaState.put(replica, NewReplica)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
- EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, false, null))
+ EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)),
+ EasyMock.eq(partition), EasyMock.eq(false), EasyMock.anyObject()))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
@@ -155,7 +155,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+ partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
@@ -175,19 +175,19 @@ class ReplicaStateMachineTest extends JUnitSuite {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
- EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, false, null))
+ EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)),
+ EasyMock.eq(partition), EasyMock.eq(false), EasyMock.anyObject()))
val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr.zkVersion + 1)
val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
- EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
- .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
+ EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)).andReturn(
+ Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
- partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
+ partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
@@ -230,7 +230,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+ partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
@@ -240,11 +240,11 @@ class ReplicaStateMachineTest extends JUnitSuite {
@Test
def testOfflineReplicaToReplicaDeletionStartedTransition(): Unit = {
- val callbacks = (new Callbacks.CallbackBuilder).build
+ val callbacks = new Callbacks()
replicaState.put(replica, OfflineReplica)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback))
+ partition, true, callbacks.stopReplicaResponseCallback))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
@@ -348,7 +348,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
- partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+ partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 438d736..afd619d 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -195,7 +195,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
// The controller should have marked the replica on the original leader as offline
val controllerServer = servers.find(_.kafkaController.isActive).get
val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
- assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId)))
+ assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
}
private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 77ac748..775c68e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,15 +16,12 @@
*/
package kafka.zk
-import kafka.common.TopicAndPartition
import kafka.server.Defaults
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.TopicPartition
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.{After, Before, Test}
-import scala.collection.mutable
-
class KafkaZkClientTest extends ZooKeeperTestHarness {
private var zooKeeperClient: ZooKeeperClient = null
@@ -102,10 +99,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic path
zkClient.createRecursive(TopicZNode.path(topic))
- val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
- assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))
- assignment.put(new TopicAndPartition(topic, 1), Seq(0,1))
- zkClient.setTopicAssignmentRaw(topic, assignment.toMap)
+ val assignment = Map(
+ new TopicPartition(topic, 0) -> Seq(0, 1),
+ new TopicPartition(topic, 1) -> Seq(0, 1)
+ )
+ zkClient.setTopicAssignmentRaw(topic, assignment)
assertEquals(2, zkClient.getTopicPartitionCount(topic).get)
}
@@ -165,15 +163,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(Map.empty, zkClient.getPartitionReassignment)
val reassignment = Map(
- TopicAndPartition("topic_a", 0) -> Seq(0, 1, 3),
- TopicAndPartition("topic_a", 1) -> Seq(2, 1, 3),
- TopicAndPartition("topic_b", 0) -> Seq(4, 5),
- TopicAndPartition("topic_c", 0) -> Seq(5, 3)
+ new TopicPartition("topic_a", 0) -> Seq(0, 1, 3),
+ new TopicPartition("topic_a", 1) -> Seq(2, 1, 3),
+ new TopicPartition("topic_b", 0) -> Seq(4, 5),
+ new TopicPartition("topic_c", 0) -> Seq(5, 3)
)
zkClient.setOrCreatePartitionReassignment(reassignment)
assertEquals(reassignment, zkClient.getPartitionReassignment)
- val updatedReassignment = reassignment - TopicAndPartition("topic_b", 0)
+ val updatedReassignment = reassignment - new TopicPartition("topic_b", 0)
zkClient.setOrCreatePartitionReassignment(updatedReassignment)
assertEquals(updatedReassignment, zkClient.getPartitionReassignment)
[2/2] kafka git commit: MINOR: Eliminate unnecessary
Topic(And)Partition allocations in Controller
Posted by ij...@apache.org.
MINOR: Eliminate unnecessary Topic(And)Partition allocations in Controller
- Eliminated all the unnecessary allocations of `TopicPartition` and
`TopicAndPartition` in the Controller. We now use the former
throughout (bringing the Controller in line with the rest of the
non-legacy code).
- Fixed missed `Listener` -> `Handler` renames for companion
objects.
- More String.format -> String interpolation conversions (the former
is roughly 5 times more expensive; see the sketch after this list).
- Some other minor clean-ups.
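For reference, the String.format -> interpolation change has this shape (a generic sketch,
not a line from this diff):
  // Before: the format string is re-parsed on every call.
  info("Leader %s for partition %s being reassigned".format(leader, partition))
  // After: the s-interpolator builds the string directly, skipping the format-string parse.
  info(s"Leader $leader for partition $partition being reassigned")
Both forms produce the same string; the quoted ~5x cost difference comes from that runtime parse.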
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Onur Karaman <ok...@linkedin.com>, Viktor Somogyi <vi...@gmail.com>
Closes #4152 from ijuma/controller-topic-partition-and-other-clean-ups
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3735a6ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3735a6ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3735a6ca
Branch: refs/heads/trunk
Commit: 3735a6ca8b6432db2de4a0bd07df9301459bbf0b
Parents: 5813812
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Nov 7 09:55:44 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Nov 7 09:55:44 2017 +0000
----------------------------------------------------------------------
.../scala/kafka/common/TopicAndPartition.scala | 11 -
.../consumer/ZookeeperConsumerConnector.scala | 9 +-
.../controller/ControllerChannelManager.scala | 45 +--
.../kafka/controller/ControllerContext.scala | 45 ++-
.../kafka/controller/KafkaController.scala | 279 +++++++++----------
.../controller/PartitionStateMachine.scala | 65 +++--
.../kafka/controller/ReplicaStateMachine.scala | 87 +++---
.../kafka/controller/TopicDeletionManager.scala | 15 +-
.../transaction/TransactionStateManager.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 8 +-
.../main/scala/kafka/utils/LogDirUtils.scala | 4 +-
.../scala/kafka/utils/ReplicationUtils.scala | 4 +-
.../src/main/scala/kafka/zk/KafkaZkClient.scala | 55 ++--
core/src/main/scala/kafka/zk/ZkData.scala | 38 +--
.../test/scala/unit/kafka/admin/AdminTest.scala | 4 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 10 +-
.../controller/ControllerFailoverTest.scala | 4 +-
.../controller/PartitionStateMachineTest.scala | 19 +-
.../controller/ReplicaStateMachineTest.scala | 32 +--
.../unit/kafka/server/LogDirFailureTest.scala | 2 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 22 +-
21 files changed, 356 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 4a8e65d..6c27695 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,6 +1,5 @@
package kafka.common
-import kafka.cluster.{Partition, Replica}
import org.apache.kafka.common.TopicPartition
/**
@@ -25,17 +24,7 @@ import org.apache.kafka.common.TopicPartition
*/
case class TopicAndPartition(topic: String, partition: Int) {
- def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
-
- def this(partition: Partition) = this(partition.topic, partition.partitionId)
-
def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
- def this(replica: Replica) = this(replica.topicPartition)
-
- def asTuple = (topic, partition)
-
- def asTopicPartition = new TopicPartition(topic, partition)
-
override def toString: String = s"$topic-$partition"
}
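Of the helpers removed above, only the TopicPartition-converting constructor survives; the dropped
ones have direct inline equivalents. A sketch of the migrations callers would make (illustrative,
not lines from this commit):
  // tp.asTuple                     ->  (tp.topic, tp.partition)
  // tp.asTopicPartition            ->  new TopicPartition(tp.topic, tp.partition)
  // new TopicAndPartition(part)    ->  TopicAndPartition(part.topic, part.partitionId)
  // new TopicAndPartition(replica) ->  new TopicAndPartition(replica.topicPartition)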
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index d1928b4..bb5fc0f 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -717,12 +717,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
false
else {
val offsetFetchResponse = offsetFetchResponseOpt.get
- topicPartitions.foreach(topicAndPartition => {
- val (topic, partition) = topicAndPartition.asTuple
- val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
- val threadId = partitionAssignment(topicAndPartition)
+ topicPartitions.foreach { case tp @ TopicAndPartition(topic, partition) =>
+ val offset = offsetFetchResponse.requestInfo(tp).offset
+ val threadId = partitionAssignment(tp)
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
- })
+ }
/**
* move the partition ownership here, since that can be used to indicate a truly successful re-balancing attempt
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 9fef617..7314679 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.Broker
-import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.common.KafkaException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils._
@@ -312,10 +312,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
updateMetadataRequestPartitionInfoMap.clear()
}
- def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
+ def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicas: Seq[Int], isNew: Boolean) {
- val topicPartition = new TopicPartition(topic, partition)
brokerIds.filter(_ >= 0).foreach { brokerId =>
val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
@@ -329,29 +328,24 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
isNew || alreadyNew))
}
- addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
- Set(TopicAndPartition(topic, partition)))
+ addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
}
- def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
+ def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition, deletePartition: Boolean,
callback: (AbstractResponse, Int) => Unit) {
brokerIds.filter(b => b >= 0).foreach { brokerId =>
stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
val v = stopReplicaRequestMap(brokerId)
- if(callback != null)
- stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
- deletePartition, (r: AbstractResponse) => callback(r, brokerId))
- else
- stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
- deletePartition)
+ stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topicPartition, brokerId),
+ deletePartition, (r: AbstractResponse) => callback(r, brokerId))
}
}
/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
- partitions: collection.Set[TopicAndPartition]) {
+ partitions: collection.Set[TopicPartition]) {
- def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
+ def updateMetadataRequestPartitionInfo(partition: TopicPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) =>
@@ -371,7 +365,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion,
replicas.map(Integer.valueOf).asJava,
offlineReplicas.map(Integer.valueOf).asJava)
- updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
+ updateMetadataRequestPartitionInfoMap.put(partition, partitionStateInfo)
case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
@@ -474,13 +468,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
// Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
// changes the order in which the requests are sent for the same partitions, but that's OK.
val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
- replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
+ replicasToGroup.map(_.replica.topicPartition).toSet.asJava)
controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)
replicasToNotGroup.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest.Builder(
controllerId, controllerEpoch, r.deletePartition,
- Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
+ Set(r.replica.topicPartition).asJava)
controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
}
}
@@ -510,19 +504,6 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
requestSendThread: RequestSendThread,
queueSizeGauge: Gauge[Int])
-case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
-
-class Callbacks private (var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit)
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
-object Callbacks {
- class CallbackBuilder {
- var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
-
- def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = {
- stopReplicaResponseCbk = cbk
- this
- }
-
- def build: Callbacks = new Callbacks(stopReplicaResponseCbk)
- }
-}
+class Callbacks(val stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = (_, _) => ())
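The private-constructor-plus-builder pair above collapses into a single public class whose callback
defaults to a (_, _) => () no-op, which is what lets call sites drop their null checks. A before/after
sketch of a call site (assuming only the shapes shown in this diff):
  // Before: builder, with a null callback meaning "none"
  val callbacks = (new Callbacks.CallbackBuilder).stopReplicaCallback(cb).build
  // After: plain constructor; omit the argument to get the no-op default
  val withCallback = new Callbacks(cb)
  val noCallback   = new Callbacks()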
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/ControllerContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index d4a29f8..541bce8 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -18,7 +18,7 @@
package kafka.controller
import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
import scala.collection.{Seq, Set, mutable}
@@ -31,10 +31,10 @@ class ControllerContext {
var epoch: Int = KafkaController.InitialControllerEpoch - 1
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
var allTopics: Set[String] = Set.empty
- var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
- var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
- val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = mutable.Map.empty
- val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicAndPartition]] = mutable.Map.empty
+ var partitionReplicaAssignment: mutable.Map[TopicPartition, Seq[Int]] = mutable.Map.empty
+ var partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
+ val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
+ val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -52,58 +52,55 @@ class ControllerContext {
def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
def liveOrShuttingDownBrokers = liveBrokersUnderlying
- def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
+ def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
partitionReplicaAssignment.collect {
- case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
+ case (topicPartition, replicas) if replicas.contains(brokerId) => topicPartition
}.toSet
}
- def isReplicaOnline(brokerId: Int, topicAndPartition: TopicAndPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
+ def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
val brokerOnline = {
if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId)
else liveBrokerIds.contains(brokerId)
}
- brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicAndPartition)
+ brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicPartition)
}
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
brokerIds.flatMap { brokerId =>
- partitionReplicaAssignment.collect {
- case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
- PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
+ partitionReplicaAssignment.collect { case (topicPartition, replicas) if replicas.contains(brokerId) =>
+ PartitionAndReplica(topicPartition, brokerId)
}
}.toSet
}
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
partitionReplicaAssignment
- .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
- .flatMap { case (topicAndPartition, replicas) =>
- replicas.map { r =>
- PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
- }
+ .filter { case (topicPartition, _) => topicPartition.topic == topic }
+ .flatMap { case (topicPartition, replicas) =>
+ replicas.map(PartitionAndReplica(topicPartition, _))
}.toSet
}
- def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
- partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
+ def partitionsForTopic(topic: String): collection.Set[TopicPartition] =
+ partitionReplicaAssignment.keySet.filter(topicPartition => topicPartition.topic == topic)
def allLiveReplicas(): Set[PartitionAndReplica] = {
replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
- isReplicaOnline(partitionAndReplica.replica, TopicAndPartition(partitionAndReplica.topic, partitionAndReplica.partition))
+ isReplicaOnline(partitionAndReplica.replica, partitionAndReplica.topicPartition)
}
}
- def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
+ def replicasForPartition(partitions: collection.Set[TopicPartition]): collection.Set[PartitionAndReplica] = {
partitions.flatMap { p =>
val replicas = partitionReplicaAssignment(p)
- replicas.map(r => PartitionAndReplica(p.topic, p.partition, r))
+ replicas.map(PartitionAndReplica(p, _))
}
}
def removeTopic(topic: String) = {
- partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
- partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
+ partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic }
+ partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic }
allTopics -= topic
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b676ead..ade3ae4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,6 +28,7 @@ import kafka.utils._
import kafka.zk._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -139,7 +140,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* @param id Id of the broker to shutdown.
* @return The number of partitions that the broker still leads.
*/
- def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit): Unit = {
+ def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
val controlledShutdownEvent = ControlledShutdown(id, controlledShutdownCallback)
eventManager.put(controlledShutdownEvent)
}
@@ -295,13 +296,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
partitionStateMachine.triggerOnlinePartitionStateChange()
// check if reassignment of some partitions need to be restarted
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
- case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+ case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
}
- partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
+ partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
// check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
// on the newly restarted brokers, there is a chance that topic deletion can resume
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
- if(replicasForTopicsToBeDeleted.nonEmpty) {
+ if (replicasForTopicsToBeDeleted.nonEmpty) {
info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
"Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
topicDeletionManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
@@ -370,7 +371,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* 1. Move the newly created partitions to the NewPartition state
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
*/
- def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
+ def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
@@ -419,51 +420,50 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
* This way, if the controller crashes before that step, we can still recover.
*/
- def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
+ def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
- if (!areReplicasInIsr(topicAndPartition, reassignedReplicas)) {
- info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
- "reassigned not yet caught up with the leader")
- val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
- val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+ if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
+ info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
+ "caught up with the leader")
+ val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet
+ val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
//1. Update AR in ZK with OAR + RAR.
- updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+ updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
//2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
- updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+ updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition),
newAndOldReplicas.toSeq)
//3. replicas in RAR - OAR -> NewReplica
- startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
- info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+ startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
+ info(s"Waiting for new replicas ${reassignedReplicas.mkString(",")} for partition ${topicPartition} being " +
"reassigned to catch up with the leader")
} else {
//4. Wait until all replicas in RAR are in sync with the leader.
- val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
+ val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet
//5. replicas in RAR -> OnlineReplica
reassignedReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
- replica)), OnlineReplica)
+ replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)
}
//6. Set AR to RAR in memory.
//7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
// a new AR (using RAR) and same isr to every broker in RAR
- moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
+ moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)
//8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
//9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
- stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
+ stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)
//10. Update AR in ZK with RAR.
- updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
+ updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
//11. Update the /admin/reassign_partitions path in ZK to remove this partition.
- removePartitionFromReassignedPartitions(topicAndPartition)
- info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
- controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+ removePartitionFromReassignedPartitions(topicPartition)
+ info(s"Removed partition $topicPartition from the list of reassigned partitions in zookeeper")
+ controllerContext.partitionsBeingReassigned.remove(topicPartition)
//12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
- sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
+ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
- topicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
+ topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
}
}
- private def watchIsrChangesForReassignedPartition(partition: TopicAndPartition,
+ private def watchIsrChangesForReassignedPartition(partition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
reassignedPartitionContext.reassignIsrChangeHandler = reassignIsrChangeHandler
@@ -471,37 +471,37 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
}
- def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
- reassignedPartitionContext: ReassignedPartitionsContext) {
+ def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
+ reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
- val topic = topicAndPartition.topic
+ val topic = topicPartition.topic
try {
- val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
+ val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if (assignedReplicas == newReplicas) {
- throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
- " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
+ throw new KafkaException(s"Partition $topicPartition to be reassigned is already assigned to replicas " +
+ s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment")
} else {
- info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+ info(s"Handling reassignment of partition $topicPartition to new replicas ${newReplicas.mkString(",")}")
// first register ISR change listener
- watchIsrChangesForReassignedPartition(topicAndPartition, reassignedPartitionContext)
- controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+ watchIsrChangesForReassignedPartition(topicPartition, reassignedPartitionContext)
+ controllerContext.partitionsBeingReassigned.put(topicPartition, reassignedPartitionContext)
// mark topic ineligible for deletion for the partitions being reassigned
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
- onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+ onPartitionReassignment(topicPartition, reassignedPartitionContext)
}
- case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
- .format(topicAndPartition))
+ case None => throw new KafkaException(s"Attempt to reassign partition $topicPartition that doesn't exist")
}
} catch {
- case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
- // remove the partition from the admin path to unblock the admin client
- removePartitionFromReassignedPartitions(topicAndPartition)
+ case e: Throwable =>
+ error(s"Error completing reassignment of partition $topicPartition", e)
+ // remove the partition from the admin path to unblock the admin client
+ removePartitionFromReassignedPartitions(topicPartition)
}
}
- def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
+ def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
@@ -572,7 +572,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
- controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+ controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
@@ -584,7 +584,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
}
- private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
+ private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
@@ -621,19 +621,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
- val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+ val partitionsBeingReassigned = zkClient.getPartitionReassignment
// check if they are already completed or topic was deleted
- val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
- val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
- val topicDeleted = replicasOpt.isEmpty
- val successful = if (!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
- topicDeleted || successful
+ val reassignedPartitions = partitionsBeingReassigned.filter { case (tp, reassignmentReplicas) =>
+ controllerContext.partitionReplicaAssignment.get(tp) match {
+ case None => true // topic deleted
+ case Some(currentReplicas) => currentReplicas == reassignmentReplicas // reassignment completed
+ }
}.keys
- reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
- val partitionsToReassign = mutable.Map[TopicAndPartition, ReassignedPartitionsContext]()
- partitionsToReassign ++= partitionsBeingReassigned
- partitionsToReassign --= reassignedPartitions
- controllerContext.partitionsBeingReassigned ++= partitionsToReassign
+ reassignedPartitions.foreach(removePartitionFromReassignedPartitions)
+ val partitionsToReassign = partitionsBeingReassigned -- reassignedPartitions
+ controllerContext.partitionsBeingReassigned ++= partitionsToReassign.mapValues(new ReassignedPartitionsContext(_))
info(s"Partitions being reassigned: $partitionsBeingReassigned")
info(s"Partitions already reassigned: $reassignedPartitions")
info(s"Resuming reassignment of partitions: $partitionsToReassign")
@@ -652,8 +650,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
private def maybeTriggerPartitionReassignment() {
- controllerContext.partitionsBeingReassigned.foreach { topicPartitionToReassign =>
- initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
+ controllerContext.partitionsBeingReassigned.foreach { case (tp, reassignContext) =>
+ initiateReassignReplicasForTopicPartition(tp, reassignContext)
}
}
@@ -663,54 +661,52 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.controllerChannelManager.startup()
}
- def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+ def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
}
}
- private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
+ private def areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean = {
zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
}
}
- private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
+ private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
- val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+ val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
// change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
// request to the current or new leader. This will prevent it from adding the old replicas to the ISR
- val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
- controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
+ val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
+ controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas)
if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
- info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
+ info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
"is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
// move the leader to one of the alive and caught up new replicas
- partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
+ partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
} else {
// check if the leader is alive or not
- if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) {
- info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
+ if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
+ info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
"is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
// shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
- updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
+ updateLeaderEpochAndSendRequest(topicPartition, oldAndNewReplicas, reassignedReplicas)
} else {
- info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
+ info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
"is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
- partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
+ partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
}
}
}
- private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
+ private def stopOldReplicasOfReassignedPartition(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
oldReplicas: Set[Int]) {
- val topic = topicAndPartition.topic
- val partition = topicAndPartition.partition
// first move the replica to offline state (the controller removes it from the ISR)
- val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
+ val replicasToBeDeleted = oldReplicas.map(PartitionAndReplica(topicPartition, _))
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, OfflineReplica)
// send stop replica command to the old replicas
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionStarted)
@@ -719,7 +715,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, NonExistentReplica)
}
- private def updateAssignedReplicasForPartition(partition: TopicAndPartition,
+ private def updateAssignedReplicasForPartition(partition: TopicPartition,
replicas: Seq[Int]) {
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
partitionsAndReplicasForThisTopic.put(partition, replicas)
@@ -735,24 +731,24 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
- private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
+ private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
newReplicas: Set[Int]) {
// send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
// replicas list
newReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
+ replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), NewReplica)
}
}
- private def updateLeaderEpochAndSendRequest(partition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+ private def updateLeaderEpochAndSendRequest(partition: TopicPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
updateLeaderEpoch(partition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
try {
brokerRequestBatch.newBatch()
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition.topic,
- partition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition,
+ updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: IllegalStateException =>
@@ -798,17 +794,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
- def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
- controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignContext =>
+ def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
+ controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
// stop watching the ISR changes for this partition
zkClient.unregisterZNodeChangeHandler(reassignContext.reassignIsrChangeHandler.path)
}
- val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicAndPartition
+ val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition
// write the new list to zookeeper
if (updatedPartitionsBeingReassigned.isEmpty) {
- info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
+ info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
zkClient.deletePartitionReassignment()
// Ensure we detect future reassignments
eventManager.put(PartitionReassignment)
@@ -820,16 +816,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
- controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+ controllerContext.partitionsBeingReassigned.remove(topicPartition)
}
- def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+ def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],
isTriggeredByAutoRebalance : Boolean) {
- for(partition <- partitionsToBeRemoved) {
+ for (partition <- partitionsToBeRemoved) {
// check the status
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
- if(currentLeader == preferredReplica) {
+ if (currentLeader == preferredReplica) {
info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
} else {
warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
@@ -845,7 +841,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
*
* @param brokers The brokers that the update metadata request should be sent to
*/
- def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+ def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
@@ -862,7 +858,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
* @param partition partition
* @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
*/
- private def updateLeaderEpoch(partition: TopicAndPartition): Option[LeaderIsrAndControllerEpoch] = {
+ private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
debug("Updating leader epoch for partition %s.".format(partition))
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
@@ -900,21 +896,21 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def checkAndTriggerAutoLeaderRebalance(): Unit = {
trace("Checking need to trigger auto leader balancing")
- val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] =
+ val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] =
controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) =>
topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
}.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers")
// for each broker, check if a preferred replica election needs to be triggered
- preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
- val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+ preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicPartitionsForBroker) =>
+ val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
}
debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
- val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicAndPartitionsForBroker.size
+ val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicPartitionsForBroker.size
trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio")
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
@@ -948,7 +944,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
- case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
+ case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
def state = ControllerState.ControlledShutdown
@@ -957,7 +953,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controlledShutdownCallback(controlledShutdownResult)
}
- private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
+ private def doControlledShutdown(id: Int): Set[TopicPartition] = {
if (!isActive) {
throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
}
@@ -981,8 +977,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
try {
brokerRequestBatch.newBatch()
partitionsFollowedByBroker.foreach { partition =>
- brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition.topic,
- partition.partition, deletePartition = false, null)
+ brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false,
+ (_, _) => ())
}
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
@@ -990,12 +986,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
handleIllegalState(e)
}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
- replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition.topic, partition.partition, id)).toSeq, OfflineReplica)
+ replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
+ PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
def replicatedPartitionsBrokerLeads() = {
trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
controllerContext.partitionLeadershipInfo.filter {
- case (topicAndPartition, leaderIsrAndControllerEpoch) =>
- leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+ case (topicPartition, leaderIsrAndControllerEpoch) =>
+ leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicPartition).size > 1
}.keys
}
replicatedPartitionsBrokerLeads().toSet
@@ -1016,18 +1013,20 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
return
}
- val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
- new TopicAndPartition(_)).toSet
- val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
- new TopicAndPartition(_)).toSet
- val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
+ val offlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
+ case (tp, error) if error == Errors.KAFKA_STORAGE_ERROR => tp
+ }
+ val onlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
+ case (tp, error) if error == Errors.NONE => tp
+ }
+ val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
if (newOfflineReplicas.nonEmpty) {
stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
- onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
+ onReplicasBecomeOffline(newOfflineReplicas.map(PartitionAndReplica(_, brokerId)))
}
}
}
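The LeaderAndIsr response handling above also swaps a filter-then-map pair for a single collect, where
the pattern guard filters and the body projects the key in one pass. A generic sketch of the idiom
(plain string stand-ins for the Kafka types):
  val responses = Map("t-0" -> "NONE", "t-1" -> "KAFKA_STORAGE_ERROR")
  // Before: two traversals, with tuple accessors
  val offlineOld = responses.filter(_._2 == "KAFKA_STORAGE_ERROR").keys
  // After: one traversal; entries failing the guard are simply skipped
  val offlineNew = responses.collect { case (tp, error) if error == "KAFKA_STORAGE_ERROR" => tp }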
@@ -1045,13 +1044,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val partitionsInError =
if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
- val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+ val replicasInError = partitionsInError.map(PartitionAndReplica(_, replicaId))
// move all the failed replicas to ReplicaDeletionIneligible
topicDeletionManager.failReplicaDeletion(replicasInError)
if (replicasInError.size != responseMap.size) {
// some replicas could have been successfully deleted
val deletedReplicas = responseMap.keySet -- partitionsInError
- topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
+ topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
}
}
}
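
The response handling above splits partitions by error code, rebuilds PartitionAndReplica values for each half, and derives the successfully deleted set as a plain keyset difference. A trimmed sketch with hypothetical partitions and errors, assuming the PartitionAndReplica shape this patch introduces:

    import kafka.controller.PartitionAndReplica
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors

    val replicaId = 5 // hypothetical
    val responseMap = Map(
      new TopicPartition("t", 0) -> Errors.NONE,
      new TopicPartition("t", 1) -> Errors.UNKNOWN_TOPIC_OR_PARTITION)

    val partitionsInError = responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
    // Everything that did not fail was deleted successfully.
    val deletedReplicas = (responseMap.keySet -- partitionsInError).map(PartitionAndReplica(_, replicaId))
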
@@ -1283,22 +1282,20 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// the `path exists` check for free
if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
val partitionReassignment = zkClient.getPartitionReassignment
- val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
- partitionsToBeReassigned.foreach { partitionToBeReassigned =>
- if (topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
- error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
- .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
- removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+ val partitionsToBeReassigned = partitionReassignment -- controllerContext.partitionsBeingReassigned.keys
+ partitionsToBeReassigned.foreach { case (tp, context) =>
+ if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
+ error(s"Skipping reassignment of $tp since the topic is currently being deleted")
+ removePartitionFromReassignedPartitions(tp)
} else {
- val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
- initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+ initiateReassignReplicasForTopicPartition(tp, ReassignedPartitionsContext(context))
}
}
}
}
}
- case class PartitionReassignmentIsrChange(partition: TopicAndPartition) extends ControllerEvent {
+ case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
override def state: ControllerState = ControllerState.PartitionReassignment
override def process(): Unit = {
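
The reassignment hunk above subtracts the in-flight keys from the reassignment map (`partitionReassignment -- controllerContext.partitionsBeingReassigned.keys`), replacing the tuple-based filterNot and letting the loop destructure each entry as (tp, context). The core move in isolation, with hypothetical reassignments:

    import org.apache.kafka.common.TopicPartition

    val requested = Map(
      new TopicPartition("a", 0) -> Seq(1, 2),
      new TopicPartition("b", 0) -> Seq(2, 3))
    val inFlight = Set(new TopicPartition("a", 0))

    // Map -- keys drops entries already being reassigned; only ("b", 0) remains.
    val toStart = requested -- inFlight
    toStart.foreach { case (tp, newReplicas) => println(s"$tp -> $newReplicas") }
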
@@ -1310,17 +1307,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
- if(caughtUpReplicas == reassignedReplicas) {
+ if (caughtUpReplicas == reassignedReplicas) {
// resume the partition reassignment process
- info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
- "Resuming partition reassignment")
+ info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
+ s"partition $partition being reassigned. Resuming partition reassignment")
onPartitionReassignment(partition, reassignedPartitionContext)
}
else {
- info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
- "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+ info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
+ s"partition $partition being reassigned. Replica(s) " +
+ s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
}
case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
.format(partition, reassignedReplicas.mkString(",")))
@@ -1347,9 +1343,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
- private def processUpdateNotifications(partitions: Seq[TopicAndPartition]) {
+ private def processUpdateNotifications(partitions: Seq[TopicPartition]) {
val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
- debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + partitions)
+ debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicPartitions:" + partitions)
sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
}
}
@@ -1433,6 +1429,10 @@ class LogDirEventNotificationHandler(controller: KafkaController, eventManager:
override def handleChildChange(): Unit = eventManager.put(controller.LogDirEventNotification)
}
+object LogDirEventNotificationHandler {
+ val Version: Long = 1L
+}
+
class PartitionModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
override val path: String = TopicZNode.path(topic)
@@ -1454,7 +1454,7 @@ class PartitionReassignmentHandler(controller: KafkaController, eventManager: Co
override def handleCreation(): Unit = eventManager.put(controller.PartitionReassignment)
}
-class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager, partition: TopicAndPartition) extends ZNodeChangeHandler {
+class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
override val path: String = TopicPartitionStateZNode.path(partition)
override def handleDataChange(): Unit = eventManager.put(controller.PartitionReassignmentIsrChange(partition))
@@ -1466,6 +1466,10 @@ class IsrChangeNotificationHandler(controller: KafkaController, eventManager: Co
override def handleChildChange(): Unit = eventManager.put(controller.IsrChangeNotification)
}
+object IsrChangeNotificationHandler {
+ val Version: Long = 1L
+}
+
class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
override val path: String = PreferredReplicaElectionZNode.path
@@ -1482,23 +1486,16 @@ class ControllerChangeHandler(controller: KafkaController, eventManager: Control
override def handleDataChange(): Unit = eventManager.put(controller.ControllerChange)
}
-object LogDirEventNotificationListener {
- val version: Long = 1L
-}
-
-object IsrChangeNotificationListener {
- val version: Long = 1L
-}
-
case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
var reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler = null)
-case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
+case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
+ def topic: String = topicPartition.topic
+ def partition: Int = topicPartition.partition
+
override def toString: String = {
- "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
+ s"[Topic=$topic,Partition=$partition,Replica=$replica]"
}
-
- def topicAndPartition = TopicAndPartition(topic, partition)
}
case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
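
With PartitionAndReplica now wrapping a TopicPartition, call sites construct it directly from the partition at hand, while the old topic and partition fields survive as forwarding accessors. A minimal usage sketch against the definition above (topic name and ids hypothetical):

    import kafka.controller.PartitionAndReplica
    import org.apache.kafka.common.TopicPartition

    val tp = new TopicPartition("example-topic", 3)
    val par = PartitionAndReplica(tp, replica = 7)

    // The accessors delegate to the wrapped TopicPartition.
    assert(par.topic == "example-topic" && par.partition == 3)
    println(par) // prints [Topic=example-topic,Partition=3,Replica=7]
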
http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 1dee71d..217c2b6 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,11 +17,12 @@
package kafka.controller
import kafka.api.LeaderAndIsr
-import kafka.common.{StateChangeFailedException, TopicAndPartition}
+import kafka.common.StateChangeFailedException
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@@ -45,7 +46,7 @@ class PartitionStateMachine(config: KafkaConfig,
controllerContext: ControllerContext,
topicDeletionManager: TopicDeletionManager,
zkClient: KafkaZkClient,
- partitionState: mutable.Map[TopicAndPartition, PartitionState],
+ partitionState: mutable.Map[TopicPartition, PartitionState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId
@@ -107,7 +108,7 @@ class PartitionStateMachine(config: KafkaConfig,
// It is important to trigger leader election for those partitions.
}
- def handleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState,
+ def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
if (partitions.nonEmpty) {
try {
@@ -120,7 +121,7 @@ class PartitionStateMachine(config: KafkaConfig,
}
}
- def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
+ def partitionsInState(state: PartitionState): Set[TopicPartition] = {
partitionState.filter { case (_, s) => s == state }.keySet.toSet
}
@@ -146,7 +147,7 @@ class PartitionStateMachine(config: KafkaConfig,
* @param partitions The partitions for which the state transition is invoked
* @param targetState The end state that the partition should be moved to
*/
- private def doHandleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState,
+ private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
@@ -196,8 +197,8 @@ class PartitionStateMachine(config: KafkaConfig,
* @param partitions The partitions that we're trying to initialize.
* @return The partitions that have been successfully initialized.
*/
- private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicAndPartition]): Seq[TopicAndPartition] = {
- val successfulInitializations = mutable.Buffer.empty[TopicAndPartition]
+ private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
+ val successfulInitializations = mutable.Buffer.empty[TopicPartition]
val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -226,16 +227,12 @@ class PartitionStateMachine(config: KafkaConfig,
}
createResponses.foreach { createResponse =>
val code = createResponse.resultCode
- val partition = createResponse.ctx.get.asInstanceOf[TopicAndPartition]
+ val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
if (code == Code.OK) {
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
- partition.topic,
- partition.partition,
- leaderIsrAndControllerEpoch,
- controllerContext.partitionReplicaAssignment(partition),
- isNew = true)
+ partition, leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = true)
successfulInitializations += partition
} else {
logFailedStateChange(partition, NewPartition, OnlinePartition, code)
@@ -250,8 +247,8 @@ class PartitionStateMachine(config: KafkaConfig,
* @param partitionLeaderElectionStrategy The election strategy to use.
* @return The partitions that successfully had a leader elected.
*/
- private def electLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicAndPartition] = {
- val successfulElections = mutable.Buffer.empty[TopicAndPartition]
+ private def electLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = {
+ val successfulElections = mutable.Buffer.empty[TopicPartition]
var remaining = partitions
while (remaining.nonEmpty) {
val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy)
@@ -276,18 +273,18 @@ class PartitionStateMachine(config: KafkaConfig,
* the partition leader updated partition state while the controller attempted to update partition state.
* 3. Exceptions corresponding to failed elections that should not be retried.
*/
- private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
- (Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = {
+ private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
+ (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception]) = {
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
}
- val failedElections = mutable.Map.empty[TopicAndPartition, Exception]
- val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicAndPartition, LeaderIsrAndControllerEpoch)]
+ val failedElections = mutable.Map.empty[TopicPartition, Exception]
+ val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
getDataResponses.foreach { getDataResponse =>
- val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+ val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val currState = partitionState(partition)
if (getDataResponse.resultCode == Code.OK) {
val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
@@ -338,14 +335,14 @@ class PartitionStateMachine(config: KafkaConfig,
val replicas = controllerContext.partitionReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
- controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition.topic,
- partition.partition, leaderIsrAndControllerEpoch, replicas, isNew = false)
+ controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
+ leaderIsrAndControllerEpoch, replicas, isNew = false)
}
(successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)
}
- private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
- Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+ private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+ Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
liveInSyncReplicas.isEmpty
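
leaderForOffline starts by splitting the incoming pairs with Seq.partition: a single traversal yields both the partitions with no live in-sync replica and those that still have one. The core move, with hypothetical ISR data standing in for the real tuples:

    // partition splits a collection by predicate in one pass.
    val pairs = Seq("t-0" -> Seq.empty[Int], "t-1" -> Seq(2))
    val (noLiveIsr, hasLiveIsr) = pairs.partition { case (_, liveIsr) => liveIsr.isEmpty }
    // noLiveIsr == Seq("t-0" -> Seq()), hasLiveIsr == Seq("t-1" -> Seq(2))
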
@@ -378,8 +375,8 @@ class PartitionStateMachine(config: KafkaConfig,
}
}
- private def leaderForReassign(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
- Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+ private def leaderForReassign(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+ Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -390,8 +387,8 @@ class PartitionStateMachine(config: KafkaConfig,
}
}
- private def leaderForPreferredReplica(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
- Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+ private def leaderForPreferredReplica(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+ Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
val assignment = controllerContext.partitionReplicaAssignment(partition)
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -402,8 +399,8 @@ class PartitionStateMachine(config: KafkaConfig,
}
}
- private def leaderForControlledShutdown(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)], shuttingDownBrokers: Set[Int]):
- Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+ private def leaderForControlledShutdown(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)], shuttingDownBrokers: Set[Int]):
+ Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
val assignment = controllerContext.partitionReplicaAssignment(partition)
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -415,10 +412,10 @@ class PartitionStateMachine(config: KafkaConfig,
}
}
- private def isValidTransition(partition: TopicAndPartition, targetState: PartitionState) =
+ private def isValidTransition(partition: TopicPartition, targetState: PartitionState) =
targetState.validPreviousStates.contains(partitionState(partition))
- private def logInvalidTransition(partition: TopicAndPartition, targetState: PartitionState): Unit = {
+ private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
val currState = partitionState(partition)
val e = new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
.format(partition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
@@ -426,11 +423,11 @@ class PartitionStateMachine(config: KafkaConfig,
logFailedStateChange(partition, currState, targetState, e)
}
- private def logFailedStateChange(partition: TopicAndPartition, currState: PartitionState, targetState: PartitionState, code: Code): Unit = {
+ private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, code: Code): Unit = {
logFailedStateChange(partition, currState, targetState, KeeperException.create(code))
}
- private def logFailedStateChange(partition: TopicAndPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
+ private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
.error("Controller %d epoch %d failed to change state for partition %s from %s to %s"
.format(controllerId, controllerContext.epoch, partition, currState, targetState), t)