Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/23 20:20:53 UTC

[kafka] branch trunk updated: KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3696b98  KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)
3696b98 is described below

commit 3696b9882d0267871a31fc3df03da568b59433b7
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 23 13:20:39 2019 -0700

    KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)
    
    This patch attempts to simplify the interaction between Partition and the various components from `ReplicaManager`. This is primarily to make unit testing easier. I have also tried to eliminate the OfflinePartition sentinel, which has always been unsafe.
    
    Reviewers: Boyang Chen <bc...@outlook.com>, David Arthur <mu...@gmail.com>
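
For readers skimming the diff: the new `PartitionStateStore` trait is the seam that makes the testing goal above concrete. A minimal in-memory stub along the following lines (hypothetical, not part of this commit) is enough to drive the ISR expand/shrink paths in a unit test:

    import java.util.Properties

    import kafka.api.LeaderAndIsr
    import kafka.cluster.PartitionStateStore

    // Hypothetical stub: records ISR version bumps in memory instead of ZooKeeper.
    class InMemoryPartitionStateStore extends PartitionStateStore {
      private var version = 0
      override def fetchTopicConfig(): Properties = new Properties()
      override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = bump()
      override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = bump()
      // Every update "succeeds" and returns the next ZK-style version number.
      private def bump(): Option[Int] = { version += 1; Some(version) }
    }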
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 341 ++++++++++++-------
 .../src/main/scala/kafka/server/AdminManager.scala |   2 +-
 .../scala/kafka/server/DelayedDeleteRecords.scala  |  25 +-
 .../main/scala/kafka/server/DelayedOperation.scala |  15 +-
 .../main/scala/kafka/server/DelayedProduce.scala   |  15 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   4 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  11 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 376 +++++++++++----------
 .../server/checkpoints/OffsetCheckpointFile.scala  |  15 +
 .../main/scala/kafka/utils/ReplicationUtils.scala  |   4 +-
 .../admin/ReassignPartitionsClusterTest.scala      |   2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 308 ++++++++---------
 .../coordinator/group/GroupCoordinatorTest.scala   |  14 +-
 .../group/GroupMetadataManagerTest.scala           |   2 +-
 .../kafka/server/BrokerEpochIntegrationTest.scala  |   5 +-
 .../unit/kafka/server/DelayedOperationTest.scala   |   8 +-
 .../server/HighwatermarkPersistenceTest.scala      |   6 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |   2 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   7 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   3 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   4 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  23 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  41 +--
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |   2 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   2 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  12 +-
 29 files changed, 696 insertions(+), 563 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 30ce756..256f1a0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,7 +16,7 @@
  */
 package kafka.cluster
 
-import java.util.Optional
+import java.util.{Optional, Properties}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import com.yammer.metrics.core.Gauge
@@ -26,6 +26,7 @@ import kafka.controller.KafkaController
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
+import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -42,19 +43,114 @@ import org.apache.kafka.common.utils.Time
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
-object Partition {
+trait PartitionStateStore {
+  def fetchTopicConfig(): Properties
+  def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
+  def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
+}
+
+class ZkPartitionStateStore(topicPartition: TopicPartition,
+                            zkClient: KafkaZkClient,
+                            replicaManager: ReplicaManager) extends PartitionStateStore {
+
+  override def fetchTopicConfig(): Properties = {
+    val adminZkClient = new AdminZkClient(zkClient)
+    adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
+  }
+
+  override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
+    val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
+    if (newVersionOpt.isDefined)
+      replicaManager.isrShrinkRate.mark()
+    newVersionOpt
+  }
+
+  override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
+    val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
+    if (newVersionOpt.isDefined)
+      replicaManager.isrExpandRate.mark()
+    newVersionOpt
+  }
+
+  private def updateIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
+    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
+      leaderAndIsr, controllerEpoch)
+
+    if (updateSucceeded) {
+      replicaManager.recordIsrChange(topicPartition)
+      Some(newVersion)
+    } else {
+      replicaManager.failedIsrUpdatesRate.mark()
+      None
+    }
+  }
+}
+
+class DelayedOperations(topicPartition: TopicPartition,
+                        produce: DelayedOperationPurgatory[DelayedProduce],
+                        fetch: DelayedOperationPurgatory[DelayedFetch],
+                        deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords]) {
+
+  def checkAndCompleteAll(): Unit = {
+    val requestKey = new TopicPartitionOperationKey(topicPartition)
+    fetch.checkAndComplete(requestKey)
+    produce.checkAndComplete(requestKey)
+    deleteRecords.checkAndComplete(requestKey)
+  }
+
+  def checkAndCompleteFetch(): Unit = {
+    fetch.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+  }
+
+  def checkAndCompleteProduce(): Unit = {
+    produce.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+  }
+
+  def checkAndCompleteDeleteRecords(): Unit = {
+    deleteRecords.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+  }
+
+  def numDelayedDelete: Int = deleteRecords.numDelayed
+
+  def numDelayedFetch: Int = fetch.numDelayed
+
+  def numDelayedProduce: Int = produce.numDelayed
+}
+
+object Partition extends KafkaMetricsGroup {
   def apply(topicPartition: TopicPartition,
             time: Time,
             replicaManager: ReplicaManager): Partition = {
+    val zkIsrBackingStore = new ZkPartitionStateStore(
+      topicPartition,
+      replicaManager.zkClient,
+      replicaManager)
+
+    val delayedOperations = new DelayedOperations(
+      topicPartition,
+      replicaManager.delayedProducePurgatory,
+      replicaManager.delayedFetchPurgatory,
+      replicaManager.delayedDeleteRecordsPurgatory)
+
     new Partition(topicPartition,
-      isOffline = false,
       replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
       interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
       localBrokerId = replicaManager.config.brokerId,
       time = time,
-      replicaManager = replicaManager,
-      logManager = replicaManager.logManager,
-      zkClient = replicaManager.zkClient)
+      stateStore = zkIsrBackingStore,
+      delayedOperations = delayedOperations,
+      metadataCache = replicaManager.metadataCache,
+      logManager = replicaManager.logManager)
+  }
+
+  def removeMetrics(topicPartition: TopicPartition): Unit = {
+    val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
+    removeMetric("UnderReplicated", tags)
+    removeMetric("UnderMinIsr", tags)
+    removeMetric("InSyncReplicasCount", tags)
+    removeMetric("ReplicasCount", tags)
+    removeMetric("LastStableOffsetLag", tags)
+    removeMetric("AtMinIsr", tags)
   }
 }
 
@@ -62,14 +158,14 @@ object Partition {
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
 class Partition(val topicPartition: TopicPartition,
-                val isOffline: Boolean,
                 private val replicaLagTimeMaxMs: Long,
                 private val interBrokerProtocolVersion: ApiVersion,
                 private val localBrokerId: Int,
                 private val time: Time,
-                private val replicaManager: ReplicaManager,
-                private val logManager: LogManager,
-                private val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
+                private val stateStore: PartitionStateStore,
+                private val delayedOperations: DelayedOperations,
+                private val metadataCache: MetadataCache,
+                private val logManager: LogManager) extends HostedPartition with Logging with KafkaMetricsGroup {
 
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
@@ -94,68 +190,65 @@ class Partition(val topicPartition: TopicPartition,
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
-  private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
+  private def isReplicaLocal(replicaId: Int): Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
 
   private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
 
-  // Do not create metrics if this partition is ReplicaManager.OfflinePartition
-  if (!isOffline) {
-    newGauge("UnderReplicated",
-      new Gauge[Int] {
-        def value = {
-          if (isUnderReplicated) 1 else 0
-        }
-      },
-      tags
-    )
-
-    newGauge("InSyncReplicasCount",
-      new Gauge[Int] {
-        def value = {
-          if (isLeaderReplicaLocal) inSyncReplicas.size else 0
-        }
-      },
-      tags
-    )
-
-    newGauge("UnderMinIsr",
-      new Gauge[Int] {
-        def value = {
-          if (isUnderMinIsr) 1 else 0
-        }
-      },
-      tags
-    )
-
-    newGauge("AtMinIsr",
-      new Gauge[Int] {
-        def value = {
-          if (isAtMinIsr) 1 else 0
-        }
-      },
-      tags
-    )
-
-    newGauge("ReplicasCount",
-      new Gauge[Int] {
-        def value = {
-          if (isLeaderReplicaLocal) assignedReplicas.size else 0
-        }
-      },
-      tags
-    )
-
-    newGauge("LastStableOffsetLag",
-      new Gauge[Long] {
-        def value = {
-          leaderReplicaIfLocal.map { replica =>
-            replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
-          }.getOrElse(0)
-        }
-      },
-      tags
-    )
-  }
+  newGauge("UnderReplicated",
+    new Gauge[Int] {
+      def value = {
+        if (isUnderReplicated) 1 else 0
+      }
+    },
+    tags
+  )
+
+  newGauge("InSyncReplicasCount",
+    new Gauge[Int] {
+      def value = {
+        if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+      }
+    },
+    tags
+  )
+
+  newGauge("UnderMinIsr",
+    new Gauge[Int] {
+      def value = {
+        if (isUnderMinIsr) 1 else 0
+      }
+    },
+    tags
+  )
+
+  newGauge("AtMinIsr",
+    new Gauge[Int] {
+      def value = {
+        if (isAtMinIsr) 1 else 0
+      }
+    },
+    tags
+  )
+
+  newGauge("ReplicasCount",
+    new Gauge[Int] {
+      def value = {
+        if (isLeaderReplicaLocal) assignedReplicas.size else 0
+      }
+    },
+    tags
+  )
+
+  newGauge("LastStableOffsetLag",
+    new Gauge[Long] {
+      def value = {
+        leaderReplicaIfLocal.map { replica =>
+          replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
+        }.getOrElse(0)
+      }
+    },
+    tags
+  )
 
   private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
 
@@ -185,15 +278,17 @@ class Partition(val topicPartition: TopicPartition,
     * does not exist. This method assumes that the current replica has already been created.
     *
     * @param logDir log directory
+    * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
     * @return true iff the future replica is created
     */
-  def maybeCreateFutureReplica(logDir: String): Boolean = {
+  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
     inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = localReplicaOrException
-      if (currentReplica.log.get.dir.getParent == logDir)
+      val currentLog = currentReplica.log.get
+      if (currentLog.dir.getParent == logDir)
         false
       else {
         futureLocalReplica match {
@@ -204,26 +299,25 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)
+            getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints)
             true
         }
       }
     }
   }
 
-  def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
+  def getOrCreateReplica(replicaId: Int, isNew: Boolean, offsetCheckpoints: OffsetCheckpoints): Replica = {
     allReplicasMap.getAndMaybePut(replicaId, {
       if (isReplicaLocal(replicaId)) {
-        val adminZkClient = new AdminZkClient(zkClient)
-        val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+        val props = stateStore.fetchTopicConfig()
         val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
         val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
-        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
-        val offsetMap = checkpoint.read()
-        if (!offsetMap.contains(topicPartition))
+        val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
           info(s"No checkpointed highwatermark is found for partition $topicPartition")
-        val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
-        new Replica(replicaId, topicPartition, time, offset, Some(log))
+          0L
+        }
+        val initialHighWatermark = math.min(checkpointHighWatermark, log.logEndOffset)
+        new Replica(replicaId, topicPartition, time, initialHighWatermark, Some(log))
       } else new Replica(replicaId, topicPartition, time)
     })
   }
@@ -300,6 +394,7 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  // Visible for testing
   def addReplicaIfNotExists(replica: Replica): Replica =
     allReplicasMap.putIfNotExists(replica.brokerId, replica)
 
@@ -370,7 +465,7 @@ class Partition(val topicPartition: TopicPartition,
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
       leaderEpochStartOffsetOpt = None
-      removePartitionMetrics()
+      Partition.removeMetrics(topicPartition)
       logManager.asyncDelete(topicPartition)
       if (logManager.getLog(topicPartition, isFuture = true).isDefined)
         logManager.asyncDelete(topicPartition, isFuture = true)
@@ -384,18 +479,23 @@ class Partition(val topicPartition: TopicPartition,
    * from the time when this broker was the leader last time) and setting the new leader and ISR.
   * If the leader replica id does not change, return false to indicate this to the replica manager.
    */
-  def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int,
+                 partitionStateInfo: LeaderAndIsrRequest.PartitionState,
+                 correlationId: Int,
+                 highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
       // add replicas that are new
-      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
+      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map {
+        id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints)
+      }.toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
-      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
+      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints))
 
       val leaderReplica = localReplicaOrException
       val leaderEpochStartOffset = leaderReplica.logEndOffset
@@ -448,7 +548,10 @@ class Partition(val topicPartition: TopicPartition,
   * greater (that is, no updates have been missed), return false to indicate to the
   * replica manager that state is already correct and the become-follower steps can be skipped
    */
-  def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int,
+                   partitionStateInfo: LeaderAndIsrRequest.PartitionState,
+                   correlationId: Int,
+                   highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
       val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
@@ -457,7 +560,7 @@ class Partition(val topicPartition: TopicPartition,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
       // add replicas that are new
-      newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
+      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints))
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
@@ -483,9 +586,9 @@ class Partition(val topicPartition: TopicPartition,
   def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
     val replicaId = replica.brokerId
     // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
-    val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
+    val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
     replica.updateLogReadResult(logReadResult)
-    val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
+    val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
     // check if the LW of the partition has incremented
     // since the replica's logStartOffset may have incremented
     val leaderLWIncremented = newLeaderLW > oldLeaderLW
@@ -533,9 +636,9 @@ class Partition(val topicPartition: TopicPartition,
             val newInSyncReplicas = inSyncReplicas + replica
             info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
               s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
+
             // update ISR in ZK and cache
-            updateIsr(newInSyncReplicas)
-            replicaManager.isrExpandRate.mark()
+            expandIsr(newInSyncReplicas)
           }
           // check if the HW of the partition can now be incremented
           // since the replica may already be in the ISR and its LEO has just incremented
@@ -634,7 +737,7 @@ class Partition(val topicPartition: TopicPartition,
     if (!isLeaderReplicaLocal)
       throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
     val logStartOffsets = allReplicas.collect {
-      case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset
+      case replica if metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset
     }
     CoreUtils.min(logStartOffsets, 0L)
   }
@@ -642,12 +745,7 @@ class Partition(val topicPartition: TopicPartition,
   /**
    * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
    */
-  private def tryCompleteDelayedRequests() {
-    val requestKey = new TopicPartitionOperationKey(topicPartition)
-    replicaManager.tryCompleteDelayedFetch(requestKey)
-    replicaManager.tryCompleteDelayedProduce(requestKey)
-    replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
-  }
+  private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
@@ -669,8 +767,7 @@ class Partition(val topicPartition: TopicPartition,
             )
 
             // update ISR in zk and in cache
-            updateIsr(newInSyncReplicas)
-            replicaManager.isrShrinkRate.mark()
+            shrinkIsr(newInSyncReplicas)
 
             // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
@@ -785,7 +882,7 @@ class Partition(val topicPartition: TopicPartition,
       tryCompleteDelayedRequests()
     else {
       // probably unblock some follower fetch requests since log end offset has been updated
-      replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
+      delayedOperations.checkAndCompleteFetch()
     }
 
     info
@@ -1015,41 +1112,37 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def updateIsr(newIsr: Set[Replica]) {
+  private def expandIsr(newIsr: Set[Replica]): Unit = {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
-    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition, newLeaderAndIsr,
-      controllerEpoch)
+    val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
+    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+  }
 
-    if (updateSucceeded) {
-      replicaManager.recordIsrChange(topicPartition)
-      inSyncReplicas = newIsr
-      zkVersion = newVersion
-      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
-    } else {
-      replicaManager.failedIsrUpdatesRate.mark()
-      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-    }
+  private def shrinkIsr(newIsr: Set[Replica]): Unit = {
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
+    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
+    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
   }
 
-  /**
-   * remove deleted log metrics
-   */
-  def removePartitionMetrics() {
-    removeMetric("UnderReplicated", tags)
-    removeMetric("UnderMinIsr", tags)
-    removeMetric("InSyncReplicasCount", tags)
-    removeMetric("ReplicasCount", tags)
-    removeMetric("LastStableOffsetLag", tags)
-    removeMetric("AtMinIsr", tags)
+  private def maybeUpdateIsrAndVersion(isr: Set[Replica], zkVersionOpt: Option[Int]): Unit = {
+    zkVersionOpt match {
+      case Some(newVersion) =>
+        inSyncReplicas = isr
+        zkVersion = newVersion
+        info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
+
+      case None =>
+        info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR")
+    }
   }
 
   override def equals(that: Any): Boolean = that match {
-    case other: Partition => partitionId == other.partitionId && topic == other.topic && isOffline == other.isOffline
+    case other: Partition => partitionId == other.partitionId && topic == other.topic
     case _ => false
   }
 
   override def hashCode: Int =
-    31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0)
+    31 + topic.hashCode + 17 * partitionId
 
   override def toString(): String = {
     val partitionString = new StringBuilder
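
With the constructor above, every collaborator of `Partition` is an explicit, replaceable argument. A construction sketch for a unit test (assuming Mockito and the hypothetical stub store sketched under the commit message; names are illustrative):

    import kafka.api.ApiVersion
    import kafka.cluster.{DelayedOperations, Partition}
    import kafka.log.LogManager
    import kafka.server.MetadataCache
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.utils.Time
    import org.mockito.Mockito.mock

    // Sketch: no ReplicaManager needed to stand up a Partition under test.
    val tp = new TopicPartition("topic", 0)
    val partition = new Partition(tp,
      replicaLagTimeMaxMs = 30000L,
      interBrokerProtocolVersion = ApiVersion.latestVersion,
      localBrokerId = 0,
      time = Time.SYSTEM,
      stateStore = new InMemoryPartitionStateStore(),
      delayedOperations = mock(classOf[DelayedOperations]),
      metadataCache = mock(classOf[MetadataCache]),
      logManager = mock(classOf[LogManager]))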
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index d424700..85d272c 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -60,7 +60,7 @@ class AdminManager(val config: KafkaConfig,
   private val alterConfigPolicy =
     Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
 
-  def hasDelayedTopicOperations = topicPurgatory.delayed != 0
+  def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0
 
   /**
     * Try to complete delayed topic operations with the request key
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index a977d9a..dac9f79 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,6 +20,7 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
+import kafka.cluster.Partition
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
@@ -73,19 +74,19 @@ class DelayedDeleteRecords(delayMs: Long,
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
-          case Some(partition) =>
-            if (partition eq ReplicaManager.OfflinePartition) {
-              (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-            } else {
-              partition.leaderReplicaIfLocal match {
-                case Some(_) =>
-                  val leaderLW = partition.lowWatermarkIfLeader
-                  (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
-                case None =>
-                  (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-              }
+          case partition: Partition =>
+            partition.leaderReplicaIfLocal match {
+              case Some(_) =>
+                val leaderLW = partition.lowWatermarkIfLeader
+                (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+              case None =>
+                (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
             }
-          case None =>
+
+          case HostedPartition.Offline =>
+            (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+
+          case HostedPartition.None =>
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
         }
         if (error != Errors.NONE || lowWatermarkReached) {
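
The same three-way match appears in `DelayedProduce` below: instead of comparing against a shared `OfflinePartition` sentinel, callers pattern match on the `HostedPartition` states defined further down in `ReplicaManager.scala`. The general shape, as a sketch with illustrative strings:

    import kafka.cluster.Partition
    import kafka.server.{HostedPartition, ReplicaManager}
    import org.apache.kafka.common.TopicPartition

    // Sketch: the HostedPartition states make the offline case impossible to miss.
    def describe(replicaManager: ReplicaManager, tp: TopicPartition): String =
      replicaManager.getPartition(tp) match {
        case partition: Partition    => s"online, leader epoch ${partition.getLeaderEpoch}"
        case HostedPartition.Offline => "hosted here, but its log directory is offline"
        case HostedPartition.None    => "no local state for this partition"
      }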
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index eb20e6d..33187bb 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -44,7 +44,8 @@ import scala.collection.mutable.ListBuffer
  * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
  */
 abstract class DelayedOperation(override val delayMs: Long,
-    lockOpt: Option[Lock] = None) extends TimerTask with Logging {
+                                lockOpt: Option[Lock] = None)
+  extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
   private val tryCompletePending = new AtomicBoolean(false)
@@ -209,7 +210,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   newGauge(
     "NumDelayedOperations",
     new Gauge[Int] {
-      def value: Int = delayed
+      def value: Int = numDelayed
     },
     metricsTags
   )
@@ -288,10 +289,12 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   def checkAndComplete(key: Any): Int = {
     val wl = watcherList(key)
     val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
-    if(watchers == null)
+    val numCompleted = if (watchers == null)
       0
     else
       watchers.tryCompleteWatched()
+    debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
+    numCompleted
   }
 
   /**
@@ -306,7 +309,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   /**
    * Return the number of delayed operations in the expiry queue
    */
-  def delayed: Int = timeoutTimer.size
+  def numDelayed: Int = timeoutTimer.size
 
   /**
     * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
@@ -435,11 +438,11 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
     // Trigger a purge if the number of completed but still being watched operations is larger than
     // the purge threshold. That number is computed by the difference btw the estimated total number of
     // operations and the number of pending delayed operations.
-    if (estimatedTotalOperations.get - delayed > purgeInterval) {
+    if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
       // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
       // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
       // a little overestimated total number of operations.
-      estimatedTotalOperations.getAndSet(delayed)
+      estimatedTotalOperations.getAndSet(numDelayed)
       debug("Begin purging watch lists")
       val purged = watcherLists.foldLeft(0) {
         case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
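
The `delayed` -> `numDelayed` rename and the new debug line in `checkAndComplete` are the visible surface of the purgatory touched here. A minimal usage sketch (assuming the companion object's `apply`, which takes a purgatory name and broker id):

    import kafka.server.{DelayedOperation, DelayedOperationPurgatory}

    // Sketch: an operation that completes only once we flip a flag.
    class ManualOperation(delayMs: Long) extends DelayedOperation(delayMs) {
      @volatile var ready = false
      override def tryComplete(): Boolean = if (ready) forceComplete() else false
      override def onComplete(): Unit = println("completed")
      override def onExpiration(): Unit = println("expired")
    }

    val purgatory = DelayedOperationPurgatory[ManualOperation]("Example", brokerId = 0)
    val op = new ManualOperation(1000L)
    purgatory.tryCompleteElseWatch(op, Seq("some-key")) // not completable yet, so it is watched
    println(purgatory.numDelayed)                       // 1: still in the expiry queue
    op.ready = true
    purgatory.checkAndComplete("some-key")              // completes it; returns (and now logs) 1
    purgatory.shutdown()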
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index dbecba4..1570d4b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
+import kafka.cluster.Partition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
-
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -88,12 +88,13 @@ class DelayedProduce(delayMs: Long,
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
-          case Some(partition) =>
-            if (partition eq ReplicaManager.OfflinePartition)
-              (false, Errors.KAFKA_STORAGE_ERROR)
-            else
-              partition.checkEnoughReplicasReachOffset(status.requiredOffset)
-          case None =>
+          case partition: Partition  =>
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+
+          case HostedPartition.Offline =>
+            (false, Errors.KAFKA_STORAGE_ERROR)
+
+          case HostedPartition.None =>
             // Case A
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d362d64..9cedb01 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -263,8 +263,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
       if (replicaManager.hasDelayedElectionOperations) {
-        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) =>
-          replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition()))
+        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, _) =>
+          replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
         }
       }
       sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 4312a92..0622b30 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
-    val futureReplica = replicaMgr.futureLocalReplicaOrException(topicPartition)
-    val partition = replicaMgr.getPartition(topicPartition).get
+    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val futureReplica = partition.futureLocalReplicaOrException
     val records = toMemoryRecords(partitionData.records)
 
     if (fetchOffset != futureReplica.logEndOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ab5be6e..b1b5dd0 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -143,8 +143,8 @@ class ReplicaFetcherThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: FetchData): Option[LogAppendInfo] = {
-    val replica = replicaMgr.localReplicaOrException(topicPartition)
-    val partition = replicaMgr.getPartition(topicPartition).get
+    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val replica = partition.localReplicaOrException
     val records = toMemoryRecords(partitionData.records)
 
     maybeWarnIfOversizedRecords(records, topicPartition)
@@ -277,8 +277,9 @@ class ReplicaFetcherThread(name: String,
    * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
    */
   override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
-    val replica = replicaMgr.localReplicaOrException(tp)
-    val partition = replicaMgr.getPartition(tp).get
+    val partition = replicaMgr.nonOfflinePartition(tp).get
+    val replica = partition.localReplicaOrException
+
     partition.truncateTo(offsetTruncationState.offset, isFuture = false)
 
     if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
@@ -292,7 +293,7 @@ class ReplicaFetcherThread(name: String,
   }
 
   override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
-    val partition = replicaMgr.getPartition(topicPartition).get
+    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
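Both fetcher threads now resolve the local replica through the partition object itself, so a single `nonOfflinePartition` lookup yields a consistent pair. The shared shape, as a sketch:

    import kafka.cluster.{Partition, Replica}
    import kafka.server.ReplicaManager
    import org.apache.kafka.common.TopicPartition

    // Sketch: one lookup replaces the previous pair of independent
    // localReplicaOrException(tp) and getPartition(tp).get calls.
    def localPair(replicaMgr: ReplicaManager, tp: TopicPartition): (Partition, Replica) = {
      val partition = replicaMgr.nonOfflinePartition(tp).getOrElse(
        throw new IllegalStateException(s"Partition $tp is offline or absent"))
      (partition, partition.localReplicaOrException)
    }
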
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2023a97..54d35ef 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.Lock
 
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, SimpleOffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
@@ -118,19 +118,30 @@ object LogReadResult {
                                            lastStableOffset = None)
 }
 
+/**
+ * Trait to represent the state of hosted partitions. We create a concrete (active) Partition
+ * instance when the broker receives a LeaderAndIsr request from the controller indicating
+ * that it should be either a leader or follower of a partition.
+ */
+trait HostedPartition
+
+object HostedPartition {
+  /**
+   * This broker does not have any state for this partition locally.
+   */
+  object None extends HostedPartition
+
+  /**
+   * This broker hosts the partition, but it is in an offline log directory.
+   */
+  object Offline extends HostedPartition
+}
+
+
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
   val IsrChangePropagationBlackOut = 5000L
   val IsrChangePropagationInterval = 60000L
-  val OfflinePartition: Partition = new Partition(new TopicPartition("", -1),
-    isOffline = true,
-    replicaLagTimeMaxMs = 0L,
-    interBrokerProtocolVersion = ApiVersion.latestVersion,
-    localBrokerId = -1,
-    time = null,
-    replicaManager = null,
-    logManager = null,
-    zkClient = null)
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -181,7 +192,7 @@ class ReplicaManager(val config: KafkaConfig,
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   private val localBrokerId = config.brokerId
-  private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
+  private val allPartitions = new Pool[TopicPartition, HostedPartition](valueFactory = Some(tp =>
     Partition(tp, time, this)))
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
@@ -226,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig,
   val offlineReplicaCount = newGauge(
     "OfflineReplicaCount",
     new Gauge[Int] {
-      def value = offlinePartitionsIterator.size
+      def value = offlinePartitionCount
     }
   )
   val underReplicatedPartitions = newGauge(
@@ -248,9 +259,9 @@ class ReplicaManager(val config: KafkaConfig,
     }
   )
 
-  val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
-  val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
-  val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
+  val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
+  val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+  val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
 
@@ -295,40 +306,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
 
-  /**
-   * Try to complete some delayed produce requests with the request key;
-   * this can be triggered when:
-   *
-   * 1. The partition HW has changed (for acks = -1)
-   * 2. A follower replica's fetch operation is received (for acks > 1)
-   */
-  def tryCompleteDelayedProduce(key: DelayedOperationKey) {
-    val completed = delayedProducePurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
-  }
-
-  /**
-   * Try to complete some delayed fetch requests with the request key;
-   * this can be triggered when:
-   *
-   * 1. The partition HW has changed (for regular fetch)
-   * 2. A new message set is appended to the local log (for follower fetch)
-   */
-  def tryCompleteDelayedFetch(key: DelayedOperationKey) {
-    val completed = delayedFetchPurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
-  }
-
-  /**
-   * Try to complete some delayed DeleteRecordsRequest with the request key;
-   * this needs to be triggered when the partition low watermark has changed
-   */
-  def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
-    val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
-  }
-
-  def hasDelayedElectionOperations = delayedElectPreferredLeaderPurgatory.delayed != 0
+  def hasDelayedElectionOperations: Boolean = delayedElectPreferredLeaderPurgatory.numDelayed != 0
 
   def tryCompleteElection(key: DelayedOperationKey): Unit = {
     val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key)
@@ -350,24 +328,33 @@ class ReplicaManager(val config: KafkaConfig,
     logDirFailureHandler.start()
   }
 
+  private def maybeRemoveTopicMetrics(topic: String): Unit = {
+    val topicHasOnlinePartition = allPartitions.values.exists {
+      case partition: Partition => topic == partition.topic
+      case _ => false
+    }
+    if (!topicHasOnlinePartition)
+      brokerTopicStats.removeMetrics(topic)
+  }
+
   def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean)  = {
     stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")
 
     if (deletePartition) {
-      val removedPartition = allPartitions.remove(topicPartition)
-      if (removedPartition eq ReplicaManager.OfflinePartition) {
-        allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
-        throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
-      }
+      getPartition(topicPartition) match {
+        case HostedPartition.Offline =>
+          throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
+
+        case removedPartition: Partition =>
+          if (allPartitions.remove(topicPartition, removedPartition)) {
+            maybeRemoveTopicMetrics(topicPartition.topic)
+            // this will delete the local log. This call may throw exception if the log is on offline directory
+            removedPartition.delete()
+          }
 
-      if (removedPartition != null) {
-        val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
-        if (!topicHasPartitions)
-          brokerTopicStats.removeMetrics(topicPartition.topic)
-        // this will delete the local log. This call may throw exception if the log is on offline directory
-        removedPartition.delete()
-      } else {
-        stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
+        case HostedPartition.None =>
+          stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition " +
+            s"$topicPartition as replica doesn't exist on broker")
       }
 
       // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
@@ -409,35 +396,46 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def getOrCreatePartition(topicPartition: TopicPartition): Partition =
-    allPartitions.getAndMaybePut(topicPartition)
+  def getPartition(topicPartition: TopicPartition): HostedPartition = {
+    Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None)
+  }
 
-  def getPartition(topicPartition: TopicPartition): Option[Partition] =
-    Option(allPartitions.get(topicPartition))
+  // Visible for testing
+  def createPartition(topicPartition: TopicPartition): Partition = {
+    val partition = Partition(topicPartition, time, this)
+    allPartitions.put(topicPartition, partition)
+    partition
+  }
 
-  def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
-    getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
+  def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = {
+    getPartition(topicPartition) match {
+      case partition: Partition => Some(partition)
+      case _ => None
+    }
+  }
 
   // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after
   // the iterator has been constructed could still be returned by this iterator.
-  private def nonOfflinePartitionsIterator: Iterator[Partition] =
-    allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
-
-  // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the
-  // iterator has been constructed may not be visible.
-  private def offlinePartitionsIterator: Iterator[Partition] =
-    allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
+  private def nonOfflinePartitionsIterator: Iterator[Partition] = {
+    allPartitions.values.iterator.flatMap {
+      case p: Partition => Some(p)
+      case _ => None
+    }
+  }
 
+  private def offlinePartitionCount: Int = {
+    allPartitions.values.iterator.count(_ == HostedPartition.Offline)
+  }
 
   def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
     getPartition(topicPartition) match {
-      case Some(partition) =>
-        if (partition eq ReplicaManager.OfflinePartition)
-          throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
-        else
-          partition
+      case partition: Partition =>
+        partition
 
-      case None if metadataCache.contains(topicPartition) =>
+      case HostedPartition.Offline =>
+        throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
+
+      case HostedPartition.None if metadataCache.contains(topicPartition) =>
         if (expectLeader) {
           // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which
           // forces clients to refresh metadata to find the new location. This can happen, for example,
@@ -448,7 +446,7 @@ class ReplicaManager(val config: KafkaConfig,
           throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available")
         }
 
-      case None =>
+      case HostedPartition.None =>
         throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
     }
   }
@@ -583,15 +581,17 @@ class ReplicaManager(val config: KafkaConfig,
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is offline")
 
-          getPartition(topicPartition).foreach { partition =>
-            if (partition eq ReplicaManager.OfflinePartition)
+          getPartition(topicPartition) match {
+            case partition: Partition =>
+              // Stop current replica movement if the destinationDir is different from the existing destination log directory
+              if (partition.futureReplicaDirChanged(destinationDir)) {
+                replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
+                partition.removeFutureLocalReplica()
+              }
+            case HostedPartition.Offline =>
               throw new KafkaStorageException(s"Partition $topicPartition is offline")
 
-            // Stop current replica movement if the destinationDir is different from the existing destination log directory
-            if (partition.futureReplicaDirChanged(destinationDir)) {
-              replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
-              partition.removeFutureLocalReplica()
-            }
+            case _ => // Do nothing
           }
 
           // If the log for this partition has not been created yet:
@@ -609,7 +609,8 @@ class ReplicaManager(val config: KafkaConfig,
           //   start ReplicaAlterDirThread to move data of this partition from the current log to the future log
           // - Otherwise, return KafkaStorageException. We do not create the future log while there is offline log directory
           //   so that we can avoid creating future log for the same partition in multiple log directories.
-          if (partition.maybeCreateFutureReplica(destinationDir)) {
+          val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+          if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
             val futureReplica = futureLocalReplicaOrException(topicPartition)
             logManager.abortAndPauseCleaning(topicPartition)
 
@@ -779,10 +780,8 @@ class ReplicaManager(val config: KafkaConfig,
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
             val logStartOffset = getPartition(topicPartition) match {
-              case Some(partition) =>
-                partition.logStartOffset
-              case _ =>
-                -1
+              case partition: Partition => partition.logStartOffset
+              case _ => -1L
             }
             brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
             brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
@@ -1060,43 +1059,52 @@ class ReplicaManager(val config: KafkaConfig,
         val newPartitions = new mutable.HashSet[Partition]
 
         leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
-          val partition = getPartition(topicPartition).getOrElse {
-            val createdPartition = getOrCreatePartition(topicPartition)
-            newPartitions.add(createdPartition)
-            createdPartition
+          val partitionOpt = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                s"controller $controllerId with correlation id $correlationId " +
+                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
+                "partition is in an offline log directory")
+              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+              None
+
+            case partition: Partition => Some(partition)
+
+            case HostedPartition.None =>
+              val partition = Partition(topicPartition, time, this)
+              allPartitions.putIfNotExists(topicPartition, partition)
+              newPartitions.add(partition)
+              Some(partition)
           }
-          val currentLeaderEpoch = partition.getLeaderEpoch
-          val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
-          if (partition eq ReplicaManager.OfflinePartition) {
-            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-              s"controller $controllerId with correlation id $correlationId " +
-              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
-              "partition is in an offline log directory")
-            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-          } else if (requestLeaderEpoch > currentLeaderEpoch) {
-            // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-            // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-            if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
-              partitionState.put(partition, stateInfo)
-            else {
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
-              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+
+          partitionOpt.foreach { partition =>
+            val currentLeaderEpoch = partition.getLeaderEpoch
+            val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
+            if (requestLeaderEpoch > currentLeaderEpoch) {
+              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+              if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
+                partitionState.put(partition, stateInfo)
+              else {
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+                  s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
+                  s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
+                responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              }
+            } else if (requestLeaderEpoch < currentLeaderEpoch) {
+              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                s"controller $controllerId with correlation id $correlationId " +
+                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+                s"leader epoch $requestLeaderEpoch is smaller than the current " +
+                s"leader epoch $currentLeaderEpoch")
+              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+            } else {
+              stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
+                s"controller $controllerId with correlation id $correlationId " +
+                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+                s"leader epoch $requestLeaderEpoch matches the current leader epoch")
+              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
             }
-          } else if (requestLeaderEpoch < currentLeaderEpoch) {
-            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-              s"controller $controllerId with correlation id $correlationId " +
-              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-              s"leader epoch $requestLeaderEpoch is smaller than the current " +
-              s"leader epoch $currentLeaderEpoch")
-            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
-          } else {
-            stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
-              s"controller $controllerId with correlation id $correlationId " +
-              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-              s"leader epoch $requestLeaderEpoch matches the current leader epoch")
-            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
           }
         }
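
For readers tracking the refactor: the three-way comparison above is the entire fencing rule for LeaderAndIsr requests, and note that both the smaller-epoch and equal-epoch branches answer with STALE_CONTROLLER_EPOCH. A minimal, self-contained sketch of the decision (the names below are illustrative, not the actual ReplicaManager API):

    sealed trait EpochCheck
    case object ApplyState extends EpochCheck       // request epoch is newer: process the state change
    case object RejectStale extends EpochCheck      // request epoch is older: reject it
    case object IgnoreDuplicate extends EpochCheck  // same epoch: already applied, reject it

    def checkLeaderEpoch(currentEpoch: Int, requestEpoch: Int): EpochCheck =
      if (requestEpoch > currentEpoch) ApplyState
      else if (requestEpoch < currentEpoch) RejectStale
      else IgnoreDuplicate

    // Both rejection outcomes map to Errors.STALE_CONTROLLER_EPOCH in the code above.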
 
@@ -1105,12 +1113,15 @@ class ReplicaManager(val config: KafkaConfig,
         }
         val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
 
+        val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
         val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
-          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
+          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
+            highWatermarkCheckpoints)
         else
           Set.empty[Partition]
         val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
+            highWatermarkCheckpoints)
         else
           Set.empty[Partition]
 
@@ -1121,10 +1132,11 @@ class ReplicaManager(val config: KafkaConfig,
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-          if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
-            allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
+          if (localReplica(topicPartition).isEmpty)
+            markPartitionOffline(topicPartition)
         }
 
+
        // We initialize the high watermark checkpointing thread after the first LeaderAndIsr request. This ensures that
        // all the partitions have been completely populated before starting the checkpointing, thereby avoiding weird race conditions
         if (!hwThreadInitialized) {
@@ -1140,7 +1152,7 @@ class ReplicaManager(val config: KafkaConfig,
               val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
               // Add future replica to partition's map
-              partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)
+              partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints)
 
               // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
               // replica from source dir to destination dir
@@ -1175,13 +1187,14 @@ class ReplicaManager(val config: KafkaConfig,
    *  TODO: the above may need to be fixed later
    */
   private def makeLeaders(controllerId: Int,
-                          epoch: Int,
+                          controllerEpoch: Int,
                           partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
                           correlationId: Int,
-                          responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
+                          responseMap: mutable.Map[TopicPartition, Errors],
+                          highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
     partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
-        s"controller $controllerId epoch $epoch starting the become-leader transition for " +
+        s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
         s"partition ${partition.topicPartition}")
     }
 
@@ -1196,20 +1209,20 @@ class ReplicaManager(val config: KafkaConfig,
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         try {
-          if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
+          if (partition.makeLeader(controllerId, partitionStateInfo, correlationId, highWatermarkCheckpoints)) {
             partitionsToMakeLeaders += partition
             stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
-              s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
+              s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
               s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
           } else
             stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
-              s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+              s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
               s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
               s"since it is already the leader for the partition.")
         } catch {
           case e: KafkaStorageException =>
             stateChangeLogger.error(s"Skipped the become-leader state change with " +
-              s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+              s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
               s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
               s"the replica for the partition is offline due to disk error $e")
             val dirOpt = getLogDir(partition.topicPartition)
@@ -1222,7 +1235,7 @@ class ReplicaManager(val config: KafkaConfig,
       case e: Throwable =>
         partitionState.keys.foreach { partition =>
           stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
-            s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e)
+            s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e)
         }
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
@@ -1230,7 +1243,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
-        s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
+        s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
     }
 
     partitionsToMakeLeaders
@@ -1255,13 +1268,14 @@ class ReplicaManager(val config: KafkaConfig,
    * return the set of partitions that are made follower due to this method
    */
   private def makeFollowers(controllerId: Int,
-                            epoch: Int,
+                            controllerEpoch: Int,
                             partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
                             correlationId: Int,
-                            responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
+                            responseMap: mutable.Map[TopicPartition, Errors],
+                            highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
     partitionStates.foreach { case (partition, partitionState) =>
       stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
-        s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
+        s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
         s"${partitionState.basePartitionState.leader}")
     }
 
@@ -1269,7 +1283,6 @@ class ReplicaManager(val config: KafkaConfig,
       responseMap.put(partition.topicPartition, Errors.NONE)
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
-
     try {
       // TODO: Delete leaders from LeaderAndIsrRequest
       partitionStates.foreach { case (partition, partitionStateInfo) =>
@@ -1278,11 +1291,11 @@ class ReplicaManager(val config: KafkaConfig,
           metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
             // Only change partition state when the leader is available
             case Some(_) =>
-              if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+              if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, highWatermarkCheckpoints))
                 partitionsToMakeFollower += partition
               else
                 stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
-                  s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
+                  s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
                   s"for partition ${partition.topicPartition} (last update " +
                   s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
                   s"since the new leader $newLeaderBrokerId is the same as the old leader")
@@ -1290,17 +1303,17 @@ class ReplicaManager(val config: KafkaConfig,
               // The leader broker should always be present in the metadata cache.
               // If not, we should record the error message and abort the transition process for this partition
               stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
-                s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+                s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
                 s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
                 s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
               // Create the local replica even if the leader is unavailable. This is required to ensure that we include
               // the partition's high watermark in the checkpoint file (see KAFKA-1647)
-              partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew)
+              partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew, highWatermarkCheckpoints)
           }
         } catch {
           case e: KafkaStorageException =>
             stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
-              s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+              s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
               s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " +
               s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
             val dirOpt = getLogDir(partition.topicPartition)
@@ -1313,57 +1326,56 @@ class ReplicaManager(val config: KafkaConfig,
       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
-          s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
+          s"epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
           s"${partitionStates(partition).basePartitionState.leader}")
       }
 
       partitionsToMakeFollower.foreach { partition =>
         val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
-        tryCompleteDelayedProduce(topicPartitionOperationKey)
-        tryCompleteDelayedFetch(topicPartitionOperationKey)
+        delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
+        delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
       }
 
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
           s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
-          s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}")
+          s"controller $controllerId epoch $controllerEpoch with leader ${partitionStates(partition).basePartitionState.leader}")
       }
 
       if (isShuttingDown.get()) {
         partitionsToMakeFollower.foreach { partition =>
           stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
-            s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+            s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
             s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " +
             "since it is shutting down")
         }
-      }
-      else {
+      } else {
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
           val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
             .brokerEndPoint(config.interBrokerListenerName)
           val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset
           partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
-        }.toMap
-        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+        }.toMap
 
-        partitionsToMakeFollower.foreach { partition =>
+        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+        partitionsToMakeFollowerWithLeaderAndOffset.foreach { case (partition, initialFetchState) =>
           stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
-            s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
-            s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}")
+            s"request from controller $controllerId epoch $controllerEpoch with correlation id $correlationId for " +
+            s"partition $partition with leader ${initialFetchState.leader}")
         }
       }
     } catch {
       case e: Throwable =>
         stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
-          s"received from controller $controllerId epoch $epoch", e)
+          s"received from controller $controllerId epoch $controllerEpoch", e)
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
     partitionStates.keys.foreach { partition =>
       stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
-        s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
+        s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
         s"${partitionStates(partition).basePartitionState.leader}")
     }
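
The become-follower path above now completes delayed produce and fetch operations by calling the purgatories directly, and it seeds each restarted fetcher from the follower's local high watermark. A simplified model of the per-partition fetch state built in the loop (stub case classes standing in for the real fetcher types):

    final case class BrokerEndPoint(id: Int, host: String, port: Int)
    final case class InitialFetchState(leader: BrokerEndPoint, currentLeaderEpoch: Int, initOffset: Long)

    // Replication resumes from the last locally-known committed offset,
    // i.e. the high watermark used as fetchOffset above.
    def initialFetchState(leader: BrokerEndPoint, leaderEpoch: Int, highWatermark: Long): InitialFetchState =
      InitialFetchState(leader, leaderEpoch, initOffset = highWatermark)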
 
@@ -1439,8 +1451,9 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   // Used only by test
-  def markPartitionOffline(tp: TopicPartition) {
-    allPartitions.put(tp, ReplicaManager.OfflinePartition)
+  def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
+    allPartitions.put(tp, HostedPartition.Offline)
+    Partition.removeMetrics(tp)
   }
 
   // logDir should be an absolute path
@@ -1467,13 +1480,10 @@ class ReplicaManager(val config: KafkaConfig,
 
       partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
       newOfflinePartitions.foreach { topicPartition =>
-        val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
-        partition.removePartitionMetrics()
+        markPartitionOffline(topicPartition)
       }
       newOfflinePartitions.map(_.topic).foreach { topic: String =>
-        val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
-        if (!topicHasPartitions)
-          brokerTopicStats.removeMetrics(topic)
+        maybeRemoveTopicMetrics(topic)
       }
       highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
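
With the helpers above, the log-dir failure handler shares one code path with the rest of ReplicaManager: markPartitionOffline handles per-partition state plus metrics, and maybeRemoveTopicMetrics drops topic-level gauges only once no partition of the topic remains. A hedged sketch of that last-partition check (simplified container, not the actual ReplicaManager fields):

    import scala.collection.concurrent.TrieMap

    // (topic, partition) -> per-partition state, standing in for allPartitions
    val hostedPartitions = TrieMap.empty[(String, Int), AnyRef]

    def maybeRemoveTopicMetrics(topic: String)(removeTopicMetrics: String => Unit): Unit = {
      val topicHasPartitions = hostedPartitions.keysIterator.exists { case (t, _) => t == topic }
      if (!topicHasPartitions)
        removeTopicMetrics(topic)
    }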
 
@@ -1524,17 +1534,17 @@ class ReplicaManager(val config: KafkaConfig,
   def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = {
     requestedEpochInfo.map { case (tp, partitionData) =>
       val epochEndOffset = getPartition(tp) match {
-        case Some(partition) =>
-          if (partition eq ReplicaManager.OfflinePartition)
-            new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
-          else
-            partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch,
-              fetchOnlyFromLeader = true)
+        case partition: Partition =>
+          partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch,
+            fetchOnlyFromLeader = true)
+
+        case HostedPartition.Offline =>
+          new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
-        case None if metadataCache.contains(tp) =>
+        case HostedPartition.None if metadataCache.contains(tp) =>
           new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
-        case None =>
+        case HostedPartition.None =>
           new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
       }
       tp -> epochEndOffset
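
This match is the heart of the sentinel removal: getPartition no longer returns Option[Partition] with a shared OfflinePartition object, it returns a HostedPartition with three explicit cases. A self-contained sketch of the shape (in the patch itself Partition extends HostedPartition directly, which is why the online case reads `case partition: Partition =>`; the Online wrapper below is only for illustration):

    final class Partition // stub for kafka.cluster.Partition

    sealed trait HostedPartition
    object HostedPartition {
      case object None extends HostedPartition     // this broker does not host the partition
      case object Offline extends HostedPartition  // hosted, but its log directory has failed
      final case class Online(partition: Partition) extends HostedPartition
    }

    // The Option-shaped accessor kept for callers that only want live replicas.
    def nonOfflinePartition(hosted: HostedPartition): Option[Partition] =
      hosted match {
        case HostedPartition.Online(partition) => Some(partition)
        case _ => scala.None
      }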
@@ -1552,7 +1562,7 @@ class ReplicaManager(val config: KafkaConfig,
                          results: Map[TopicPartition, ApiError]): Unit = {
       if (expectedLeaders.nonEmpty) {
         val watchKeys = expectedLeaders.map{
-          case (tp, leader) => new TopicPartitionOperationKey(tp.topic, tp.partition)
+          case (tp, _) => new TopicPartitionOperationKey(tp)
         }.toSeq
         delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch(
           new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results,
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 2769cb4..715f42f 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -61,3 +61,18 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
   def read(): Map[TopicPartition, Long] = checkpoint.read().toMap
 
 }
+
+trait OffsetCheckpoints {
+  def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
+}
+
+class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String, OffsetCheckpointFile])
+  extends OffsetCheckpoints {
+
+  override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] = {
+    val checkpoint = checkpointFilesByLogDir(logDir)
+    val offsetMap = checkpoint.read()
+    offsetMap.get(topicPartition)
+  }
+
+}
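
The new OffsetCheckpoints trait is the seam that lets Partition read its initial high watermark without reaching back into ReplicaManager. Note that SimpleOffsetCheckpoints re-reads the whole checkpoint file on every fetch, which is fine for the once-per-LeaderAndIsr-request usage it gets here. If lookups ever became hot, a caching variant would be straightforward; the sketch below is hypothetical, not part of this patch, and (like the mutable map it uses) not thread-safe:

    import org.apache.kafka.common.TopicPartition
    import scala.collection.mutable

    // Hypothetical caching implementation of the trait above: read each log
    // dir's checkpoint file at most once, then serve lookups from memory.
    class CachedOffsetCheckpoints(checkpointFilesByLogDir: Map[String, OffsetCheckpointFile])
      extends OffsetCheckpoints {

      private val cache = mutable.Map.empty[String, Map[TopicPartition, Long]]

      override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] = {
        val offsets = cache.getOrElseUpdate(logDir, checkpointFilesByLogDir(logDir).read())
        offsets.get(topicPartition)
      }
    }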
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 33de22b..e2733b8 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -39,13 +39,13 @@ object ReplicationUtils extends Logging {
     try {
       val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
       val expectedLeaderOpt = TopicPartitionStateZNode.decode(expectedLeaderAndIsrInfo, writtenStat)
-      val succeeded = writtenLeaderOpt.map { writtenData =>
+      val succeeded = writtenLeaderOpt.exists { writtenData =>
         val writtenLeaderOpt = TopicPartitionStateZNode.decode(writtenData, writtenStat)
         (expectedLeaderOpt, writtenLeaderOpt) match {
           case (Some(expectedLeader), Some(writtenLeader)) if expectedLeader == writtenLeader => true
           case _ => false
         }
-      }.getOrElse(false)
+      }
       if (succeeded) (true, writtenStat.getVersion)
       else (false, -1)
     } catch {
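
The ReplicationUtils change is a pure idiom cleanup: Option.exists(p) is equivalent to map(p).getOrElse(false) and reads as the predicate it is. A tiny illustration:

    val written: Option[String] = Some("leader-and-isr")

    val before = written.map(_.nonEmpty).getOrElse(false) // old style
    val after  = written.exists(_.nonEmpty)               // new style, same result
    assert(before == after)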
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 654a92e..c5763ad 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -99,7 +99,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val newLeaderServer = servers.find(_.config.brokerId == 101).get
 
     TestUtils.waitUntilTrue (
-      () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+      () => newLeaderServer.replicaManager.nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
       "broker 101 should be the new leader", pause = 1L
     )
 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index e3f7b4d..b3e5ade 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,16 +22,16 @@ import java.util.{Optional, Properties}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.api.{ApiVersion, Request}
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Metric
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
-import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
+import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils._
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException}
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.utils.Utils
@@ -39,26 +39,30 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest}
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
+import org.mockito.Mockito.{doNothing, mock, when}
 import org.scalatest.Assertions.assertThrows
-import org.easymock.{Capture, EasyMock, IAnswer}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.ArgumentMatchers
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 class PartitionTest {
 
   val brokerId = 101
   val topicPartition = new TopicPartition("test-topic", 0)
   val time = new MockTime()
-  val brokerTopicStats = new BrokerTopicStats
-  val metrics = new Metrics
-
   var tmpDir: File = _
   var logDir1: File = _
   var logDir2: File = _
-  var replicaManager: ReplicaManager = _
   var logManager: LogManager = _
   var logConfig: LogConfig = _
-  var quotaManagers: QuotaManagers = _
+  val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+  val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
+  val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+  val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+  var partition: Partition = _
 
   @Before
   def setup(): Unit = {
@@ -72,20 +76,19 @@ class PartitionTest {
       logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
     logManager.startup()
 
-    val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
-    brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
-    val brokerConfig = KafkaConfig.fromProps(brokerProps)
-    val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
-    quotaManagers = QuotaFactory.instantiate(brokerConfig, metrics, time, "")
-    replicaManager = new ReplicaManager(
-      config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
-      logManager, new AtomicBoolean(false), quotaManagers,
-      brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
-
-    EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(logProps).anyTimes()
-    EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
-      .andReturn((true, 0)).anyTimes()
-    EasyMock.replay(kafkaZkClient)
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      stateStore,
+      delayedOperations,
+      metadataCache,
+      logManager)
+
+    when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
+    when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
+      .thenReturn(None)
   }
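
The rewritten setup builds a Partition directly against mocked seams (PartitionStateStore, DelayedOperations, MetadataCache, OffsetCheckpoints) instead of standing up a full ReplicaManager, and swaps EasyMock for Mockito. A self-contained sketch of the stubbing style used above, with an illustrative trait standing in for OffsetCheckpoints:

    import org.mockito.ArgumentMatchers
    import org.mockito.Mockito.{mock, when}

    trait CheckpointReader { def fetch(logDir: String, partition: Int): Option[Long] }

    val checkpoints: CheckpointReader = mock(classOf[CheckpointReader])

    // Unlike EasyMock there is no replay() step: stubs take effect immediately,
    // and unstubbed calls return Mockito defaults (null/0/false) instead of failing.
    when(checkpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(0)))
      .thenReturn(Some(4L))

    assert(checkpoints.fetch("/tmp/kafka-logs-1", 0).contains(4L))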
 
   private def createLogProperties(overrides: Map[String, String]): Properties = {
@@ -99,14 +102,8 @@ class PartitionTest {
 
   @After
   def tearDown(): Unit = {
-    brokerTopicStats.close()
-    metrics.close()
-
     logManager.shutdown()
     Utils.delete(tmpDir)
-    logManager.liveLogDirs.foreach(Utils.delete)
-    replicaManager.shutdown(checkpointHW = false)
-    quotaManagers.shutdown()
   }
 
   @Test
@@ -168,17 +165,9 @@ class PartitionTest {
     val latch = new CountDownLatch(1)
 
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints)
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
-    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
-    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
-    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
-    val partition = Partition(topicPartition, time, replicaManager)
-
-    partition.addReplicaIfNotExists(futureReplica)
-    partition.addReplicaIfNotExists(currentReplica)
-    assertEquals(Some(currentReplica), partition.localReplica)
-    assertEquals(Some(futureReplica), partition.futureLocalReplica)
+    partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
 
     val thread1 = new Thread {
       override def run(): Unit = {
@@ -207,10 +196,15 @@ class PartitionTest {
   // Verify that replacement works when the replicas have the same log end offset but different base offsets in the
   // active segment
   def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
-    // Write records with duplicate keys to current replica and roll at offset 6
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
-    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+    val currentReplica = partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints)
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
+    val futureReplica = partition.futureLocalReplicaOrException
+
+    // Write records with duplicate keys to current replica and roll at offset 6
+    val currentLog = currentReplica.log.get
+    currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k1".getBytes, "v2".getBytes),
       new SimpleRecord("k1".getBytes, "v3".getBytes),
@@ -218,15 +212,14 @@ class PartitionTest {
       new SimpleRecord("k2".getBytes, "v5".getBytes),
       new SimpleRecord("k2".getBytes, "v6".getBytes)
     ), leaderEpoch = 0)
-    log1.roll()
-    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+    currentLog.roll()
+    currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
       new SimpleRecord("k3".getBytes, "v7".getBytes),
       new SimpleRecord("k4".getBytes, "v8".getBytes)
     ), leaderEpoch = 0)
 
     // Write to the future replica as if the log had been compacted, and do not roll the segment
-    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
-    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+
     val buffer = ByteBuffer.allocate(1024)
     val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
       TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
@@ -235,16 +228,7 @@ class PartitionTest {
     builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
     builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
 
-    log2.appendAsFollower(builder.build())
-
-    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
-    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
-    val partition = Partition(topicPartition, time, replicaManager)
-
-    partition.addReplicaIfNotExists(futureReplica)
-    partition.addReplicaIfNotExists(currentReplica)
-    assertEquals(Some(currentReplica), partition.localReplica)
-    assertEquals(Some(futureReplica), partition.futureLocalReplica)
+    futureReplica.log.get.appendAsFollower(builder.build())
 
     assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
   }
@@ -491,9 +475,10 @@ class PartitionTest {
       new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
       new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
 
-    val partition = Partition(topicPartition, time, replicaManager)
+    val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
+
     assertTrue("Expected first makeLeader() to return 'leader changed'",
-      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
     assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
 
@@ -532,13 +517,16 @@ class PartitionTest {
       }
     }
 
+    when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch,
+      List(leader, follower2, follower1), 1)))
+      .thenReturn(Some(2))
+
     // Update follower 1
     partition.updateReplicaLogReadResult(
       follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
     partition.updateReplicaLogReadResult(
       follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica))
 
-    // Update follower 2
     partition.updateReplicaLogReadResult(
       follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
     partition.updateReplicaLogReadResult(
@@ -565,12 +553,14 @@ class PartitionTest {
     assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED)))
 
     // Make into a follower
-    assertTrue(partition.makeFollower(controllerId,
-      new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1))
+    val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2,
+      leaderEpoch + 1, isr, 4, replicas, false)
+    assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints))
 
     // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
-    assertTrue(partition.makeLeader(controllerId,
-      new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+    val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 5,
+      replicas, false)
+    assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
 
     // Try to get offsets as a client
     fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
@@ -611,6 +601,9 @@ class PartitionTest {
       case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e")
     }
 
+    when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
+      List(leader, follower2, follower1), 5)))
+      .thenReturn(Some(2))
 
     // Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
     partition.updateReplicaLogReadResult(
@@ -629,27 +622,10 @@ class PartitionTest {
     assertEquals(Right(None), fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)))
   }
 
-
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
                                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
-    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
-
-    val partition = new Partition(topicPartition,
-      isOffline = false,
-      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
-      interBrokerProtocolVersion = ApiVersion.latestVersion,
-      localBrokerId = brokerId,
-      time,
-      replicaManager,
-      logManager,
-      zkClient)
-
-    EasyMock.replay(replicaManager, zkClient)
-
-    partition.addReplicaIfNotExists(replica)
+    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
 
     val controllerId = 0
     val controllerEpoch = 0
@@ -659,13 +635,13 @@ class PartitionTest {
     if (isLeader) {
       assertTrue("Expected become leader transition to succeed",
         partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-          leaderEpoch, isr, 1, replicas, true), 0))
+          leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
       assertEquals(Some(replica), partition.leaderReplicaIfLocal)
     } else {
       assertTrue("Expected become follower transition to succeed",
         partition.makeFollower(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId + 1,
-          leaderEpoch, isr, 1, replicas, true), 0))
+          leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
       assertEquals(None, partition.leaderReplicaIfLocal)
     }
@@ -675,12 +651,7 @@ class PartitionTest {
 
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, logConfig)
-    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val partition = Partition(topicPartition, time, replicaManager)
-    partition.addReplicaIfNotExists(replica)
-    assertEquals(Some(replica), partition.localReplica)
-
+    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
     val initialLogStartOffset = 5L
     partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
     assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
@@ -728,37 +699,19 @@ class PartitionTest {
 
   @Test
   def testListOffsetIsolationLevels(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, logConfig)
-    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
-
-    val partition = new Partition(topicPartition,
-      isOffline = false,
-      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
-      interBrokerProtocolVersion = ApiVersion.latestVersion,
-      localBrokerId = brokerId,
-      time,
-      replicaManager,
-      logManager,
-      zkClient)
-
     val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
     val isr = replicas
 
-    EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.anyObject[TopicPartitionOperationKey]))
-        .andVoid()
-
-    EasyMock.replay(replicaManager, zkClient)
+    doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.addReplicaIfNotExists(replica)
+    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
 
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-        leaderEpoch, isr, 1, replicas, true), 0))
+        leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
     assertEquals(Some(replica), partition.leaderReplicaIfLocal)
 
@@ -804,23 +757,18 @@ class PartitionTest {
 
   @Test
   def testGetReplica(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, logConfig)
-    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val partition = Partition(topicPartition, time, replicaManager)
-
     assertEquals(None, partition.localReplica)
     assertThrows[ReplicaNotAvailableException] {
       partition.localReplicaOrException
     }
 
-    partition.addReplicaIfNotExists(replica)
+    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
     assertEquals(Some(replica), partition.localReplica)
     assertEquals(replica, partition.localReplicaOrException)
   }
 
   @Test
   def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
-    val partition = Partition(topicPartition, time, replicaManager)
     assertThrows[ReplicaNotAvailableException] {
       partition.appendRecordsToFollowerOrFutureReplica(
            createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
@@ -829,22 +777,20 @@ class PartitionTest {
 
   @Test
   def testMakeFollowerWithNoLeaderIdChange(): Unit = {
-    val partition = Partition(topicPartition, time, replicaManager)
-
     // Start off as follower
     var partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 1,
       List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
-    partition.makeFollower(0, partitionStateInfo, 0)
+    partition.makeFollower(0, partitionStateInfo, 0, offsetCheckpoints)
 
     // Request with same leader and epoch increases by only 1, do become-follower steps
     partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
       List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
-    assertTrue(partition.makeFollower(0, partitionStateInfo, 2))
+    assertTrue(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints))
 
     // Request with same leader and same epoch, skip become-follower steps
     partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
       List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
-    assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
+    assertFalse(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints))
   }
 
   @Test
@@ -865,9 +811,9 @@ class PartitionTest {
     val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes),
                                                   new SimpleRecord("k7".getBytes, "v2".getBytes)))
 
-    val partition = Partition(topicPartition, time, replicaManager)
+    val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
     assertTrue("Expected first makeLeader() to return 'leader changed'",
-               partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+               partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
     assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
 
@@ -899,11 +845,14 @@ class PartitionTest {
     assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, leaderReplica.highWatermark.messageOffset)
 
     // current leader becomes follower and then leader again (without any new records appended)
-    partition.makeFollower(
-      controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1)
+    val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1,
+      replicas, false)
+    partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)
+
+    val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1,
+      replicas, false)
     assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
-               partition.makeLeader(controllerEpoch, new LeaderAndIsrRequest.PartitionState(
-                 controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+               partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
     val currentLeaderEpochStartOffset = leaderReplica.logEndOffset
 
     // append records with the latest leader epoch
@@ -918,8 +867,10 @@ class PartitionTest {
 
     // fetch from the follower not in ISR from start offset of the current leader epoch should
     // add this follower to ISR
+    when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
+      List(leader, follower2, follower1), 1))).thenReturn(Some(2))
     partition.updateReplicaLogReadResult(follower1Replica,
-                                         readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
+      readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
   }
 
@@ -932,8 +883,6 @@ class PartitionTest {
    */
   @Test
   def testDelayedFetchAfterAppendRecords(): Unit = {
-    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
     val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
@@ -944,35 +893,38 @@ class PartitionTest {
     val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
     val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
     val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
-    val partitions = replicas.map { replica =>
+    val partitions = ListBuffer.empty[Partition]
+
+    replicas.foreach { replica =>
       val tp = replica.topicPartition
+      val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
       val partition = new Partition(tp,
-        isOffline = false,
         replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
         interBrokerProtocolVersion = ApiVersion.latestVersion,
         localBrokerId = brokerId,
         time,
-        replicaManager,
-        logManager,
-        zkClient)
+        stateStore,
+        delayedOperations,
+        metadataCache,
+        logManager)
+
+      when(delayedOperations.checkAndCompleteFetch())
+        .thenAnswer(new Answer[Unit] {
+          override def answer(invocation: InvocationOnMock): Unit = {
+            // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+            val anotherPartition = (tp.partition + 1) % topicPartitions.size
+            val partition = partitions(anotherPartition)
+            partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
+          }
+        })
+
       partition.addReplicaIfNotExists(replica)
-      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-        leaderEpoch, isr, 1, replicaIds, true), 0)
-      partition
+      val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+        leaderEpoch, isr, 1, replicaIds, true)
+      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+      partitions += partition
     }
 
-    // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
-    val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
-    EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
-      .andAnswer(new IAnswer[Unit] {
-        override def answer(): Unit = {
-          val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size
-          val partition = partitions(anotherPartition)
-          partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
-        }
-      }).anyTimes()
-    EasyMock.replay(replicaManager, zkClient)
-
     def createRecords(baseOffset: Long): MemoryRecords = {
       val records = List(
         new SimpleRecord("k1".getBytes, "v1".getBytes),
@@ -1050,10 +1002,60 @@ class PartitionTest {
     val isr = List[Integer](leader).asJava
     val leaderEpoch = 8
 
-    val partition = Partition(topicPartition, time, replicaManager)
     assertFalse(partition.isAtMinIsr)
     // Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1)
-    partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)
+    val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
+    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
     assertTrue(partition.isAtMinIsr)
   }
+
+  @Test
+  def testUseCheckpointToInitializeHighWatermark(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k2".getBytes, "v2".getBytes),
+      new SimpleRecord("k3".getBytes, "v3".getBytes),
+      new SimpleRecord("k4".getBytes, "v4".getBytes)
+    ), leaderEpoch = 0)
+    log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5,
+      new SimpleRecord("k5".getBytes, "v5".getBytes),
+      new SimpleRecord("k5".getBytes, "v5".getBytes)
+    ), leaderEpoch = 5)
+
+    when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
+      .thenReturn(Some(4L))
+
+    val controllerId = 0
+    val controllerEpoch = 3
+    val replicas = List[Integer](brokerId, brokerId + 1).asJava
+    val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+      6, replicas, 1, replicas, false)
+    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    assertEquals(4, partition.localReplicaOrException.highWatermark.messageOffset)
+  }
+
+  @Test
+  def testAddAndRemoveMetrics(): Unit = {
+    val metricsToCheck = List(
+      "UnderReplicated",
+      "UnderMinIsr",
+      "InSyncReplicasCount",
+      "ReplicasCount",
+      "LastStableOffsetLag",
+      "AtMinIsr")
+
+    def getMetric(metric: String): Option[Metric] = {
+      Metrics.defaultRegistry().allMetrics().asScala.filterKeys { metricName =>
+        metricName.getName == metric && metricName.getType == "Partition"
+      }.headOption.map(_._2)
+    }
+
+    assertTrue(metricsToCheck.forall(getMetric(_).isDefined))
+
+    Partition.removeMetrics(topicPartition)
+
+    assertEquals(Set(), Metrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition"))
+  }
+
 }
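
testAddAndRemoveMetrics above exercises the global Yammer registry that Partition's gauges live in, matching the new Partition.removeMetrics hook used by markPartitionOffline. A minimal standalone round-trip against that registry API (the group/type/name values are illustrative):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.{Gauge, MetricName}

    val name = new MetricName("kafka.cluster", "Partition", "UnderReplicated")

    Metrics.defaultRegistry().newGauge(name, new Gauge[Int] {
      override def value(): Int = 0
    })
    assert(Metrics.defaultRegistry().allMetrics().containsKey(name))

    // Partition.removeMetrics must do the equivalent of this for every gauge,
    // or offline partitions would leak metrics in the registry.
    Metrics.defaultRegistry().removeMetric(name)
    assert(!Metrics.defaultRegistry().allMetrics().containsKey(name))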
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 280fc8e..770868c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
 import java.util.Optional
 
 import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager}
 import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
@@ -1092,7 +1092,8 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
+      .andReturn(HostedPartition.None)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -1837,7 +1838,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
@@ -2334,7 +2335,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
@@ -2375,7 +2376,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
@@ -2687,7 +2688,8 @@ class GroupCoordinatorTest {
   private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
+      .andReturn(HostedPartition.None)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index dab2d72..0487178 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -2025,7 +2025,7 @@ class GroupMetadataManagerTest {
   }
 
   private def mockGetPartition(): Unit = {
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index 23ac2dc..6c74ce3 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -191,10 +191,9 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
 
         if (isEpochInRequestStale) {
           sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
-        }
-        else {
+        } else {
           sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
-          assertTrue(broker2.replicaManager.getPartition(tp).isEmpty)
+          assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp))
         }
       }
     } finally {
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 3b077a0..c1e0b9f 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -86,17 +86,17 @@ class DelayedOperationTest {
     purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
     purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3"))
 
-    assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed)
+    assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.numDelayed)
     assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched)
 
     // complete the operations; each should immediately be purged from the delayed operations
     r2.completable = true
     r2.tryComplete()
-    assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed, 2, purgatory.delayed)
+    assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed, 2, purgatory.numDelayed)
 
     r3.completable = true
     r3.tryComplete()
-    assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed, 1, purgatory.delayed)
+    assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed, 1, purgatory.numDelayed)
 
     // checking a watch should purge the watch list
     purgatory.checkAndComplete("test1")
@@ -117,7 +117,7 @@ class DelayedOperationTest {
 
     val cancelledOperations = purgatory.cancelForKey("key")
     assertEquals(2, cancelledOperations.size)
-    assertEquals(1, purgatory.delayed)
+    assertEquals(1, purgatory.numDelayed)
     assertEquals(1, purgatory.watched)
   }
 
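The accessor rename from delayed to numDelayed also clarifies the two gauges these assertions exercise: numDelayed counts operations still pending in the timeout queue, while watched counts watcher-list entries, so one operation watched under k keys contributes k to watched but only 1 to numDelayed. A hedged sketch of the distinction, with NoopOperation as a hypothetical stand-in for the test's own completable operation class:

    import kafka.server.{DelayedOperation, DelayedOperationPurgatory}

    // Hypothetical stand-in for the test's mock operation type.
    class NoopOperation(delayMs: Long) extends DelayedOperation(delayMs) {
      @volatile var completable = false
      override def tryComplete(): Boolean = if (completable) forceComplete() else false
      override def onComplete(): Unit = ()
      override def onExpiration(): Unit = ()
    }

    val purgatory = DelayedOperationPurgatory[NoopOperation]("sketch")
    purgatory.tryCompleteElseWatch(new NoopOperation(100000L), Seq("k1", "k2"))
    assert(purgatory.numDelayed == 1) // one pending operation
    assert(purgatory.watched == 2)    // watched under two keys
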
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 61cbd2c..3da22bb 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -72,7 +72,7 @@ class HighwatermarkPersistenceTest {
       var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(0L, fooPartition0Hw)
       val tp0 = new TopicPartition(topic, 0)
-      val partition0 = replicaManager.getOrCreatePartition(tp0)
+      val partition0 = replicaManager.createPartition(tp0)
       // create leader and follower replicas
       val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig())
       val leaderReplicaPartition0 = new Replica(configs.head.brokerId, tp0, time, 0, Some(log0))
@@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest {
       var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(0L, topic1Partition0Hw)
       val t1p0 = new TopicPartition(topic1, 0)
-      val topic1Partition0 = replicaManager.getOrCreatePartition(t1p0)
+      val topic1Partition0 = replicaManager.createPartition(t1p0)
       // create leader log
       val topic1Log0 = logManagers.head.getOrCreateLog(t1p0, LogConfig())
       // create a local replica for topic1
@@ -134,7 +134,7 @@ class HighwatermarkPersistenceTest {
       assertEquals(5L, topic1Partition0Hw)
       // add another partition and set highwatermark
       val t2p0 = new TopicPartition(topic2, 0)
-      val topic2Partition0 = replicaManager.getOrCreatePartition(t2p0)
+      val topic2Partition0 = replicaManager.createPartition(t2p0)
       // create leader log
       val topic2Log0 = logManagers.head.getOrCreateLog(t2p0, LogConfig())
       // create a local replica for topic2
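getOrCreatePartition becomes createPartition across these fixtures: with Partition no longer reaching into ReplicaManager, the tests build partition state explicitly rather than leaning on get-or-create semantics. A hedged sketch of the setup pattern the hunks above converge on (brokerId, time and logManager stand in for the surrounding fixtures):

    val tp = new TopicPartition(topic, 0)
    val partition = replicaManager.createPartition(tp)
    val log = logManager.getOrCreateLog(tp, LogConfig())
    // Replicas are attached explicitly instead of being materialized
    // through ReplicaManager.
    partition.addReplicaIfNotExists(new Replica(brokerId, tp, time, 0, Some(log)))
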
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 006067e..1dd4b24 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -227,7 +227,7 @@ class IsrExpirationTest {
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
     val tp = new TopicPartition(topic, partitionId)
-    val partition = replicaManager.getOrCreatePartition(tp)
+    val partition = replicaManager.createPartition(tp)
     val leaderReplica = new Replica(leaderId, tp, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 0fd289c..6b69c41 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
 
 import kafka.server.LogDirFailureTest._
 import kafka.api.IntegrationTestHarness
+import kafka.cluster.Partition
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
 import kafka.utils.{CoreUtils, Exit, TestUtils}
 import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -112,14 +113,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // Send a message to another partition whose leader is the same as partition 0
     // so that ReplicaFetcherThread on the follower will get response from leader immediately
     val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i =>
-      leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined
+      leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, i))
+        .flatMap(_.leaderReplicaIfLocal).isDefined
     }.get
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes)
     // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower
     // has fetched from the leader and attempts to append to the offline replica.
     producer.send(record).get
 
-    assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
+    assertEquals(brokerCount, leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader))
+      .get.inSyncReplicas.size)
     followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
       assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
     }
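Call sites that only want a usable partition move from getPartition to nonOfflinePartition, which keeps the old Option[Partition] shape while filtering out unhosted and offline state. In terms of the HostedPartition sketch above, the equivalence these hunks rely on is roughly:

    // Hedged sketch; the real method lives in ReplicaManager.
    def nonOfflinePartition(tp: TopicPartition): Option[Partition] =
      replicaManager.getPartition(tp) match {
        case HostedPartition.Online(partition) => Some(partition)
        case HostedPartition.None | HostedPartition.Offline => None
      }
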
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 15f9a9b..780d189 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,6 +23,7 @@ import TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import java.io.File
 
+import kafka.cluster.Partition
 import kafka.server.checkpoints.OffsetCheckpointFile
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
@@ -147,7 +148,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       * is that server1 has caught up on the topicPartition, and has joined the ISR.
       * In the line below, we wait until the condition is met before shutting down server2
       */
-    waitUntilTrue(() => server2.replicaManager.getPartition(topicPartition).get.inSyncReplicas.size == 2,
+    waitUntilTrue(() => server2.replicaManager.nonOfflinePartition(topicPartition).get.inSyncReplicas.size == 2,
       "Server 1 is not able to join the ISR after restart")
 
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 4049504..d149b8a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -614,12 +614,12 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
     expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes()
-    expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes()
     expect(replicaManager.localReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
     expect(replicaManager.futureLocalReplica(t1p1)).andReturn(Some(futureReplica)).anyTimes()
     expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
     expect(replicaManager.futureLocalReplicaOrException(t1p1)).andReturn(futureReplica).anyTimes()
-    expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
   }
 
   def stubWithFetchMessages(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index d6ebdd6..a51641a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -98,9 +98,9 @@ class ReplicaFetcherThreadTest {
     stub(replica, partition, replicaManager)
 
     //Expectations
-    expect(partition.truncateTo(anyLong(), anyBoolean())).once
+    expect(partition.truncateTo(anyLong(), anyBoolean())).times(3)
 
-    replay(replicaManager, logManager, quota, replica)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
@@ -227,9 +227,9 @@ class ReplicaFetcherThreadTest {
     stub(replica, partition, replicaManager)
 
     //Expectations
-    expect(partition.truncateTo(anyLong(), anyBoolean())).once
+    expect(partition.truncateTo(anyLong(), anyBoolean())).times(2)
 
-    replay(replicaManager, logManager, replica)
+    replay(replicaManager, logManager, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava
@@ -609,6 +609,7 @@ class ReplicaFetcherThreadTest {
     val leaderEpoch = 4
 
     //Stub return values
+    expect(partition.truncateTo(0L, false)).times(2)
     expect(replica.logEndOffset).andReturn(0).anyTimes()
     expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
     expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
@@ -618,7 +619,7 @@ class ReplicaFetcherThreadTest {
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
     stub(replica, partition, replicaManager)
 
-    replay(replicaManager, logManager, quota, replica)
+    replay(replicaManager, logManager, quota, replica, partition)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
@@ -727,15 +728,13 @@ class ReplicaFetcherThreadTest {
     verify(mockBlockingSend)
   }
 
-  def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
-    expect(replicaManager.localReplica(t1p0)).andReturn(Some(replica)).anyTimes()
+  def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager): Unit = {
     expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replica).anyTimes()
-    expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
-    expect(replicaManager.localReplica(t1p1)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes()
     expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replica).anyTimes()
-    expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
-    expect(replicaManager.localReplica(t2p1)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
     expect(replicaManager.localReplicaOrException(t2p1)).andReturn(replica).anyTimes()
-    expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.nonOfflinePartition(t2p1)).andReturn(Some(partition)).anyTimes()
+    expect(partition.localReplicaOrException).andReturn(replica).anyTimes()
   }
 }
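With truncation now owned by Partition rather than routed through ReplicaManager, these tests move the truncateTo expectations onto the Partition mock and add it to replay; the expected counts change from once to the number of truncating fetch rounds each scenario drives. A hedged sketch of the EasyMock pattern, assuming fixtures like those above:

    val partition: Partition = createMock(classOf[Partition])
    // The partition mock must carry the truncation expectation and be replayed,
    // or the fetcher's call to Partition.truncateTo fails the test.
    expect(partition.truncateTo(anyLong(), anyBoolean())).times(2)
    expect(partition.localReplicaOrException).andReturn(replica).anyTimes()
    replay(replicaManager, logManager, replica, partition)
    // ... run the fetcher thread under test ...
    verify(partition)
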
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 5b2f2ae..c2d92df 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -237,7 +237,7 @@ class ReplicaManagerQuotasTest {
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {
-      val partition = replicaManager.getOrCreatePartition(p)
+      val partition = replicaManager.createPartition(p)
       val leaderReplica = new Replica(configs.head.brokerId, p, time, 0, Some(log))
       leaderReplica.highWatermark = new LogOffsetMetadata(5)
       partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1c1cbd6..b7239e0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,6 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
+import kafka.server.checkpoints.SimpleOffsetCheckpoints
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -85,8 +86,8 @@ class ReplicaManagerTest {
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
-      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1)
+      val partition = rm.createPartition(new TopicPartition(topic, 1))
+      partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -104,8 +105,8 @@ class ReplicaManagerTest {
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
-      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1)
+      val partition = rm.createPartition(new TopicPartition(topic, 1))
+      partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -158,8 +159,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
 
-      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = rm.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -202,8 +203,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
 
-      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -253,8 +254,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
 
-      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -349,8 +350,8 @@ class ReplicaManagerTest {
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
-      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -415,8 +416,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
-      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = rm.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -465,8 +466,9 @@ class ReplicaManagerTest {
       // Create 2 partitions, assign replica 0 as the leader for both, and a different follower (1 and 2) for each
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
-      replicaManager.getOrCreatePartition(tp0).getOrCreateReplica(0)
-      replicaManager.getOrCreatePartition(tp1).getOrCreateReplica(0)
+      val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
+      replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val partition1Replicas = Seq[Integer](0, 2).asJava
       val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -556,11 +558,12 @@ class ReplicaManagerTest {
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
-    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
-    partition.getOrCreateReplica(followerBrokerId)
+    val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
+    val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    partition.getOrCreateReplica(followerBrokerId, isNew = false, offsetCheckpoints)
     partition.makeFollower(controllerId,
       leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      correlationId)
+      correlationId, offsetCheckpoints)
 
     // Make local partition a follower - because epoch increased by more than 1, truncation should
     // trigger even though leader does not change
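getOrCreateReplica and makeFollower now take their checkpoint source as an explicit argument instead of reaching back into ReplicaManager; SimpleOffsetCheckpoints (added in OffsetCheckpointFile.scala earlier in this patch) adapts the per-log-dir high-watermark checkpoint files. The new call shape, as exercised above (controllerId, correlationId and the leader-and-ISR state come from the surrounding fixtures):

    val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
    val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
    partition.getOrCreateReplica(0, isNew = false, offsetCheckpoints)
    // makeFollower threads the same checkpoints through explicitly:
    partition.makeFollower(controllerId,
      leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
      correlationId, offsetCheckpoints)
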
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 94f9a16..41c6b3e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -117,7 +117,7 @@ class SimpleFetchTest {
       new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     // add the partition with two replicas, both in ISR
-    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
+    val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
 
     // create the leader replica with the local log
     val leaderReplica = new Replica(configs.head.brokerId, partition.topicPartition, time, 0, Some(log))
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index ac6dedc..8349541 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -444,7 +444,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def awaitISR(tp: TopicPartition): Unit = {
     TestUtils.waitUntilTrue(() => {
-      leader.replicaManager.getPartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
+      leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
     }, "Timed out waiting for replicas to join ISR")
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 3d3b342..eba4167 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -57,7 +57,7 @@ class OffsetsForLeaderEpochTest {
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
-    val partition = replicaManager.getOrCreatePartition(tp)
+    val partition = replicaManager.createPartition(tp)
     val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
     partition.addReplicaIfNotExists(leaderReplica)
     partition.leaderReplicaIdOpt = Some(config.brokerId)
@@ -79,7 +79,7 @@ class OffsetsForLeaderEpochTest {
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
-    replicaManager.getOrCreatePartition(tp)
+    replicaManager.createPartition(tp)
 
     //Given
     val epochRequested: Integer = 5
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 59ee426..c7f5c24 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
   }
 
   def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
-    server.replicaManager.getPartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
+    server.replicaManager.nonOfflinePartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
   }
 
   def findLeaderEpoch(brokerId: Int,
                       topicPartition: TopicPartition,
                       servers: Iterable[KafkaServer]): Int = {
     val leaderServer = servers.find(_.config.brokerId == brokerId)
-    val leaderPartition = leaderServer.flatMap(_.replicaManager.getPartition(topicPartition))
+    val leaderPartition = leaderServer.flatMap(_.replicaManager.nonOfflinePartition(topicPartition))
       .getOrElse(fail(s"Failed to find expected replica on broker $brokerId"))
     leaderPartition.getLeaderEpoch
   }
@@ -837,7 +837,7 @@ object TestUtils extends Logging {
   def findFollowerId(topicPartition: TopicPartition,
                      servers: Iterable[KafkaServer]): Int = {
     val followerOpt = servers.find { server =>
-      server.replicaManager.getPartition(topicPartition) match {
+      server.replicaManager.nonOfflinePartition(topicPartition) match {
         case Some(partition) => !partition.leaderReplicaIdOpt.contains(server.config.brokerId)
         case None => false
       }
@@ -903,7 +903,7 @@ object TestUtils extends Logging {
     def newLeaderExists: Option[Int] = {
       servers.find { server =>
         server.config.brokerId != oldLeader &&
-          server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+          server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
       }.map(_.config.brokerId)
     }
 
@@ -918,7 +918,7 @@ object TestUtils extends Logging {
                              timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
     def leaderIfExists: Option[Int] = {
       servers.find { server =>
-        server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+        server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
       }.map(_.config.brokerId)
     }
 
@@ -1056,7 +1056,7 @@ object TestUtils extends Logging {
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     TestUtils.waitUntilTrue(() =>
-      servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)),
+      servers.forall(server => topicPartitions.forall(tp => server.replicaManager.nonOfflinePartition(tp).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
     assertTrue("Replica logs not deleted after delete topic is complete",