You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/07 09:55:51 UTC

[2/2] kafka git commit: MINOR: Eliminate unnecessary Topic(And)Partition allocations in Controller

MINOR: Eliminate unnecessary Topic(And)Partition allocations in Controller

- Eliminated all the unnecessary allocations of `TopicPartition` and
`TopicAndPartition` in the Controller. We now use the former
in the Controller (bringing it in line with the rest of the non-legacy
code).
- Fixed missed `Listener` -> `Handler` renames for companion
objects.
- More String.format -> String interpolation conversions (the former
is roughly 5 times more expensive).
- Some other minor clean-ups.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Onur Karaman <ok...@linkedin.com>, Viktor Somogyi <vi...@gmail.com>

Closes #4152 from ijuma/controller-topic-partition-and-other-clean-ups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3735a6ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3735a6ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3735a6ca

Branch: refs/heads/trunk
Commit: 3735a6ca8b6432db2de4a0bd07df9301459bbf0b
Parents: 5813812
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Nov 7 09:55:44 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Nov 7 09:55:44 2017 +0000

----------------------------------------------------------------------
 .../scala/kafka/common/TopicAndPartition.scala  |  11 -
 .../consumer/ZookeeperConsumerConnector.scala   |   9 +-
 .../controller/ControllerChannelManager.scala   |  45 +--
 .../kafka/controller/ControllerContext.scala    |  45 ++-
 .../kafka/controller/KafkaController.scala      | 279 +++++++++----------
 .../controller/PartitionStateMachine.scala      |  65 +++--
 .../kafka/controller/ReplicaStateMachine.scala  |  87 +++---
 .../kafka/controller/TopicDeletionManager.scala |  15 +-
 .../transaction/TransactionStateManager.scala   |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   8 +-
 .../main/scala/kafka/utils/LogDirUtils.scala    |   4 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |   4 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala |  55 ++--
 core/src/main/scala/kafka/zk/ZkData.scala       |  38 +--
 .../test/scala/unit/kafka/admin/AdminTest.scala |   4 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  10 +-
 .../controller/ControllerFailoverTest.scala     |   4 +-
 .../controller/PartitionStateMachineTest.scala  |  19 +-
 .../controller/ReplicaStateMachineTest.scala    |  32 +--
 .../unit/kafka/server/LogDirFailureTest.scala   |   2 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala |  22 +-
 21 files changed, 356 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 4a8e65d..6c27695 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,6 +1,5 @@
 package kafka.common
 
-import kafka.cluster.{Partition, Replica}
 import org.apache.kafka.common.TopicPartition
 
 /**
@@ -25,17 +24,7 @@ import org.apache.kafka.common.TopicPartition
  */
 case class TopicAndPartition(topic: String, partition: Int) {
 
-  def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
-
-  def this(partition: Partition) = this(partition.topic, partition.partitionId)
-
   def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
 
-  def this(replica: Replica) = this(replica.topicPartition)
-
-  def asTuple = (topic, partition)
-
-  def asTopicPartition = new TopicPartition(topic, partition)
-
   override def toString: String = s"$topic-$partition"
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index d1928b4..bb5fc0f 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -717,12 +717,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           false
         else {
           val offsetFetchResponse = offsetFetchResponseOpt.get
-          topicPartitions.foreach(topicAndPartition => {
-            val (topic, partition) = topicAndPartition.asTuple
-            val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
-            val threadId = partitionAssignment(topicAndPartition)
+          topicPartitions.foreach { case tp@ TopicAndPartition(topic, partition) =>
+            val offset = offsetFetchResponse.requestInfo(tp).offset
+            val threadId = partitionAssignment(tp)
             addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
-          })
+          }
 
           /**
            * move the partition ownership here, since that can be used to indicate a truly successful re-balancing attempt

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 9fef617..7314679 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
@@ -312,10 +312,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
     updateMetadataRequestPartitionInfoMap.clear()
   }
 
-  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
+  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                        replicas: Seq[Int], isNew: Boolean) {
-    val topicPartition = new TopicPartition(topic, partition)
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
@@ -329,29 +328,24 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
         isNew || alreadyNew))
     }
 
-    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
-                                       Set(TopicAndPartition(topic, partition)))
+    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
   }
 
-  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
+  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition, deletePartition: Boolean,
                                       callback: (AbstractResponse, Int) => Unit) {
     brokerIds.filter(b => b >= 0).foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
       val v = stopReplicaRequestMap(brokerId)
-      if(callback != null)
-        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
-          deletePartition, (r: AbstractResponse) => callback(r, brokerId))
-      else
-        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
-          deletePartition)
+      stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topicPartition, brokerId),
+        deletePartition, (r: AbstractResponse) => callback(r, brokerId))
     }
   }
 
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
-                                         partitions: collection.Set[TopicAndPartition]) {
+                                         partitions: collection.Set[TopicPartition]) {
 
-    def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
+    def updateMetadataRequestPartitionInfo(partition: TopicPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
         case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) =>
@@ -371,7 +365,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
             leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion,
             replicas.map(Integer.valueOf).asJava,
             offlineReplicas.map(Integer.valueOf).asJava)
-          updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
+          updateMetadataRequestPartitionInfoMap.put(partition, partitionStateInfo)
 
         case None =>
           info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
@@ -474,13 +468,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
         // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
         // changes the order in which the requests are sent for the same partitions, but that's OK.
         val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
-          replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
+          replicasToGroup.map(_.replica.topicPartition).toSet.asJava)
         controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)
 
         replicasToNotGroup.foreach { r =>
           val stopReplicaRequest = new StopReplicaRequest.Builder(
               controllerId, controllerEpoch, r.deletePartition,
-              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
+              Set(r.replica.topicPartition).asJava)
           controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
         }
       }
@@ -510,19 +504,6 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      requestSendThread: RequestSendThread,
                                      queueSizeGauge: Gauge[Int])
 
-case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
-
-class Callbacks private (var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit)
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
 
-object Callbacks {
-  class CallbackBuilder {
-    var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
-
-    def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = {
-      stopReplicaResponseCbk = cbk
-      this
-    }
-
-    def build: Callbacks = new Callbacks(stopReplicaResponseCbk)
-  }
-}
+class Callbacks(val stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = (_, _ ) => ())

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/ControllerContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index d4a29f8..541bce8 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -18,7 +18,7 @@
 package kafka.controller
 
 import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Seq, Set, mutable}
 
@@ -31,10 +31,10 @@ class ControllerContext {
   var epoch: Int = KafkaController.InitialControllerEpoch - 1
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
   var allTopics: Set[String] = Set.empty
-  var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
-  var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
-  val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = mutable.Map.empty
-  val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicAndPartition]] = mutable.Map.empty
+  var partitionReplicaAssignment: mutable.Map[TopicPartition, Seq[Int]] = mutable.Map.empty
+  var partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
+  val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
+  val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -52,58 +52,55 @@ class ControllerContext {
   def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
 
-  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
+  def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
     partitionReplicaAssignment.collect {
-      case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
+      case (topicPartition, replicas) if replicas.contains(brokerId) => topicPartition
     }.toSet
   }
 
-  def isReplicaOnline(brokerId: Int, topicAndPartition: TopicAndPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
+  def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
     val brokerOnline = {
       if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId)
       else liveBrokerIds.contains(brokerId)
     }
-    brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicAndPartition)
+    brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicPartition)
   }
 
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
     brokerIds.flatMap { brokerId =>
-      partitionReplicaAssignment.collect {
-        case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
-          PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
+      partitionReplicaAssignment.collect { case (topicPartition, replicas) if replicas.contains(brokerId) =>
+        PartitionAndReplica(topicPartition, brokerId)
       }
     }.toSet
   }
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
     partitionReplicaAssignment
-      .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
-      .flatMap { case (topicAndPartition, replicas) =>
-        replicas.map { r =>
-          PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
-        }
+      .filter { case (topicPartition, _) => topicPartition.topic == topic }
+      .flatMap { case (topicPartition, replicas) =>
+        replicas.map(PartitionAndReplica(topicPartition, _))
       }.toSet
   }
 
-  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
-    partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
+  def partitionsForTopic(topic: String): collection.Set[TopicPartition] =
+    partitionReplicaAssignment.keySet.filter(topicPartition => topicPartition.topic == topic)
 
   def allLiveReplicas(): Set[PartitionAndReplica] = {
     replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
-      isReplicaOnline(partitionAndReplica.replica, TopicAndPartition(partitionAndReplica.topic, partitionAndReplica.partition))
+      isReplicaOnline(partitionAndReplica.replica, partitionAndReplica.topicPartition)
     }
   }
 
-  def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
+  def replicasForPartition(partitions: collection.Set[TopicPartition]): collection.Set[PartitionAndReplica] = {
     partitions.flatMap { p =>
       val replicas = partitionReplicaAssignment(p)
-      replicas.map(r => PartitionAndReplica(p.topic, p.partition, r))
+      replicas.map(PartitionAndReplica(p, _))
     }
   }
 
   def removeTopic(topic: String) = {
-    partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
-    partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
+    partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic }
+    partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic }
     allTopics -= topic
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b676ead..ade3ae4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,6 +28,7 @@ import kafka.utils._
 import kafka.zk._
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -139,7 +140,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * @param id Id of the broker to shutdown.
    * @return The number of partitions that the broker still leads.
    */
-  def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit): Unit = {
+  def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
     val controlledShutdownEvent = ControlledShutdown(id, controlledShutdownCallback)
     eventManager.put(controlledShutdownEvent)
   }
@@ -295,13 +296,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
-      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
     }
-    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
+    partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
     // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
     // on the newly restarted brokers, there is a chance that topic deletion can resume
     val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-    if(replicasForTopicsToBeDeleted.nonEmpty) {
+    if (replicasForTopicsToBeDeleted.nonEmpty) {
       info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
         "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
         topicDeletionManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
@@ -370,7 +371,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * 1. Move the newly created partitions to the NewPartition state
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
    */
-  def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
+  def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
@@ -419,51 +420,50 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
    * This way, if the controller crashes before that step, we can still recover.
    */
-  def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
+  def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    if (!areReplicasInIsr(topicAndPartition, reassignedReplicas)) {
-      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
-        "reassigned not yet caught up with the leader")
-      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
-      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+    if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
+      info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
+        "caught up with the leader")
+      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet
+      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
       //1. Update AR in ZK with OAR + RAR.
-      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+      updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
       //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
-      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+      updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition),
         newAndOldReplicas.toSeq)
       //3. replicas in RAR - OAR -> NewReplica
-      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
-      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+      startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
+      info(s"Waiting for new replicas ${reassignedReplicas.mkString(",")} for partition ${topicPartition} being " +
         "reassigned to catch up with the leader")
     } else {
       //4. Wait until all replicas in RAR are in sync with the leader.
-      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
+      val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet
       //5. replicas in RAR -> OnlineReplica
       reassignedReplicas.foreach { replica =>
-        replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
-          replica)), OnlineReplica)
+        replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)
       }
       //6. Set AR to RAR in memory.
       //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
       //   a new AR (using RAR) and same isr to every broker in RAR
-      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
+      moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)
       //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
       //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
-      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
+      stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)
       //10. Update AR in ZK with RAR.
-      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
+      updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
       //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
-      removePartitionFromReassignedPartitions(topicAndPartition)
-      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
-      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+      removePartitionFromReassignedPartitions(topicPartition)
+      info(s"Removed partition $topicPartition from the list of reassigned partitions in zookeeper")
+      controllerContext.partitionsBeingReassigned.remove(topicPartition)
       //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
       // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
-      topicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
+      topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
     }
   }
 
-  private def watchIsrChangesForReassignedPartition(partition: TopicAndPartition,
+  private def watchIsrChangesForReassignedPartition(partition: TopicPartition,
                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
     reassignedPartitionContext.reassignIsrChangeHandler = reassignIsrChangeHandler
@@ -471,37 +471,37 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
   }
 
-  def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
-                                        reassignedPartitionContext: ReassignedPartitionsContext) {
+  def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
+                                                reassignedPartitionContext: ReassignedPartitionsContext) {
     val newReplicas = reassignedPartitionContext.newReplicas
-    val topic = topicAndPartition.topic
+    val topic = topicPartition.topic
     try {
-      val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
+      val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicPartition)
       assignedReplicasOpt match {
         case Some(assignedReplicas) =>
           if (assignedReplicas == newReplicas) {
-            throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
-              " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
+            throw new KafkaException(s"Partition $topicPartition to be reassigned is already assigned to replicas " +
+              s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment")
           } else {
-            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+            info(s"Handling reassignment of partition $topicPartition to new replicas ${newReplicas.mkString(",")}")
             // first register ISR change listener
-            watchIsrChangesForReassignedPartition(topicAndPartition, reassignedPartitionContext)
-            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+            watchIsrChangesForReassignedPartition(topicPartition, reassignedPartitionContext)
+            controllerContext.partitionsBeingReassigned.put(topicPartition, reassignedPartitionContext)
             // mark topic ineligible for deletion for the partitions being reassigned
             topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
-            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+            onPartitionReassignment(topicPartition, reassignedPartitionContext)
           }
-        case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
-          .format(topicAndPartition))
+        case None => throw new KafkaException(s"Attempt to reassign partition $topicPartition that doesn't exist")
       }
     } catch {
-      case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
-      // remove the partition from the admin path to unblock the admin client
-      removePartitionFromReassignedPartitions(topicAndPartition)
+      case e: Throwable =>
+        error(s"Error completing reassignment of partition $topicPartition", e)
+        // remove the partition from the admin path to unblock the admin client
+        removePartitionFromReassignedPartitions(topicPartition)
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
+  def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
@@ -572,7 +572,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
     registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
     controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
-    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
@@ -584,7 +584,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
   }
 
-  private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
+  private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
     val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
@@ -621,19 +621,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
-    val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+    val partitionsBeingReassigned = zkClient.getPartitionReassignment
     // check if they are already completed or topic was deleted
-    val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
-      val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
-      val topicDeleted = replicasOpt.isEmpty
-      val successful = if (!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
-      topicDeleted || successful
+    val reassignedPartitions = partitionsBeingReassigned.filter { case (tp, reassignmentReplicas) =>
+      controllerContext.partitionReplicaAssignment.get(tp) match {
+        case None => true // topic deleted
+        case Some(currentReplicas) => currentReplicas == reassignmentReplicas // reassignment completed
+      }
     }.keys
-    reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
-    val partitionsToReassign = mutable.Map[TopicAndPartition, ReassignedPartitionsContext]()
-    partitionsToReassign ++= partitionsBeingReassigned
-    partitionsToReassign --= reassignedPartitions
-    controllerContext.partitionsBeingReassigned ++= partitionsToReassign
+    reassignedPartitions.foreach(removePartitionFromReassignedPartitions)
+    val partitionsToReassign = partitionsBeingReassigned -- reassignedPartitions
+    controllerContext.partitionsBeingReassigned ++= partitionsToReassign.mapValues(new ReassignedPartitionsContext(_))
     info(s"Partitions being reassigned: $partitionsBeingReassigned")
     info(s"Partitions already reassigned: $reassignedPartitions")
     info(s"Resuming reassignment of partitions: $partitionsToReassign")
@@ -652,8 +650,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }
 
   private def maybeTriggerPartitionReassignment() {
-    controllerContext.partitionsBeingReassigned.foreach { topicPartitionToReassign =>
-      initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
+    controllerContext.partitionsBeingReassigned.foreach { case (tp, reassignContext) =>
+      initiateReassignReplicasForTopicPartition(tp, reassignContext)
     }
   }
 
@@ -663,54 +661,52 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.controllerChannelManager.startup()
   }
 
-  def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+  def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
     val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
     leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     }
   }
 
-  private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
+  private def areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean = {
     zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
       replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
     }
   }
 
-  private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
+  private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+    val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
     // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
     // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
-    val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
+    val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
+    controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas)
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
-      info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
+      info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
-      partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
+      partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
     } else {
       // check if the leader is alive or not
-      if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) {
-        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
+      if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
+        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
           "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
         // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
-        updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
+        updateLeaderEpochAndSendRequest(topicPartition, oldAndNewReplicas, reassignedReplicas)
       } else {
-        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
+        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicPartition) +
           "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
-        partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
+        partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
       }
     }
   }
 
-  private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
+  private def stopOldReplicasOfReassignedPartition(topicPartition: TopicPartition,
                                                    reassignedPartitionContext: ReassignedPartitionsContext,
                                                    oldReplicas: Set[Int]) {
-    val topic = topicAndPartition.topic
-    val partition = topicAndPartition.partition
     // first move the replica to offline state (the controller removes it from the ISR)
-    val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
+    val replicasToBeDeleted = oldReplicas.map(PartitionAndReplica(topicPartition, _))
     replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, OfflineReplica)
     // send stop replica command to the old replicas
     replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionStarted)
@@ -719,7 +715,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, NonExistentReplica)
   }
 
-  private def updateAssignedReplicasForPartition(partition: TopicAndPartition,
+  private def updateAssignedReplicasForPartition(partition: TopicPartition,
                                                  replicas: Seq[Int]) {
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
     partitionsAndReplicasForThisTopic.put(partition, replicas)
@@ -735,24 +731,24 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
+  private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition,
                                                      reassignedPartitionContext: ReassignedPartitionsContext,
                                                      newReplicas: Set[Int]) {
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
     newReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
+      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), NewReplica)
     }
   }
 
-  private def updateLeaderEpochAndSendRequest(partition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+  private def updateLeaderEpochAndSendRequest(partition: TopicPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
     updateLeaderEpoch(partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
           brokerRequestBatch.newBatch()
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition.topic,
-            partition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition,
+            updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
           brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
           case e: IllegalStateException =>
@@ -798,17 +794,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
-    controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignContext =>
+  def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
+    controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
       // stop watching the ISR changes for this partition
       zkClient.unregisterZNodeChangeHandler(reassignContext.reassignIsrChangeHandler.path)
     }
 
-    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicAndPartition
+    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition
 
     // write the new list to zookeeper
     if (updatedPartitionsBeingReassigned.isEmpty) {
-      info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
+      info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
       zkClient.deletePartitionReassignment()
       // Ensure we detect future reassignments
       eventManager.put(PartitionReassignment)
@@ -820,16 +816,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       }
     }
 
-    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+    controllerContext.partitionsBeingReassigned.remove(topicPartition)
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],
                                                    isTriggeredByAutoRebalance : Boolean) {
-    for(partition <- partitionsToBeRemoved) {
+    for (partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
-      if(currentLeader == preferredReplica) {
+      if (currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
       } else {
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
@@ -845,7 +841,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    *
    * @param brokers The brokers that the update metadata request should be sent to
    */
-  def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+  def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
     try {
       brokerRequestBatch.newBatch()
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
@@ -862,7 +858,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * @param partition partition
    * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
    */
-  private def updateLeaderEpoch(partition: TopicAndPartition): Option[LeaderIsrAndControllerEpoch] = {
+  private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
     debug("Updating leader epoch for partition %s.".format(partition))
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
@@ -900,21 +896,21 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def checkAndTriggerAutoLeaderRebalance(): Unit = {
     trace("Checking need to trigger auto leader balancing")
-    val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] =
+    val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] =
       controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) =>
         topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
       }.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
     debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers")
 
     // for each broker, check if a preferred replica election needs to be triggered
-    preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
-      val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+    preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicPartitionsForBroker) =>
+      val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
         val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
         leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
       }
       debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
 
-      val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicAndPartitionsForBroker.size
+      val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicPartitionsForBroker.size
       trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio")
 
       // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
@@ -948,7 +944,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
+  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
 
     def state = ControllerState.ControlledShutdown
 
@@ -957,7 +953,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       controlledShutdownCallback(controlledShutdownResult)
     }
 
-    private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
+    private def doControlledShutdown(id: Int): Set[TopicPartition] = {
       if (!isActive) {
         throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
       }
@@ -981,8 +977,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       try {
         brokerRequestBatch.newBatch()
         partitionsFollowedByBroker.foreach { partition =>
-          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition.topic,
-            partition.partition, deletePartition = false, null)
+          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false,
+            (_, _) => ())
         }
         brokerRequestBatch.sendRequestsToBrokers(epoch)
       } catch {
@@ -990,12 +986,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           handleIllegalState(e)
       }
       // If the broker is a follower, updates the isr in ZK and notifies the current leader
-      replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition.topic, partition.partition, id)).toSeq, OfflineReplica)
+      replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
+        PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
       def replicatedPartitionsBrokerLeads() = {
         trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
         controllerContext.partitionLeadershipInfo.filter {
-          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+          case (topicPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicPartition).size > 1
         }.keys
       }
       replicatedPartitionsBrokerLeads().toSet
@@ -1016,18 +1013,20 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         return
       }
 
-      val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
-        new TopicAndPartition(_)).toSet
-      val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
-        new TopicAndPartition(_)).toSet
-      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
+      val offlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
+        case (tp, error) if error == Errors.KAFKA_STORAGE_ERROR => tp
+      }
+      val onlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
+        case (tp, error) if error == Errors.NONE => tp
+      }
+      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
       val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
       controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
       val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
 
       if (newOfflineReplicas.nonEmpty) {
         stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
-        onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
+        onReplicasBecomeOffline(newOfflineReplicas.map(PartitionAndReplica(_, brokerId)))
       }
     }
   }
@@ -1045,13 +1044,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val partitionsInError =
         if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
         else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
-      val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+      val replicasInError = partitionsInError.map(PartitionAndReplica(_, replicaId))
       // move all the failed replicas to ReplicaDeletionIneligible
       topicDeletionManager.failReplicaDeletion(replicasInError)
       if (replicasInError.size != responseMap.size) {
         // some replicas could have been successfully deleted
         val deletedReplicas = responseMap.keySet -- partitionsInError
-        topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
+        topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
       }
     }
   }
@@ -1283,22 +1282,20 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       // the `path exists` check for free
       if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
         val partitionReassignment = zkClient.getPartitionReassignment
-        val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-        partitionsToBeReassigned.foreach { partitionToBeReassigned =>
-          if (topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
-            error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
-              .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
-            removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+        val partitionsToBeReassigned = partitionReassignment -- controllerContext.partitionsBeingReassigned.keys
+        partitionsToBeReassigned.foreach { case (tp, context) =>
+          if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
+            error(s"Skipping reassignment of $tp since the topic is currently being deleted")
+            removePartitionFromReassignedPartitions(tp)
           } else {
-            val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
-            initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+            initiateReassignReplicasForTopicPartition(tp, ReassignedPartitionsContext(context))
           }
         }
       }
     }
   }
 
-  case class PartitionReassignmentIsrChange(partition: TopicAndPartition) extends ControllerEvent {
+  case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
     override def state: ControllerState = ControllerState.PartitionReassignment
 
     override def process(): Unit = {
@@ -1310,17 +1307,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
             val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
             val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-            if(caughtUpReplicas == reassignedReplicas) {
+            if (caughtUpReplicas == reassignedReplicas) {
               // resume the partition reassignment process
-              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
-                "Resuming partition reassignment")
+              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
+                s"partition $partition being reassigned. Resuming partition reassignment")
               onPartitionReassignment(partition, reassignedPartitionContext)
             }
             else {
-              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
-                "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
+                s"partition $partition being reassigned. Replica(s) " +
+                s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
             }
           case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
             .format(partition, reassignedReplicas.mkString(",")))
@@ -1347,9 +1343,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       }
     }
 
-    private def processUpdateNotifications(partitions: Seq[TopicAndPartition]) {
+    private def processUpdateNotifications(partitions: Seq[TopicPartition]) {
       val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
-      debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + partitions)
+      debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicPartitions:" + partitions)
       sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
     }
   }
@@ -1433,6 +1429,10 @@ class LogDirEventNotificationHandler(controller: KafkaController, eventManager:
   override def handleChildChange(): Unit = eventManager.put(controller.LogDirEventNotification)
 }
 
+object LogDirEventNotificationHandler {
+  val Version: Long = 1L
+}
+
 class PartitionModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
   override val path: String = TopicZNode.path(topic)
 
@@ -1454,7 +1454,7 @@ class PartitionReassignmentHandler(controller: KafkaController, eventManager: Co
   override def handleCreation(): Unit = eventManager.put(controller.PartitionReassignment)
 }
 
-class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager, partition: TopicAndPartition) extends ZNodeChangeHandler {
+class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
   override val path: String = TopicPartitionStateZNode.path(partition)
 
   override def handleDataChange(): Unit = eventManager.put(controller.PartitionReassignmentIsrChange(partition))
@@ -1466,6 +1466,10 @@ class IsrChangeNotificationHandler(controller: KafkaController, eventManager: Co
   override def handleChildChange(): Unit = eventManager.put(controller.IsrChangeNotification)
 }
 
+object IsrChangeNotificationHandler {
+  val Version: Long = 1L
+}
+
 class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
   override val path: String = PreferredReplicaElectionZNode.path
 
@@ -1482,23 +1486,16 @@ class ControllerChangeHandler(controller: KafkaController, eventManager: Control
   override def handleDataChange(): Unit = eventManager.put(controller.ControllerChange)
 }
 
-object LogDirEventNotificationListener {
-  val version: Long = 1L
-}
-
-object IsrChangeNotificationListener {
-  val version: Long = 1L
-}
-
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler = null)
 
-case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
+case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
+  def topic: String = topicPartition.topic
+  def partition: Int = topicPartition.partition
+
   override def toString: String = {
-    "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
+    s"[Topic=$topic,Partition=$partition,Replica=$replica]"
   }
-
-  def topicAndPartition = TopicAndPartition(topic, partition)
 }
 
 case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 1dee71d..217c2b6 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,11 +17,12 @@
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
-import kafka.common.{StateChangeFailedException, TopicAndPartition}
+import kafka.common.StateChangeFailedException
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
 
@@ -45,7 +46,7 @@ class PartitionStateMachine(config: KafkaConfig,
                             controllerContext: ControllerContext,
                             topicDeletionManager: TopicDeletionManager,
                             zkClient: KafkaZkClient,
-                            partitionState: mutable.Map[TopicAndPartition, PartitionState],
+                            partitionState: mutable.Map[TopicPartition, PartitionState],
                             controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
   private val controllerId = config.brokerId
 
@@ -107,7 +108,7 @@ class PartitionStateMachine(config: KafkaConfig,
     // It is important to trigger leader election for those partitions.
   }
 
-  def handleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState,
+  def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                          partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
     if (partitions.nonEmpty) {
       try {
@@ -120,7 +121,7 @@ class PartitionStateMachine(config: KafkaConfig,
     }
   }
 
-  def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
+  def partitionsInState(state: PartitionState): Set[TopicPartition] = {
     partitionState.filter { case (_, s) => s == state }.keySet.toSet
   }
 
@@ -146,7 +147,7 @@ class PartitionStateMachine(config: KafkaConfig,
    * @param partitions  The partitions for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
    */
-  private def doHandleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState,
+  private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                            partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
     partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
@@ -196,8 +197,8 @@ class PartitionStateMachine(config: KafkaConfig,
    * @param partitions The partitions  that we're trying to initialize.
    * @return The partitions that have been successfully initialized.
    */
-  private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicAndPartition]): Seq[TopicAndPartition] = {
-    val successfulInitializations = mutable.Buffer.empty[TopicAndPartition]
+  private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
+    val successfulInitializations = mutable.Buffer.empty[TopicPartition]
     val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
     val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
         val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -226,16 +227,12 @@ class PartitionStateMachine(config: KafkaConfig,
     }
     createResponses.foreach { createResponse =>
       val code = createResponse.resultCode
-      val partition = createResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
       val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
       if (code == Code.OK) {
         controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
         controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
-          partition.topic,
-          partition.partition,
-          leaderIsrAndControllerEpoch,
-          controllerContext.partitionReplicaAssignment(partition),
-          isNew = true)
+          partition, leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = true)
         successfulInitializations += partition
       } else {
         logFailedStateChange(partition, NewPartition, OnlinePartition, code)
@@ -250,8 +247,8 @@ class PartitionStateMachine(config: KafkaConfig,
    * @param partitionLeaderElectionStrategy The election strategy to use.
    * @return The partitions that successfully had a leader elected.
    */
-  private def electLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicAndPartition] = {
-    val successfulElections = mutable.Buffer.empty[TopicAndPartition]
+  private def electLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = {
+    val successfulElections = mutable.Buffer.empty[TopicPartition]
     var remaining = partitions
     while (remaining.nonEmpty) {
       val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy)
@@ -276,18 +273,18 @@ class PartitionStateMachine(config: KafkaConfig,
    *         the partition leader updated partition state while the controller attempted to update partition state.
    *         3. Exceptions corresponding to failed elections that should not be retried.
    */
-  private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
-  (Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = {
+  private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
+  (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception]) = {
     val getDataResponses = try {
       zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
         return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
     }
-    val failedElections = mutable.Map.empty[TopicAndPartition, Exception]
-    val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicAndPartition, LeaderIsrAndControllerEpoch)]
+    val failedElections = mutable.Map.empty[TopicPartition, Exception]
+    val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
     getDataResponses.foreach { getDataResponse =>
-      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
       val currState = partitionState(partition)
       if (getDataResponse.resultCode == Code.OK) {
         val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
@@ -338,14 +335,14 @@ class PartitionStateMachine(config: KafkaConfig,
       val replicas = controllerContext.partitionReplicaAssignment(partition)
       val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
-      controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition.topic,
-        partition.partition, leaderIsrAndControllerEpoch, replicas, isNew = false)
+      controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
+        leaderIsrAndControllerEpoch, replicas, isNew = false)
     }
     (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)
   }
 
-  private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+  private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
     val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
       val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       liveInSyncReplicas.isEmpty
@@ -378,8 +375,8 @@ class PartitionStateMachine(config: KafkaConfig,
     }
   }
 
-  private def leaderForReassign(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+  private def leaderForReassign(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
     leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
       val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -390,8 +387,8 @@ class PartitionStateMachine(config: KafkaConfig,
     }
   }
 
-  private def leaderForPreferredReplica(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+  private def leaderForPreferredReplica(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
     leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val assignment = controllerContext.partitionReplicaAssignment(partition)
       val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -402,8 +399,8 @@ class PartitionStateMachine(config: KafkaConfig,
     }
   }
 
-  private def leaderForControlledShutdown(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)], shuttingDownBrokers: Set[Int]):
-  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+  private def leaderForControlledShutdown(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)], shuttingDownBrokers: Set[Int]):
+  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
     leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val assignment = controllerContext.partitionReplicaAssignment(partition)
       val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
@@ -415,10 +412,10 @@ class PartitionStateMachine(config: KafkaConfig,
     }
   }
 
-  private def isValidTransition(partition: TopicAndPartition, targetState: PartitionState) =
+  private def isValidTransition(partition: TopicPartition, targetState: PartitionState) =
     targetState.validPreviousStates.contains(partitionState(partition))
 
-  private def logInvalidTransition(partition: TopicAndPartition, targetState: PartitionState): Unit = {
+  private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
     val currState = partitionState(partition)
     val e = new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
       .format(partition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
@@ -426,11 +423,11 @@ class PartitionStateMachine(config: KafkaConfig,
     logFailedStateChange(partition, currState, targetState, e)
   }
 
-  private def logFailedStateChange(partition: TopicAndPartition, currState: PartitionState, targetState: PartitionState, code: Code): Unit = {
+  private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, code: Code): Unit = {
     logFailedStateChange(partition, currState, targetState, KeeperException.create(code))
   }
 
-  private def logFailedStateChange(partition: TopicAndPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
+  private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
     stateChangeLogger.withControllerEpoch(controllerContext.epoch)
       .error("Controller %d epoch %d failed to change state for partition %s from %s to %s"
       .format(controllerId, controllerContext.epoch, partition, currState, targetState), t)