You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/17 15:52:55 UTC

[kafka] branch trunk updated: KAFKA-8457; Move `Log` reference from `Replica` into `Partition` (#6841)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 57baa40  KAFKA-8457; Move `Log` reference from `Replica` into `Partition` (#6841)
57baa40 is described below

commit 57baa4079d9fc14103411f790b9a025c9f2146a4
Author: Vikas Singh <50...@users.noreply.github.com>
AuthorDate: Mon Jun 17 08:52:37 2019 -0700

    KAFKA-8457; Move `Log` reference from `Replica` into `Partition` (#6841)
    
    A `Partition` object contains one or more `Replica` objects. These replica
    objects in turn can hold the "log" if the replica corresponds to the
    local node. All the code in Partition or ReplicaManager peeks into the
    replica object to fetch the log when it needs to operate on it. As a
    replica object can represent a local replica or a remote one, this
    leads to a bunch of "if-else" code in the log fetch and offset update paths.
    
    NOTE: In addition to the "log" that is in use during normal operation, if
    an alter log directory command is issued, we also create a future log
    object. This object catches up with the local log and then we switch the
    log directory. So temporarily a Partition can have two local logs. Before
    this change, both logs were held inside replica objects.
    
    This change is an attempt to untangle this relationship. In particular,
    it moves `Log` from `Replica` into `Partition`. So a partition contains
    a local log to which all writes go and possibly a "future log" if the
    partition is being moved between directories. Additionally, it maintains
    a list of remote replicas for the offset and "caught up time" data that it
    uses for the replication protocol.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 521 +++++++++++----------
 core/src/main/scala/kafka/cluster/Replica.scala    | 190 ++------
 core/src/main/scala/kafka/log/Log.scala            |  95 +++-
 core/src/main/scala/kafka/log/LogManager.scala     |   1 +
 core/src/main/scala/kafka/log/LogSegment.scala     |   2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   2 +-
 .../scala/kafka/server/DelayedDeleteRecords.scala  |   2 +-
 .../scala/kafka/server/LogOffsetMetadata.scala     |   2 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  19 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  30 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  88 ++--
 .../kafka/api/AdminClientIntegrationTest.scala     |  18 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |   2 +-
 .../admin/ReassignPartitionsClusterTest.scala      |   6 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 164 +++----
 .../scala/unit/kafka/cluster/ReplicaTest.scala     |  38 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   8 +-
 .../LogCleanerParameterizedIntegrationTest.scala   |   4 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   5 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   8 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  57 ++-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |   4 +-
 .../server/HighwatermarkPersistenceTest.scala      |  34 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |  83 ++--
 .../unit/kafka/server/LogDirFailureTest.scala      |   8 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   3 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |  12 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 147 +++---
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 185 ++++----
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  32 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  50 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  22 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   2 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   2 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   6 +-
 36 files changed, 907 insertions(+), 949 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 251c570..544fc0c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -170,8 +170,7 @@ class Partition(val topicPartition: TopicPartition,
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
 
-  // allReplicasMap includes both assigned replicas and the future replica if there is ongoing replica movement
-  private val allReplicasMap = new Pool[Int, Replica]
+  private val remoteReplicasMap = new Pool[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
@@ -180,7 +179,18 @@ class Partition(val topicPartition: TopicPartition,
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
-  @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
+  @volatile var inSyncReplicas: Set[Int] = Set.empty[Int]
+  // Includes all valid broker ids (@see Request::isValidBrokerId) that contain logs. Doesn't contain
+  // future log replica id
+  @volatile var allReplicaIds: scala.collection.mutable.Set[Int] = scala.collection.mutable.Set(localBrokerId)
+
+  // Logs belonging to this partition. Majority of time it will be only one log, but if log directory
+  // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
+  // completes and a switch to new location is performed.
+  // log and futureLog variables defined below are used to capture this
+  @volatile var log: Option[Log] = None
+  // If ReplicaAlterLogDir command is in progress, this is future location of the log
+  @volatile var futureLog: Option[Log] = None
 
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
    * One way of doing that is through the controller's start replica state change command. When a new broker starts up
@@ -190,13 +200,11 @@ class Partition(val topicPartition: TopicPartition,
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
-  private def isReplicaLocal(replicaId: Int): Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
-
   private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
 
   newGauge("UnderReplicated",
     new Gauge[Int] {
-      def value = {
+      def value: Int = {
         if (isUnderReplicated) 1 else 0
       }
     },
@@ -205,8 +213,8 @@ class Partition(val topicPartition: TopicPartition,
 
   newGauge("InSyncReplicasCount",
     new Gauge[Int] {
-      def value = {
-        if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+      def value: Int = {
+        if (isLeader) inSyncReplicas.size else 0
       }
     },
     tags
@@ -214,7 +222,7 @@ class Partition(val topicPartition: TopicPartition,
 
   newGauge("UnderMinIsr",
     new Gauge[Int] {
-      def value = {
+      def value: Int = {
         if (isUnderMinIsr) 1 else 0
       }
     },
@@ -223,7 +231,7 @@ class Partition(val topicPartition: TopicPartition,
 
   newGauge("AtMinIsr",
     new Gauge[Int] {
-      def value = {
+      def value: Int = {
         if (isAtMinIsr) 1 else 0
       }
     },
@@ -232,8 +240,8 @@ class Partition(val topicPartition: TopicPartition,
 
   newGauge("ReplicasCount",
     new Gauge[Int] {
-      def value = {
-        if (isLeaderReplicaLocal) assignedReplicas.size else 0
+      def value: Int = {
+        if (isLeader) allReplicaIds.size else 0
       }
     },
     tags
@@ -241,36 +249,22 @@ class Partition(val topicPartition: TopicPartition,
 
   newGauge("LastStableOffsetLag",
     new Gauge[Long] {
-      def value = {
-        leaderReplicaIfLocal.map { replica =>
-          replica.highWatermark - replica.lastStableOffset
-        }.getOrElse(0)
+      def value: Long = {
+        log.map(_.lastStableOffsetLag).getOrElse(0)
       }
     },
     tags
   )
 
-  private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
-
   def isUnderReplicated: Boolean =
-    isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
+    isLeader && inSyncReplicas.size < allReplicaIds.size
 
   def isUnderMinIsr: Boolean = {
-    leaderReplicaIfLocal match {
-      case Some(leaderReplica) =>
-        inSyncReplicas.size < leaderReplica.log.get.config.minInSyncReplicas
-      case None =>
-        false
-    }
+    leaderLogIfLocal.exists { inSyncReplicas.size < _.config.minInSyncReplicas }
   }
 
   def isAtMinIsr: Boolean = {
-    leaderReplicaIfLocal match {
-      case Some(leaderReplica) =>
-        inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas
-      case None =>
-        false
-    }
+    leaderLogIfLocal.exists { inSyncReplicas.size == _.config.minInSyncReplicas }
   }
 
   /**
@@ -286,43 +280,72 @@ class Partition(val topicPartition: TopicPartition,
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
     inWriteLock(leaderIsrUpdateLock) {
-      val currentReplica = localReplicaOrException
-      val currentLog = currentReplica.log.get
-      if (currentLog.dir.getParent == logDir)
+      val currentLogDir = localLogOrException.dir.getParent
+      if (currentLogDir == logDir) {
+        info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " +
+          s"Skipping future replica creation.")
         false
-      else {
-        futureLocalReplica match {
-          case Some(replica) =>
-            val futureReplicaLogDir = replica.log.get.dir.getParent
-            if (futureReplicaLogDir != logDir)
-              throw new IllegalStateException(s"The future log dir $futureReplicaLogDir of $topicPartition is " +
+      } else {
+        futureLog match {
+          case Some(partitionFutureLog) =>
+            val futureLogDir = partitionFutureLog.dir.getParent
+            if (futureLogDir != logDir)
+              throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints)
+            createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
             true
         }
       }
     }
   }
 
-  def getOrCreateReplica(replicaId: Int, isNew: Boolean, offsetCheckpoints: OffsetCheckpoints): Replica = {
-    allReplicasMap.getAndMaybePut(replicaId, {
-      if (isReplicaLocal(replicaId)) {
-        val props = stateStore.fetchTopicConfig()
-        val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
-        val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
-        val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
-          info(s"No checkpointed highwatermark is found for partition $topicPartition")
-          0L
-        }
-        val initialHighWatermark = math.min(checkpointHighWatermark, log.logEndOffset)
-        new Replica(replicaId, topicPartition, time, initialHighWatermark, Some(log))
-      } else new Replica(replicaId, topicPartition, time)
-    })
+  /**
+   * Creates a remote replica and puts that in a map. A future invocation to create
+   * replica with same id will return previously created object.
+   */
+  def getOrCreateReplica(replicaId: Int): Replica = {
+    require(replicaId != localBrokerId, s"Cannot create replica for local broker: $replicaId")
+    val newReplica = remoteReplicasMap.getAndMaybePut(replicaId, new Replica(replicaId, topicPartition))
+    allReplicaIds.add(replicaId)
+    require(remoteReplicasMap.size + 1 == allReplicaIds.size,
+      s"Invalid state. All Replica Ids: $allReplicaIds, remote replica ids: ${remoteReplicasMap.keys}")
+    newReplica
   }
 
-  def getReplica(replicaId: Int): Option[Replica] = Option(allReplicasMap.get(replicaId))
+  def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
+    isFutureReplica match {
+      case true if futureLog.isEmpty =>
+        val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+        this.futureLog = Option(log)
+      case false if log.isEmpty =>
+        val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+        this.log = Option(log)
+      case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+    }
+  }
+
+  private def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+    val props = stateStore.fetchTopicConfig()
+    val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
+    val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica)
+    val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
+      info(s"No checkpointed highwatermark is found for partition $topicPartition")
+      0L
+    }
+    val initialHighWatermark = math.min(checkpointHighWatermark, log.logEndOffset)
+    log.highWatermarkMetadata = LogOffsetMetadata(initialHighWatermark)
+
+    info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
+    log
+  }
+
+  def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
+
+  private def getReplicaOrException(replicaId: Int): Replica = getReplica(replicaId).getOrElse{
+    throw new ReplicaNotAvailableException(s"Replica with id $replicaId is not available on broker $localBrokerId")
+  }
 
   private def checkCurrentLeaderEpoch(remoteLeaderEpochOpt: Optional[Integer]): Errors = {
     if (!remoteLeaderEpochOpt.isPresent) {
@@ -339,22 +362,21 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def getLocalReplica(replicaId: Int,
-                              currentLeaderEpoch: Optional[Integer],
-                              requireLeader: Boolean): Either[Replica, Errors] = {
+  private def getLocalLog(currentLeaderEpoch: Optional[Integer],
+                          requireLeader: Boolean): Either[Log, Errors] = {
     checkCurrentLeaderEpoch(currentLeaderEpoch) match {
       case Errors.NONE =>
-        if (requireLeader && !leaderReplicaIdOpt.contains(localBrokerId)) {
+        if (requireLeader && !isLeader) {
           Right(Errors.NOT_LEADER_FOR_PARTITION)
         } else {
-          val replica = allReplicasMap.get(replicaId)
-          if (replica == null) {
-            if (requireLeader)
-              Right(Errors.NOT_LEADER_FOR_PARTITION)
-            else
-              Right(Errors.REPLICA_NOT_AVAILABLE)
-          } else {
-            Left(replica)
+          log match {
+            case Some(partitionLog) =>
+              Left(partitionLog)
+            case _ =>
+              if (requireLeader)
+                Right(Errors.NOT_LEADER_FOR_PARTITION)
+              else
+                Right(Errors.REPLICA_NOT_AVAILABLE)
           }
         }
       case error =>
@@ -362,68 +384,68 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def localReplica: Option[Replica] = getReplica(localBrokerId)
-
-  def localReplicaOrException: Replica = localReplica.getOrElse {
-    throw new ReplicaNotAvailableException(s"Replica for partition $topicPartition is not available " +
+  def localLogOrException: Log = log.getOrElse {
+    throw new ReplicaNotAvailableException(s"Log for partition $topicPartition is not available " +
       s"on broker $localBrokerId")
   }
 
-  def futureLocalReplica: Option[Replica] = getReplica(Request.FutureLocalReplicaId)
-
-  def futureLocalReplicaOrException: Replica = futureLocalReplica.getOrElse {
-    throw new ReplicaNotAvailableException(s"Future replica for partition $topicPartition is not available " +
+  def futureLocalLogOrException: Log = futureLog.getOrElse {
+    throw new ReplicaNotAvailableException(s"Future log for partition $topicPartition is not available " +
       s"on broker $localBrokerId")
   }
 
-  def leaderReplicaIfLocal: Option[Replica] = {
-    if (leaderReplicaIdOpt.contains(localBrokerId))
-      localReplica
-    else
-      None
+  def leaderLogIfLocal: Option[Log] = {
+    log.filter(_ => isLeader)
   }
 
-  private def localReplicaWithEpochOrException(currentLeaderEpoch: Optional[Integer],
-                                               requireLeader: Boolean): Replica = {
-    getLocalReplica(localBrokerId, currentLeaderEpoch, requireLeader) match {
-      case Left(replica) => replica
+  /**
+   * Returns true if this node is currently leader for the Partition.
+   */
+  def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)
+
+  private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
+                                           requireLeader: Boolean): Log = {
+    getLocalLog(currentLeaderEpoch, requireLeader) match {
+      case Left(localLog) => localLog
       case Right(error) =>
-        throw error.exception(s"Failed to find ${if (requireLeader) "leader " else ""} for " +
+        throw error.exception(s"Failed to find ${if (requireLeader) "leader " else ""} log for " +
           s"partition $topicPartition with leader epoch $currentLeaderEpoch. The current leader " +
           s"is $leaderReplicaIdOpt and the current epoch $leaderEpoch")
     }
   }
 
-  // Visible for testing
-  def addReplicaIfNotExists(replica: Replica): Replica =
-    allReplicasMap.putIfNotExists(replica.brokerId, replica)
+  // Visible for testing -- Only used in tests to add replica to existing partition
+  def addReplicaIfNotExists(replica: Replica): Replica = {
+    allReplicaIds.add(replica.brokerId)
+    remoteReplicasMap.getAndMaybePut(replica.brokerId, replica)
+  }
 
-  def assignedReplicas: Set[Replica] =
-    allReplicasMap.values.filter(replica => Request.isValidBrokerId(replica.brokerId)).toSet
+  // Visible for testing -- Used by unit tests to set log for this partition
+  def setLog(log: Log, isFutureLog: Boolean): Unit = {
+    if (isFutureLog)
+      futureLog = Some(log)
+    else
+      this.log = Some(log)
+  }
 
-  def allReplicas: Set[Replica] =
-    allReplicasMap.values.toSet
+  def remoteReplicas: Set[Replica] =
+    remoteReplicasMap.values.toSet
 
   private def removeReplica(replicaId: Int) {
-    allReplicasMap.remove(replicaId)
+    require(replicaId != localBrokerId, s"Cannot remove replica for local broker: $replicaId")
+    allReplicaIds.remove(replicaId)
+    remoteReplicasMap.remove(replicaId)
   }
 
   def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
     inReadLock(leaderIsrUpdateLock) {
-      futureLocalReplica match {
-        case Some(replica) =>
-          if (replica.log.get.dir.getParent != newDestinationDir)
-            true
-          else
-            false
-        case None => false
-      }
+      futureLog.exists(_.dir.getParent != newDestinationDir)
     }
   }
 
   def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
-      allReplicasMap.remove(Request.FutureLocalReplicaId)
+      futureLog = None
       if (deleteFromLogDir)
         logManager.asyncDelete(topicPartition, isFuture = true)
     }
@@ -433,25 +455,25 @@ class Partition(val topicPartition: TopicPartition,
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
-    val replica = localReplicaOrException
-    val futureReplicaLEO = futureLocalReplica.map(_.logEndOffset)
-    if (futureReplicaLEO.contains(replica.logEndOffset)) {
+    val localReplicaLEO = localLogOrException.logEndOffset
+    val futureReplicaLEO = futureLog.map(_.logEndOffset)
+    if (futureReplicaLEO.contains(localReplicaLEO)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        futureLocalReplica match {
-          case Some(futureReplica) =>
-            if (replica.logEndOffset == futureReplica.logEndOffset) {
+        futureLog match {
+          case Some(futurePartitionLog) =>
+            if (log.exists(_.logEndOffset == futurePartitionLog.logEndOffset)) {
               logManager.replaceCurrentWithFutureLog(topicPartition)
-              replica.log = futureReplica.log
-              futureReplica.log = None
-              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              log = futureLog
+              removeFutureLocalReplica(false)
               true
             } else false
           case None =>
             // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
             // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
-            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the
+            // state again to avoid race condition
             false
         }
       }
@@ -461,8 +483,11 @@ class Partition(val topicPartition: TopicPartition,
   def delete() {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
     inWriteLock(leaderIsrUpdateLock) {
-      allReplicasMap.clear()
-      inSyncReplicas = Set.empty[Replica]
+      remoteReplicasMap.clear()
+      allReplicaIds = scala.collection.mutable.Set(localBrokerId)
+      log = None
+      futureLog = None
+      inSyncReplicas = Set.empty[Int]
       leaderReplicaIdOpt = None
       leaderEpochStartOffsetOpt = None
       Partition.removeMetrics(topicPartition)
@@ -484,21 +509,23 @@ class Partition(val topicPartition: TopicPartition,
                  correlationId: Int,
                  highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
       // add replicas that are new
-      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map {
-        id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints)
-      }.toSet
+      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(_.toInt)
+      newInSyncReplicas.filter(_ != localBrokerId).foreach(getOrCreateReplica)
+      inSyncReplicas = newInSyncReplicas.toSet
+
       // remove assigned replicas that have been removed by the controller
-      (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
-      inSyncReplicas = newInSyncReplicas
-      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints))
+      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
+      (remoteReplicasMap.keys -- newAssignedReplicas).foreach(removeReplica)
+      newAssignedReplicas.filter(_ != localBrokerId).foreach(getOrCreateReplica)
 
-      val leaderReplica = localReplicaOrException
-      val leaderEpochStartOffset = leaderReplica.logEndOffset
+      createLogIfNotExists(localBrokerId, partitionStateInfo.isNew, isFutureReplica = false, highWatermarkCheckpoints)
+
+      val leaderLog = localLogOrException
+      val leaderEpochStartOffset = leaderLog.logEndOffset
       info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
         s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
 
@@ -512,27 +539,24 @@ class Partition(val topicPartition: TopicPartition,
       // to ensure that these followers can truncate to the right offset, we must cache the new
       // leader epoch and the start offset since it should be larger than any epoch that a follower
       // would try to query.
-      leaderReplica.log.foreach { log =>
-        log.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
-      }
+      leaderLog.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
 
-      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
-      val curLeaderLogEndOffset = leaderReplica.logEndOffset
+      val isNewLeader = !isLeader
+      val curLeaderLogEndOffset = leaderLog.logEndOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
-      (assignedReplicas - leaderReplica).foreach { replica =>
-        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
+      remoteReplicas.foreach { replica =>
+        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica.brokerId)) curTimeMs else 0L
         replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
       }
 
       if (isNewLeader) {
         // construct the high watermark metadata for the new leader replica
-        leaderReplica.maybeFetchHighWatermarkOffsetMetadata()
+        leaderLog.maybeFetchHighWatermarkOffsetMetadata()
         // mark local replica as the leader after converting hw
         leaderReplicaIdOpt = Some(localBrokerId)
         // reset log end offset for remote replicas
-        assignedReplicas.filter(_.brokerId != localBrokerId).foreach { replica =>
-          replica.updateFetchState(
+        remoteReplicas.foreach { _.updateFetchState(
             followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
             followerStartOffset = Log.UnknownOffset,
             followerFetchTimeMs = 0L,
@@ -541,7 +565,7 @@ class Partition(val topicPartition: TopicPartition,
         }
       }
       // we may need to increment high watermark since ISR could be down to 1
-      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
+      (maybeIncrementLeaderHW(leaderLog), isNewLeader)
     }
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
@@ -567,10 +591,12 @@ class Partition(val topicPartition: TopicPartition,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
       // add replicas that are new
-      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints))
+      newAssignedReplicas.filter(_ != localBrokerId).foreach(id => getOrCreateReplica(id))
+      createLogIfNotExists(localBrokerId, partitionStateInfo.isNew, isFutureReplica = false, highWatermarkCheckpoints)
+
       // remove assigned replicas that have been removed by the controller
-      (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
-      inSyncReplicas = Set.empty[Replica]
+      (remoteReplicasMap.keys -- newAssignedReplicas).foreach(removeReplica)
+      inSyncReplicas = Set.empty[Int]
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = None
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
@@ -646,21 +672,20 @@ class Partition(val topicPartition: TopicPartition,
   private def maybeExpandIsr(followerReplica: Replica, followerFetchTimeMs: Long): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
-      leaderReplicaIfLocal match {
-        case Some(leaderReplica) =>
-          val leaderHighwatermark = leaderReplica.highWatermark
-          if (!inSyncReplicas.contains(followerReplica) && isFollowerInSync(followerReplica, leaderHighwatermark)) {
-            val newInSyncReplicas = inSyncReplicas + followerReplica
-
-            info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
-              s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
+      leaderLogIfLocal match {
+        case Some(leaderLog) =>
+          val leaderHighwatermark = leaderLog.highWatermark
+          if (!inSyncReplicas.contains(followerReplica.brokerId) && isFollowerInSync(followerReplica, leaderHighwatermark)) {
+            val newInSyncReplicas = inSyncReplicas + followerReplica.brokerId
+            info(s"Expanding ISR from ${inSyncReplicas.mkString(",")} " +
+              s"to ${newInSyncReplicas.mkString(",")}")
 
             // update ISR in ZK and cache
             expandIsr(newInSyncReplicas)
           }
           // check if the HW of the partition can now be incremented
           // since the replica may already be in the ISR and its LEO has just incremented
-          maybeIncrementLeaderHW(leaderReplica, followerFetchTimeMs)
+          maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs)
         case None => false // nothing to do if no longer leader
       }
     }
@@ -680,22 +705,28 @@ class Partition(val topicPartition: TopicPartition,
    * produce request.
    */
   def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
-    leaderReplicaIfLocal match {
-      case Some(leaderReplica) =>
+    leaderLogIfLocal match {
+      case Some(leaderLog) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
 
         if (isTraceEnabled) {
-          def logEndOffsetString(r: Replica) = s"broker ${r.brokerId}: ${r.logEndOffset}"
-          val (ackedReplicas, awaitingReplicas) = curInSyncReplicas.partition { replica =>
-            replica.logEndOffset >= requiredOffset
+          def logEndOffsetString: ((Int, Long)) => String = {
+            case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
           }
-          trace(s"Progress awaiting ISR acks for offset $requiredOffset: acked: ${ackedReplicas.map(logEndOffsetString)}, " +
+
+          val curInSyncReplicaObjects = (curInSyncReplicas - localBrokerId).map(getReplicaOrException)
+          val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset))
+          val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
+          val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { _._2 >= requiredOffset}
+
+          trace(s"Progress awaiting ISR acks for offset $requiredOffset: " +
+            s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
             s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
         }
 
-        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
-        if (leaderReplica.highWatermark >= requiredOffset) {
+        val minIsr = leaderLog.config.minInSyncReplicas
+        if (leaderLog.highWatermark >= requiredOffset) {
           /*
            * The topic may be configured not to accept messages if there are not enough replicas in ISR
            * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
@@ -729,24 +760,29 @@ class Partition(val topicPartition: TopicPartition,
    * Note There is no need to acquire the leaderIsrUpdate lock here
    * since all callers of this private API acquire that lock
    */
-  private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
-    val allLogEndOffsets = assignedReplicas.filter { replica =>
-      curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
+  private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
+    val replicaLogEndOffsets = remoteReplicas.filter { replica =>
+      curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica.brokerId)
     }.map(_.logEndOffsetMetadata)
-    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
-    val oldHighWatermark = leaderReplica.highWatermarkMetadata
+    val newHighWatermark = (replicaLogEndOffsets + leaderLog.logEndOffsetMetadata).min(new LogOffsetMetadata.OffsetOrdering)
+    val oldHighWatermark = leaderLog.highWatermarkMetadata
 
     // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
     // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
     if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
       (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
-      leaderReplica.highWatermarkMetadata = newHighWatermark
+      leaderLog.highWatermarkMetadata = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
     } else {
-      def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffsetMetadata}"
+      def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
+        case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
+      }
+
+      val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata))
+      val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
       trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +
-        s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")
+        s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
       false
     }
   }
@@ -757,12 +793,18 @@ class Partition(val topicPartition: TopicPartition,
    * Low watermark will increase when the leader broker receives either FetchRequest or DeleteRecordsRequest.
    */
   def lowWatermarkIfLeader: Long = {
-    if (!isLeaderReplicaLocal)
+    if (!isLeader)
       throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
-    val logStartOffsets = allReplicas.collect {
-      case replica if metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset
+    val logStartOffsets = remoteReplicas.collect {
+      case replica if metadataCache.isBrokerAlive(replica.brokerId) => replica.logStartOffset
+    } + localLogOrException.logStartOffset
+
+    futureLog match {
+      case Some(partitionFutureLog) =>
+        CoreUtils.min(logStartOffsets + partitionFutureLog.logStartOffset, 0L)
+      case None =>
+        CoreUtils.min(logStartOffsets, 0L)
     }
-    CoreUtils.min(logStartOffsets, 0L)
   }
 
   /**
@@ -772,19 +814,19 @@ class Partition(val topicPartition: TopicPartition,
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long): Unit = {
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
-      leaderReplicaIfLocal match {
-        case Some(leaderReplica) =>
-          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
+      leaderLogIfLocal match {
+        case Some(leaderLog) =>
+          val outOfSyncReplicas = getOutOfSyncReplicas(replicaMaxLagTimeMs)
           if (outOfSyncReplicas.nonEmpty) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.nonEmpty)
             info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
-              .format(inSyncReplicas.map(_.brokerId).mkString(","),
-                newInSyncReplicas.map(_.brokerId).mkString(","),
-                leaderReplica.highWatermarkMetadata.messageOffset,
-                leaderReplica.logEndOffset,
-                outOfSyncReplicas.map { replica =>
-                  s"(brokerId: ${replica.brokerId}, endOffset: ${replica.logEndOffset})"
+              .format(inSyncReplicas.mkString(","),
+                newInSyncReplicas.mkString(","),
+                leaderLog.highWatermark,
+                leaderLog.logEndOffset,
+                outOfSyncReplicas.map { replicaId =>
+                  s"(brokerId: $replicaId, endOffset: ${getReplicaOrException(replicaId).logEndOffset})"
                 }.mkString(" ")
               )
             )
@@ -793,7 +835,7 @@ class Partition(val topicPartition: TopicPartition,
             shrinkIsr(newInSyncReplicas)
 
             // we may need to increment high watermark since ISR could be down to 1
-            maybeIncrementLeaderHW(leaderReplica)
+            maybeIncrementLeaderHW(leaderLog)
           } else {
             false
           }
@@ -807,15 +849,16 @@ class Partition(val topicPartition: TopicPartition,
       tryCompleteDelayedRequests()
   }
 
-  private def isFollowerOutOfSync(followerReplica: Replica,
+  private def isFollowerOutOfSync(replicaId: Int,
                                   leaderEndOffset: Long,
                                   currentTimeMs: Long,
                                   maxLagMs: Long): Boolean = {
+    val followerReplica = getReplicaOrException(replicaId)
     followerReplica.logEndOffset != leaderEndOffset &&
       (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
   }
 
-  def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
+  def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
     /**
      * If the follower already has the same leo as the leader, it will not be considered as out-of-sync,
      * otherwise there are two cases that will be handled here -
@@ -828,10 +871,10 @@ class Partition(val topicPartition: TopicPartition,
      * is violated, that replica is considered to be out of sync
      *
      **/
-    val candidateReplicas = inSyncReplicas - leaderReplica
+    val candidateReplicas = inSyncReplicas - localBrokerId
     val currentTimeMs = time.milliseconds()
-    val leaderEndOffset = leaderReplica.logEndOffset
-    candidateReplicas.filter(r => isFollowerOutOfSync(r, leaderEndOffset, currentTimeMs, maxLagMs))
+    val leaderEndOffset = localLogOrException.logEndOffset
+    candidateReplicas.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
@@ -841,13 +884,11 @@ class Partition(val topicPartition: TopicPartition,
       if (isFuture) {
         // Note the replica may be undefined if it is removed by a non-ReplicaAlterLogDirsThread before
         // this method is called
-        futureLocalReplica.map { replica =>
-          replica.log.get.appendAsFollower(records)
-        }
+        futureLog.map { _.appendAsFollower(records) }
       } else {
         // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
         // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
-        Some(localReplicaOrException.log.get.appendAsFollower(records))
+        Some(localLogOrException.appendAsFollower(records))
       }
     }
   }
@@ -857,9 +898,9 @@ class Partition(val topicPartition: TopicPartition,
       doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
     } catch {
       case e: UnexpectedAppendOffsetException =>
-        val replica = if (isFuture) futureLocalReplicaOrException else localReplicaOrException
-        val logEndOffset = replica.logEndOffset
-        if (logEndOffset == replica.logStartOffset &&
+        val log = if (isFuture) futureLocalLogOrException else localLogOrException
+        val logEndOffset = log.logEndOffset
+        if (logEndOffset == log.logStartOffset &&
             e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
           // This may happen if the log start offset on the leader (or current replica) falls in
           // the middle of the batch due to delete records request and the follower tries to
@@ -869,7 +910,7 @@ class Partition(val topicPartition: TopicPartition,
           // (base offset of the batch), which will move recoveryPoint backwards, so we will need
           // to checkpoint the new recovery point before we append
           val replicaName = if (isFuture) "future replica" else "follower"
-          info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+          info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${log.logStartOffset}." +
                s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
           truncateFullyAndStartAt(e.firstOffset, isFuture)
           doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
@@ -880,23 +921,22 @@ class Partition(val topicPartition: TopicPartition,
 
   def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
-      leaderReplicaIfLocal match {
-        case Some(leaderReplica) =>
-          val log = leaderReplica.log.get
-          val minIsr = log.config.minInSyncReplicas
+      leaderLogIfLocal match {
+        case Some(leaderLog) =>
+          val minIsr = leaderLog.config.minInSyncReplicas
           val inSyncSize = inSyncReplicas.size
 
           // Avoid writing to leader if there are not enough insync replicas to make it safe
           if (inSyncSize < minIsr && requiredAcks == -1) {
-            throw new NotEnoughReplicasException(s"The size of the current ISR ${inSyncReplicas.map(_.brokerId)} " +
+            throw new NotEnoughReplicasException(s"The size of the current ISR $inSyncReplicas " +
               s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
           }
 
-          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
+          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
             interBrokerProtocolVersion)
 
           // we may need to increment high watermark since ISR could be down to 1
-          (info, maybeIncrementLeaderHW(leaderReplica))
+          (info, maybeIncrementLeaderHW(leaderLog))
 
         case None =>
           throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
@@ -922,7 +962,7 @@ class Partition(val topicPartition: TopicPartition,
                   fetchOnlyFromLeader: Boolean,
                   minOneMessage: Boolean): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
-    val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+    val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
     /* Read the LogOffsetMetadata prior to performing the read from the log.
      * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
@@ -930,10 +970,10 @@ class Partition(val topicPartition: TopicPartition,
      * where data gets appended to the log immediately after the replica has consumed from it
      * This can cause a replica to always be out of sync.
      */
-    val initialHighWatermark = localReplica.highWatermark
-    val initialLogStartOffset = localReplica.logStartOffset
-    val initialLogEndOffset = localReplica.logEndOffset
-    val initialLastStableOffset = localReplica.lastStableOffset
+    val initialHighWatermark = localLog.highWatermark
+    val initialLogStartOffset = localLog.logStartOffset
+    val initialLogEndOffset = localLog.logEndOffset
+    val initialLastStableOffset = localLog.lastStableOffset
 
     val maxOffsetOpt = fetchIsolation match {
       case FetchLogEnd => None
@@ -941,16 +981,9 @@ class Partition(val topicPartition: TopicPartition,
       case FetchTxnCommitted => Some(initialLastStableOffset)
     }
 
-    val fetchedData = localReplica.log match {
-      case Some(log) =>
-        log.read(fetchOffset, maxBytes, maxOffsetOpt, minOneMessage,
+    val fetchedData = localLog.read(fetchOffset, maxBytes, maxOffsetOpt, minOneMessage,
           includeAbortedTxns = fetchIsolation == FetchTxnCommitted)
 
-      case None =>
-        error(s"Leader does not have a local log")
-        FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
-    }
-
     LogReadInfo(
       fetchedData = fetchedData,
       highWatermark = initialHighWatermark,
@@ -964,12 +997,12 @@ class Partition(val topicPartition: TopicPartition,
                               currentLeaderEpoch: Optional[Integer],
                               fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
-    val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+    val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
     val lastFetchableOffset = isolationLevel match {
-      case Some(IsolationLevel.READ_COMMITTED) => localReplica.lastStableOffset
-      case Some(IsolationLevel.READ_UNCOMMITTED) => localReplica.highWatermark
-      case None => localReplica.logEndOffset
+      case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
+      case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
+      case None => localLog.logEndOffset
     }
 
     val epochLogString = if(currentLeaderEpoch.isPresent) {
@@ -981,10 +1014,10 @@ class Partition(val topicPartition: TopicPartition,
     // Only consider throwing an error if we get a client request (isolationLevel is defined) and the start offset
     // is lagging behind the high watermark
     val maybeOffsetsError: Option[ApiException] = leaderEpochStartOffsetOpt
-      .filter(epochStart => isolationLevel.isDefined && epochStart > localReplica.highWatermark)
+      .filter(epochStart => isolationLevel.isDefined && epochStart > localLog.highWatermark)
       .map(epochStart => Errors.OFFSET_NOT_AVAILABLE.exception(s"Failed to fetch offsets for " +
         s"partition $topicPartition with leader $epochLogString as this partition's " +
-        s"high watermark (${localReplica.highWatermark}) is lagging behind the " +
+        s"high watermark (${localLog.highWatermark}) is lagging behind the " +
         s"start offset from the beginning of this epoch ($epochStart)."))
 
     def getOffsetByTimestamp: Option[TimestampAndOffset] = {
@@ -1008,14 +1041,14 @@ class Partition(val topicPartition: TopicPartition,
   def fetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
                           fetchOnlyFromLeader: Boolean): LogOffsetSnapshot = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
-    val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
-    localReplica.offsetSnapshot
+    val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+    localLog.offsetSnapshot
   }
 
   def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer],
                                  fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = {
     inReadLock(leaderIsrUpdateLock) {
-      getLocalReplica(localBrokerId, currentLeaderEpoch, fetchOnlyFromLeader)
+      getLocalLog(currentLeaderEpoch, fetchOnlyFromLeader)
         .left.map(_.offsetSnapshot)
     }
   }
@@ -1024,21 +1057,13 @@ class Partition(val topicPartition: TopicPartition,
                                      maxNumOffsets: Int,
                                      isFromConsumer: Boolean,
                                      fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) {
-    val localReplica = localReplicaWithEpochOrException(Optional.empty(), fetchOnlyFromLeader)
-    val allOffsets = logManager.getLog(topicPartition) match {
-      case Some(log) =>
-        log.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)
-      case None =>
-        if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP || timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-          Seq(0L)
-        else
-          Nil
-    }
+    val localLog = localLogWithEpochOrException(Optional.empty(), fetchOnlyFromLeader)
+    val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)
 
     if (!isFromConsumer) {
       allOffsets
     } else {
-      val hw = localReplica.highWatermark
+      val hw = localLog.highWatermark
       if (allOffsets.exists(_ > hw))
         hw +: allOffsets.dropWhile(_ > hw)
       else
@@ -1048,7 +1073,7 @@ class Partition(val topicPartition: TopicPartition,
 
   def logStartOffset: Long = {
     inReadLock(leaderIsrUpdateLock) {
-      leaderReplicaIfLocal.map(_.log.get.logStartOffset).getOrElse(-1)
+      leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
     }
   }
 
@@ -1059,20 +1084,20 @@ class Partition(val topicPartition: TopicPartition,
    * Return low watermark of the partition.
    */
   def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) {
-    leaderReplicaIfLocal match {
-      case Some(leaderReplica) =>
-        if (!leaderReplica.log.get.config.delete)
+    leaderLogIfLocal match {
+      case Some(leaderLog) =>
+        if (!leaderLog.config.delete)
           throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy")
 
         val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK)
-          leaderReplica.highWatermark
+          leaderLog.highWatermark
         else
           offset
 
         if (convertedOffset < 0)
           throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
 
-        leaderReplica.maybeIncrementLogStartOffset(convertedOffset)
+        leaderLog.maybeIncrementLogStartOffset(convertedOffset)
         LogDeleteRecordsResult(
           requestedOffset = convertedOffset,
           lowWatermark = lowWatermarkIfLeader)
@@ -1126,10 +1151,10 @@ class Partition(val topicPartition: TopicPartition,
                                leaderEpoch: Int,
                                fetchOnlyFromLeader: Boolean): EpochEndOffset = {
     inReadLock(leaderIsrUpdateLock) {
-      val localReplicaOrError = getLocalReplica(localBrokerId, currentLeaderEpoch, fetchOnlyFromLeader)
-      localReplicaOrError match {
-        case Left(replica) =>
-          replica.endOffsetForEpoch(leaderEpoch) match {
+      val localLogOrError = getLocalLog(currentLeaderEpoch, fetchOnlyFromLeader)
+      localLogOrError match {
+        case Left(localLog) =>
+          localLog.endOffsetForEpoch(leaderEpoch) match {
             case Some(epochAndOffset) => new EpochEndOffset(NONE, epochAndOffset.leaderEpoch, epochAndOffset.offset)
             case None => new EpochEndOffset(NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
           }
@@ -1139,19 +1164,19 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Replica]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
+  private def expandIsr(newIsr: Set[Int]): Unit = {
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
     val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
     maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
   }
 
-  private def shrinkIsr(newIsr: Set[Replica]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
+  private def shrinkIsr(newIsr: Set[Int]): Unit = {
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
     val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
     maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
   }
 
-  private def maybeUpdateIsrAndVersion(isr: Set[Replica], zkVersionOpt: Option[Int]): Unit = {
+  private def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
     zkVersionOpt match {
       case Some(newVersion) =>
         inSyncReplicas = isr
@@ -1171,13 +1196,13 @@ class Partition(val topicPartition: TopicPartition,
   override def hashCode: Int =
     31 + topic.hashCode + 17 * partitionId
 
-  override def toString(): String = {
+  override def toString: String = {
     val partitionString = new StringBuilder
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)
     partitionString.append("; Leader: " + leaderReplicaIdOpt)
-    partitionString.append("; AllReplicas: " + allReplicasMap.keys.mkString(","))
-    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.append("; AllReplicas: " + remoteReplicasMap.keys.mkString(","))
+    partitionString.append("; InSyncReplicas: " + inSyncReplicas.mkString(","))
     partitionString.toString
   }
 }
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 831233e..1c61fad 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -17,20 +17,12 @@
 
 package kafka.cluster
 
-import kafka.log.{Log, LogOffsetSnapshot}
+import kafka.log.{Log}
 import kafka.utils.Logging
-import kafka.server.{LogOffsetMetadata, OffsetAndEpoch}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.apache.kafka.common.utils.Time
-
-class Replica(val brokerId: Int,
-              val topicPartition: TopicPartition,
-              time: Time = Time.SYSTEM,
-              initialHighWatermarkValue: Long = 0L,
-              @volatile var log: Option[Log] = None) extends Logging {
-  // the high watermark offset value, in non-leader replicas only its message offsets are kept
-  @volatile private[this] var _highWatermarkMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
+import kafka.server.{LogOffsetMetadata}
+import org.apache.kafka.common.{TopicPartition}
+
+class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Logging {
   // the log end offset value, kept in all replicas;
   // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
   @volatile private[this] var _logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
@@ -50,12 +42,13 @@ class Replica(val brokerId: Int,
   // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
   @volatile private[this] var _lastCaughtUpTimeMs = 0L
 
-  def isLocal: Boolean = log.isDefined
+  def logStartOffset: Long = _logStartOffset
 
-  def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
+  def logEndOffsetMetadata: LogOffsetMetadata = _logEndOffsetMetadata
+
+  def logEndOffset: Long = logEndOffsetMetadata.messageOffset
 
-  info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
-  log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
+  def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
   /*
    * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
@@ -78,168 +71,39 @@ class Replica(val brokerId: Int,
     else if (followerFetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
 
-    logStartOffset = followerStartOffset
-    logEndOffsetMetadata = followerFetchOffsetMetadata
+    _logStartOffset = followerStartOffset
+    _logEndOffsetMetadata = followerFetchOffsetMetadata
     lastFetchLeaderLogEndOffset = leaderEndOffset
     lastFetchTimeMs = followerFetchTimeMs
+    trace(s"Updated state of replica to $this")
   }
 
-  def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long) {
+  def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
     lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
     lastFetchTimeMs = curTimeMs
     _lastCaughtUpTimeMs = lastCaughtUpTimeMs
+    trace(s"Reset state of replica to $this")
   }
 
-  private def logEndOffsetMetadata_=(newLogEndOffset: LogOffsetMetadata) {
-    if (isLocal) {
-      throw new KafkaException(s"Should not set log end offset on partition $topicPartition's local replica $brokerId")
-    } else {
-      _logEndOffsetMetadata = newLogEndOffset
-      trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [${_logEndOffsetMetadata}]")
-    }
-  }
-
-  def latestEpoch: Option[Int] = {
-    if (isLocal) {
-      log.get.latestEpoch
-    } else {
-      throw new KafkaException(s"Cannot get latest epoch of non-local replica of $topicPartition")
-    }
-  }
-
-  def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
-    if (isLocal) {
-      log.get.endOffsetForEpoch(leaderEpoch)
-    } else {
-      throw new KafkaException(s"Cannot lookup end offset for epoch of non-local replica of $topicPartition")
-    }
-  }
-
-  def logEndOffsetMetadata: LogOffsetMetadata =
-    if (isLocal)
-      log.get.logEndOffsetMetadata
-    else
-      _logEndOffsetMetadata
-
-  def logEndOffset: Long =
-    logEndOffsetMetadata.messageOffset
-
-  /**
-   * Increment the log start offset if the new offset is greater than the previous log start offset. The replica
-   * must be local and the new log start offset must be lower than the current high watermark.
-   */
-  def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
-    if (isLocal) {
-      if (newLogStartOffset > highWatermark)
-        throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
-          s"since it is larger than the high watermark $highWatermark")
-      log.get.maybeIncrementLogStartOffset(newLogStartOffset)
-    } else {
-      throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
-    }
-  }
-
-  private def logStartOffset_=(newLogStartOffset: Long) {
-    if (isLocal) {
-      throw new KafkaException(s"Should not set log start offset on partition $topicPartition's local replica $brokerId " +
-                               s"without attempting to delete records of the log")
-    } else {
-      _logStartOffset = newLogStartOffset
-      trace(s"Setting log start offset for remote replica $brokerId for partition $topicPartition to [$newLogStartOffset]")
-    }
-  }
-
-  def logStartOffset: Long =
-    if (isLocal)
-      log.get.logStartOffset
-    else
-      _logStartOffset
-
-  def highWatermarkMetadata_=(newHighWatermarkMetadata: LogOffsetMetadata) {
-    if (isLocal) {
-      if (newHighWatermarkMetadata.messageOffset < 0)
-        throw new IllegalArgumentException("High watermark offset should be non-negative")
-
-      _highWatermarkMetadata = newHighWatermarkMetadata
-      log.foreach(_.onHighWatermarkIncremented(newHighWatermarkMetadata.messageOffset))
-      trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermarkMetadata]")
-    } else {
-      throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
-    }
-  }
-
-  def highWatermark_=(newHighWatermark: Long): Unit = {
-    highWatermarkMetadata = LogOffsetMetadata(newHighWatermark)
-  }
-
-  def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata
-
-  def highWatermark: Long = _highWatermarkMetadata.messageOffset
-
-  /**
-   * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
-   * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
-   * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
-   * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
-   * beyond the high watermark.
-   */
-  def lastStableOffsetMetadata: LogOffsetMetadata = {
-    log.map { log =>
-      log.firstUnstableOffset match {
-        case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark => offsetMetadata
-        case _ => highWatermarkMetadata
-      }
-    }.getOrElse(throw new KafkaException(s"Cannot fetch last stable offset on partition $topicPartition's " +
-      s"non-local replica $brokerId"))
-  }
-
-  def lastStableOffset: Long = lastStableOffsetMetadata.messageOffset
-
-  /*
-   * Convert hw to local offset metadata by reading the log at the hw offset.
-   * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
-   */
-  def maybeFetchHighWatermarkOffsetMetadata(): Unit = {
-    if (!isLocal)
-      throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
-
-    if (highWatermarkMetadata.messageOffsetOnly) {
-      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermark).getOrElse {
-        log.get.convertToOffsetMetadata(logStartOffset).getOrElse {
-          val firstSegmentOffset = log.get.logSegments.head.baseOffset
-          new LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
-        }
-      }
-    }
-  }
-
-  def offsetSnapshot: LogOffsetSnapshot = {
-    LogOffsetSnapshot(
-      logStartOffset = logStartOffset,
-      logEndOffset = logEndOffsetMetadata,
-      highWatermark =  highWatermarkMetadata,
-      lastStableOffset = lastStableOffsetMetadata)
-  }
-
-  override def equals(that: Any): Boolean = that match {
-    case other: Replica => brokerId == other.brokerId && topicPartition == other.topicPartition
-    case _ => false
-  }
-
-  override def hashCode: Int = 31 + topicPartition.hashCode + 17 * brokerId
-
   override def toString: String = {
     val replicaString = new StringBuilder
     replicaString.append("Replica(replicaId=" + brokerId)
     replicaString.append(s", topic=${topicPartition.topic}")
     replicaString.append(s", partition=${topicPartition.partition}")
-    replicaString.append(s", isLocal=$isLocal")
     replicaString.append(s", lastCaughtUpTimeMs=$lastCaughtUpTimeMs")
-    if (isLocal) {
-      replicaString.append(s", highWatermark=$highWatermarkMetadata")
-      replicaString.append(s", lastStableOffset=$lastStableOffsetMetadata")
-    }
+    replicaString.append(s", logStartOffset=$logStartOffset")
+    replicaString.append(s", logEndOffset=$logEndOffset")
+    replicaString.append(s", logEndOffsetMetadata=$logEndOffsetMetadata")
+    replicaString.append(s", lastFetchLeaderLogEndOffset=$lastFetchLeaderLogEndOffset")
+    replicaString.append(s", lastFetchTimeMs=$lastFetchTimeMs")
     replicaString.append(")")
     replicaString.toString
   }
+
+  override def equals(that: Any): Boolean = that match {
+    case other: Replica => brokerId == other.brokerId && topicPartition == other.topicPartition
+    case _ => false
+  }
+
+  override def hashCode: Int = 31 + topicPartition.hashCode + 17 * brokerId
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 38c3617..52aface 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -266,7 +266,7 @@ class Log(@volatile var dir: File,
    * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
    * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
    */
-  @volatile private var replicaHighWatermark: Option[Long] = None
+  @volatile private[this] var _highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(0)
 
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@@ -285,7 +285,7 @@ class Log(@volatile var dir: File,
     val nextOffset = loadSegments()
 
     /* Calculate the offset of the next message */
-    nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
+    nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
     leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
 
@@ -304,6 +304,67 @@ class Log(@volatile var dir: File,
       s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
   }
 
+  def highWatermark: Long = highWatermarkMetadata.messageOffset
+
+  def highWatermark_=(newHighWatermark: Long): Unit = {
+    highWatermarkMetadata = LogOffsetMetadata(newHighWatermark)
+  }
+
+  def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata
+
+  def highWatermarkMetadata_=(newHighWatermark: LogOffsetMetadata) {
+    if (newHighWatermark.messageOffset < 0)
+      throw new IllegalArgumentException("High watermark offset should be non-negative")
+
+    lock synchronized {
+      _highWatermarkMetadata = newHighWatermark
+      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
+      updateFirstUnstableOffset()
+    }
+    trace(s"Setting high watermark [$newHighWatermark]")
+  }
+
+  /*
+   * Convert hw to local offset metadata by reading the log at the hw offset.
+   * If the hw offset cannot be converted, try the log start offset, and finally fall back to the base offset of the first log segment.
+   */
+  def maybeFetchHighWatermarkOffsetMetadata(): Unit = {
+    if (highWatermarkMetadata.messageOffsetOnly) {
+      highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
+        convertToOffsetMetadata(logStartOffset).getOrElse {
+          val firstSegmentOffset = logSegments.head.baseOffset
+          LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
+        }
+      }
+    }
+  }
+
+  /**
+    * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
+    * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
+    * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
+    * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
+    * beyond the high watermark.
+    */
+  def lastStableOffsetMetadata: LogOffsetMetadata = {
+    firstUnstableOffset match {
+      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark => offsetMetadata
+      case _ => highWatermarkMetadata
+    }
+  }
+
+  def lastStableOffset: Long = lastStableOffsetMetadata.messageOffset
+
+  def lastStableOffsetLag: Long = highWatermark - lastStableOffset
+
+  def offsetSnapshot: LogOffsetSnapshot = {
+    LogOffsetSnapshot(
+      logStartOffset = logStartOffset,
+      logEndOffset = logEndOffsetMetadata,
+      highWatermark =  highWatermarkMetadata,
+      lastStableOffset = lastStableOffsetMetadata)
+  }
+
   private val tags = {
     val maybeFutureTag = if (isFuture) Map("is-future" -> "true") else Map.empty[String, String]
     Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag
@@ -577,7 +638,7 @@ class Log(@volatile var dir: File,
   }
 
   private def updateLogEndOffset(messageOffset: Long) {
-    nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
+    nextOffsetMetadata = LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
   }
 
   /**
@@ -1026,14 +1087,6 @@ class Log(@volatile var dir: File,
     }
   }
 
-  def onHighWatermarkIncremented(highWatermark: Long): Unit = {
-    lock synchronized {
-      replicaHighWatermark = Some(highWatermark)
-      producerStateManager.onHighWatermarkUpdated(highWatermark)
-      updateFirstUnstableOffset()
-    }
-  }
-
   private def updateFirstUnstableOffset(): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
     val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
@@ -1055,6 +1108,10 @@ class Log(@volatile var dir: File,
    * Increment the log start offset if the provided offset is larger.
    */
   def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
+    if (newLogStartOffset > highWatermark)
+      throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
+        s"since it is larger than the high watermark ${highWatermark}")
+
     // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
@@ -1502,10 +1559,9 @@ class Log(@volatile var dir: File,
    * @return the segments ready to be deleted
    */
   private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-    if (segments.isEmpty || replicaHighWatermark.isEmpty) {
+    if (segments.isEmpty) {
       Seq.empty
     } else {
-      val highWatermark = replicaHighWatermark.get
       val deletable = ArrayBuffer.empty[LogSegment]
       var segmentEntry = segments.firstEntry
       while (segmentEntry != null) {
@@ -1914,7 +1970,18 @@ class Log(@volatile var dir: File,
     }
   }
 
-  override def toString = "Log(" + dir + ")"
+  override def toString: String = {
+    val logString = new StringBuilder
+    logString.append(s"Log(dir=$dir")
+    logString.append(s", topic=${topicPartition.topic}")
+    logString.append(s", partition=${topicPartition.partition}")
+    logString.append(s", highWatermark=$highWatermarkMetadata")
+    logString.append(s", lastStableOffset=$lastStableOffsetMetadata")
+    logString.append(s", logStartOffset=$logStartOffset")
+    logString.append(s", logEndOffset=$logEndOffset")
+    logString.append(")")
+    logString.toString
+  }
 
   /**
    * This method performs an asynchronous log segment delete by doing the following:
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 91cf79e..6455746 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -803,6 +803,7 @@ class LogManager(logDirs: Seq[File],
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
       destLog.renameDir(Log.logDirName(topicPartition))
+      destLog.highWatermarkMetadata = sourceLog.highWatermarkMetadata
       // Now that future replica has been successfully renamed to be the current replica
       // Update the cached map and log cleaner as appropriate.
       futureLogs.remove(topicPartition)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ecd85f9..08182e8 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -301,7 +301,7 @@ class LogSegment private[log] (val log: FileRecords,
       return null
 
     val startPosition = startOffsetAndSize.position
-    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
+    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
 
     val adjustedMaxSize =
       if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 203cc62..0aec85a 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -631,7 +631,7 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
-  def partitionCount() = {
+  def partitionCount(): Int = {
     partitionMapLock.lockInterruptibly()
     try partitionStates.size
     finally partitionMapLock.unlock()
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 236c8d1..852f72d 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -74,7 +74,7 @@ class DelayedDeleteRecords(delayMs: Long,
       if (status.acksPending) {
         val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
           case HostedPartition.Online(partition) =>
-            partition.leaderReplicaIfLocal match {
+            partition.leaderLogIfLocal match {
               case Some(_) =>
                 val leaderLW = partition.lowWatermarkIfLeader
                 (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 67afac6..6423cfc 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -21,7 +21,7 @@ import kafka.log.Log
 import org.apache.kafka.common.KafkaException
 
 object LogOffsetMetadata {
-  val UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0)
+  val UnknownOffsetMetadata = LogOffsetMetadata(-1, 0, 0)
   val UnknownFilePosition = -1
 
   class OffsetOrdering extends Ordering[LogOffsetMetadata] {
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 8b45501..18f89f7 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -56,15 +56,15 @@ class ReplicaAlterLogDirsThread(name: String,
   private var inProgressPartition: Option[TopicPartition] = None
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
-    replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch
+    replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
   }
 
   override protected def logEndOffset(topicPartition: TopicPartition): Long = {
-    replicaMgr.futureLocalReplicaOrException(topicPartition).logEndOffset
+    replicaMgr.futureLocalLogOrException(topicPartition).logEndOffset
   }
 
   override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
-    replicaMgr.futureLocalReplicaOrException(topicPartition).endOffsetForEpoch(epoch)
+    replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
   }
 
   def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
@@ -102,17 +102,16 @@ class ReplicaAlterLogDirsThread(name: String,
                                     fetchOffset: Long,
                                     partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
     val partition = replicaMgr.nonOfflinePartition(topicPartition).get
-    val futureReplica = partition.futureLocalReplicaOrException
+    val futureLog = partition.futureLocalLogOrException
     val records = toMemoryRecords(partitionData.records)
 
-    if (fetchOffset != futureReplica.logEndOffset)
+    if (fetchOffset != futureLog.logEndOffset)
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
-        topicPartition, fetchOffset, futureReplica.logEndOffset))
+        topicPartition, fetchOffset, futureLog.logEndOffset))
 
     val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
-    val futureReplicaHighWatermark = futureReplica.logEndOffset.min(partitionData.highWatermark)
-    futureReplica.highWatermark = futureReplicaHighWatermark
-    futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
+    futureLog.highWatermark = futureLog.logEndOffset.min(partitionData.highWatermark)
+    futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset)
 
     if (partition.maybeReplaceCurrentWithFutureReplica())
       removePartitions(Set(topicPartition))
@@ -228,7 +227,7 @@ class ReplicaAlterLogDirsThread(name: String,
     val partitionsWithError = mutable.Set[TopicPartition]()
 
     try {
-      val logStartOffset = replicaMgr.futureLocalReplicaOrException(tp).logStartOffset
+      val logStartOffset = replicaMgr.futureLocalLogOrException(tp).logStartOffset
       requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset,
         fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
     } catch {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 947e16a..ccf9bc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -99,15 +99,15 @@ class ReplicaFetcherThread(name: String,
   private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
-    replicaMgr.localReplicaOrException(topicPartition).latestEpoch
+    replicaMgr.localLogOrException(topicPartition).latestEpoch
   }
 
   override protected def logEndOffset(topicPartition: TopicPartition): Long = {
-    replicaMgr.localReplicaOrException(topicPartition).logEndOffset
+    replicaMgr.localLogOrException(topicPartition).logEndOffset
   }
 
   override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
-    replicaMgr.localReplicaOrException(topicPartition).endOffsetForEpoch(epoch)
+    replicaMgr.localLogOrException(topicPartition).endOffsetForEpoch(epoch)
   }
 
   override def initiateShutdown(): Boolean = {
@@ -144,32 +144,32 @@ class ReplicaFetcherThread(name: String,
                                     fetchOffset: Long,
                                     partitionData: FetchData): Option[LogAppendInfo] = {
     val partition = replicaMgr.nonOfflinePartition(topicPartition).get
-    val replica = partition.localReplicaOrException
+    val log = partition.localLogOrException
     val records = toMemoryRecords(partitionData.records)
 
     maybeWarnIfOversizedRecords(records, topicPartition)
 
-    if (fetchOffset != replica.logEndOffset)
+    if (fetchOffset != log.logEndOffset)
       throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
-        topicPartition, fetchOffset, replica.logEndOffset))
+        topicPartition, fetchOffset, log.logEndOffset))
 
     if (isTraceEnabled)
       trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-        .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+        .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
     val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
     if (isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
-        .format(replica.logEndOffset, records.sizeInBytes, topicPartition))
-    val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)
+        .format(log.logEndOffset, records.sizeInBytes, topicPartition))
+    val followerHighWatermark = log.logEndOffset.min(partitionData.highWatermark)
     val leaderLogStartOffset = partitionData.logStartOffset
     // for the follower replica, we do not need to keep
     // its segment base offset and physical position;
     // these values will be computed upon making the leader
-    replica.highWatermark = followerHighWatermark
-    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
+    log.highWatermark = followerHighWatermark
+    log.maybeIncrementLogStartOffset(leaderLogStartOffset)
     if (isTraceEnabled)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
 
@@ -245,7 +245,7 @@ class ReplicaFetcherThread(name: String,
       // We will not include a replica in the fetch request if it should be throttled.
       if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
         try {
-          val logStartOffset = replicaMgr.localReplicaOrException(topicPartition).logStartOffset
+          val logStartOffset = replicaMgr.localLogOrException(topicPartition).logStartOffset
           builder.add(topicPartition, new FetchRequest.PartitionData(
             fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
         } catch {
@@ -278,13 +278,13 @@ class ReplicaFetcherThread(name: String,
    */
   override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
     val partition = replicaMgr.nonOfflinePartition(tp).get
-    val replica = partition.localReplicaOrException
+    val log = partition.localLogOrException
 
     partition.truncateTo(offsetTruncationState.offset, isFuture = false)
 
-    if (offsetTruncationState.offset < replica.highWatermark)
+    if (offsetTruncationState.offset < log.highWatermark)
       warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +
-        s"${replica.highWatermark}")
+        s"${log.highWatermark}")
 
     // mark the future replica for truncation only when we do last truncation
     if (offsetTruncationState.truncationCompleted)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7ded985..b107974 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.api._
-import kafka.cluster.{BrokerEndPoint, Partition, Replica}
+import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
@@ -445,24 +445,20 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def localReplicaOrException(topicPartition: TopicPartition): Replica = {
-    getPartitionOrException(topicPartition, expectLeader = false).localReplicaOrException
+  def localLogOrException(topicPartition: TopicPartition): Log = {
+    getPartitionOrException(topicPartition, expectLeader = false).localLogOrException
   }
 
-  def futureLocalReplicaOrException(topicPartition: TopicPartition): Replica = {
-    getPartitionOrException(topicPartition, expectLeader = false).futureLocalReplicaOrException
+  def futureLocalLogOrException(topicPartition: TopicPartition): Log = {
+    getPartitionOrException(topicPartition, expectLeader = false).futureLocalLogOrException
   }
 
-  def futureLocalReplica(topicPartition: TopicPartition): Option[Replica] = {
-    nonOfflinePartition(topicPartition).flatMap(_.futureLocalReplica)
-  }
-
-  def localReplica(topicPartition: TopicPartition): Option[Replica] = {
-    nonOfflinePartition(topicPartition).flatMap(_.localReplica)
+  def localLog(topicPartition: TopicPartition): Option[Log] = {
+    nonOfflinePartition(topicPartition).flatMap(_.log)
   }
 
   def getLogDir(topicPartition: TopicPartition): Option[String] = {
-    localReplica(topicPartition).flatMap(_.log).map(_.dir.getParent)
+    localLog(topicPartition).map(_.dir.getParent)
   }
 
   /**
@@ -599,9 +595,9 @@ class ReplicaManager(val config: KafkaConfig,
           // 2) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse
           logManager.maybeUpdatePreferredLogDir(topicPartition, destinationDir)
 
-          // throw ReplicaNotAvailableException if replica does not exit for the given partition
+          // throw ReplicaNotAvailableException if replica does not exist for the given partition
           val partition = getPartitionOrException(topicPartition, expectLeader = false)
-          partition.localReplicaOrException
+          partition.localLogOrException
 
           // If the destinationDir is different from the current log directory of the replica:
           // - If there is no offline log directory, create the future log in the destinationDir (if it does not exist) and
@@ -610,11 +606,11 @@ class ReplicaManager(val config: KafkaConfig,
           //   so that we can avoid creating future log for the same partition in multiple log directories.
           val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
           if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
-            val futureReplica = futureLocalReplicaOrException(topicPartition)
+            val futureLog = futureLocalLogOrException(topicPartition)
             logManager.abortAndPauseCleaning(topicPartition)
 
             val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-              partition.getLeaderEpoch, futureReplica.highWatermark)
+              partition.getLeaderEpoch, futureLog.highWatermark)
             replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
           }
 
@@ -674,12 +670,12 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = {
-    localReplica(topicPartition) match {
-      case Some(replica) =>
+    localLog(topicPartition) match {
+      case Some(log) =>
         if (isFuture)
-          replica.logEndOffset - logEndOffset
+          log.logEndOffset - logEndOffset
         else
-          math.max(replica.highWatermark - logEndOffset, 0)
+          math.max(log.highWatermark - logEndOffset, 0)
       case None =>
         // return -1L to indicate that the LEO lag is not available if the replica is not created or is offline
         DescribeLogDirsResponse.INVALID_OFFSET_LAG
@@ -1009,13 +1005,11 @@ class ReplicaManager(val config: KafkaConfig,
    *  the quota is exceeded and the replica is not in sync.
    */
   def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
-    val isReplicaInSync = nonOfflinePartition(topicPartition).exists { partition =>
-      partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains)
-    }
+    val isReplicaInSync = nonOfflinePartition(topicPartition).exists(_.inSyncReplicas.contains(replicaId))
     !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
-  def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localReplica(topicPartition).flatMap(_.log.map(_.config))
+  def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localLog(topicPartition).map(_.config)
 
   def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value)
 
@@ -1132,7 +1126,7 @@ class ReplicaManager(val config: KafkaConfig,
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-          if (localReplica(topicPartition).isEmpty)
+          if (localLog(topicPartition).isEmpty)
             markPartitionOffline(topicPartition)
         }
 
@@ -1144,18 +1138,19 @@ class ReplicaManager(val config: KafkaConfig,
         for (partition <- newPartitions) {
           val topicPartition = partition.topicPartition
           if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
-            partition.localReplica.foreach { replica =>
+            partition.log.foreach { log =>
               val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
               // Add future replica to partition's map
-              partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints)
+              partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
+                highWatermarkCheckpoints)
 
               // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
               // replica from source dir to destination dir
               logManager.abortAndPauseCleaning(topicPartition)
 
               futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
-                partition.getLeaderEpoch, replica.highWatermark))
+                partition.getLeaderEpoch, log.highWatermark))
             }
           }
         }
@@ -1304,7 +1299,8 @@ class ReplicaManager(val config: KafkaConfig,
                 s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
               // Create the local replica even if the leader is unavailable. This is required to ensure that we include
               // the partition's high watermark in the checkpoint file (see KAFKA-1647)
-              partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew, highWatermarkCheckpoints)
+              partition.createLogIfNotExists(localBrokerId, isNew = partitionStateInfo.isNew, isFutureReplica = false,
+                highWatermarkCheckpoints)
           }
         } catch {
           case e: KafkaStorageException =>
@@ -1350,7 +1346,7 @@ class ReplicaManager(val config: KafkaConfig,
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
           val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
             .brokerEndPoint(config.interBrokerListenerName)
-          val fetchOffset = partition.localReplicaOrException.highWatermark
+          val fetchOffset = partition.localLogOrException.highWatermark
           partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
        }.toMap
 
@@ -1415,7 +1411,7 @@ class ReplicaManager(val config: KafkaConfig,
             } else {
               warn(s"Leader $localBrokerId failed to record follower $followerId's position " +
                 s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
-                s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +
+                s"one of the assigned replicas ${partition.allReplicaIds.mkString(",")} " +
                 s"for partition $topicPartition. Empty records will be returned for this partition.")
               readResult.withEmptyFetchInfo
             }
@@ -1429,22 +1425,22 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   private def leaderPartitionsIterator: Iterator[Partition] =
-    nonOfflinePartitionsIterator.filter(_.leaderReplicaIfLocal.isDefined)
+    nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)
 
   def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
-    nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset))
+    nonOfflinePartition(topicPartition).flatMap(_.leaderLogIfLocal.map(_.logEndOffset))
 
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
-    val replicas = nonOfflinePartitionsIterator.flatMap { partition =>
-      val replicasList: mutable.Set[Replica] = mutable.Set()
-      partition.localReplica.foreach(replicasList.add)
-      partition.futureLocalReplica.foreach(replicasList.add)
-      replicasList
-    }.filter(_.log.isDefined).toBuffer
-    val replicasByDir = replicas.groupBy(_.log.get.dir.getParent)
-    for ((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => r.topicPartition -> r.highWatermark).toMap
+    val localLogs = nonOfflinePartitionsIterator.flatMap { partition =>
+      val logsList: mutable.Set[Log] = mutable.Set()
+      partition.log.foreach(logsList.add)
+      partition.futureLog.foreach(logsList.add)
+      logsList
+    }.toBuffer
+    val logsByDir = localLogs.groupBy(_.dir.getParent)
+    for ((dir, logs) <- logsByDir) {
+      val hwms = logs.map(log => log.topicPartition -> log.highWatermark).toMap
       try {
         highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
       } catch {
@@ -1468,15 +1464,11 @@ class ReplicaManager(val config: KafkaConfig,
     info(s"Stopping serving replicas in dir $dir")
     replicaStateChangeLock synchronized {
       val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition =>
-        partition.localReplica.exists { replica =>
-          replica.log.isDefined && replica.log.get.dir.getParent == dir
-        }
+        partition.log.exists { _.dir.getParent == dir }
       }.map(_.topicPartition).toSet
 
       val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition =>
-        partition.futureLocalReplica.exists { replica =>
-          replica.log.isDefined && replica.log.get.dir.getParent == dir
-        }
+        partition.futureLog.exists { _.dir.getParent == dir }
       }.toSet
 
       replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index d622a79..306bcb5 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -849,7 +849,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(3L, lowWatermark)
 
     for (i <- 0 until brokerCount)
-      assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
+      assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
   }
 
   @Test
@@ -858,16 +858,16 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
 
     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
-      TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.localReplica(topicPartition) != None,
+      TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.localLog(topicPartition) != None,
                               "Expected follower to create replica for partition")
 
       // wait until the follower discovers that log start offset moved beyond its HW
       TestUtils.waitUntilTrue(() => {
-        servers(followerIndex).replicaManager.localReplica(topicPartition).get.logStartOffset == expectedStartOffset
+        servers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset == expectedStartOffset
       }, s"Expected follower to discover new log start offset $expectedStartOffset")
 
       TestUtils.waitUntilTrue(() => {
-        servers(followerIndex).replicaManager.localReplica(topicPartition).get.logEndOffset == expectedEndOffset
+        servers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset == expectedEndOffset
       }, s"Expected follower to catch up to log end offset $expectedEndOffset")
     }
 
@@ -888,7 +888,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     // after the new replica caught up, all replicas should have same log start offset
     for (i <- 0 until brokerCount)
-      assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
+      assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
 
     // kill the same follower again, produce more records, and delete records beyond follower's LOE
     killBroker(followerIndex)
@@ -912,8 +912,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     result.all().get()
     // make sure we are in the expected state after delete records
     for (i <- 0 until brokerCount) {
-      assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
-      assertEquals(expectedLEO, servers(i).replicaManager.localReplica(topicPartition).get.logEndOffset)
+      assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, servers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
     }
 
     // we will create another dir just for one server
@@ -927,8 +927,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     }, "timed out waiting for replica movement")
 
     // once replica moved, its LSO and LEO should match other replicas
-    assertEquals(3, servers(0).replicaManager.localReplica(topicPartition).get.logStartOffset)
-    assertEquals(expectedLEO, servers(0).replicaManager.localReplica(topicPartition).get.logEndOffset)
+    assertEquals(3, servers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, servers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 16a117b..1bd1c81 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -132,7 +132,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
 
     // wait until all the followers have synced the last HW with leader
     TestUtils.waitUntilTrue(() => servers.forall(server =>
-      server.replicaManager.localReplica(tp).get.highWatermark == numRecords
+      server.replicaManager.localLog(tp).get.highWatermark == numRecords
     ), "Failed to update high watermark for followers after timeout")
 
     val scheduler = new BounceBrokerScheduler(numIters)
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index d604cd0..2b36da8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -99,14 +99,14 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val newLeaderServer = servers.find(_.config.brokerId == 101).get
 
     TestUtils.waitUntilTrue (
-      () => newLeaderServer.replicaManager.nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+      () => newLeaderServer.replicaManager.nonOfflinePartition(topicPartition).flatMap(_.leaderLogIfLocal).isDefined,
       "broker 101 should be the new leader", pause = 1L
     )
 
-    assertEquals(100, newLeaderServer.replicaManager.localReplicaOrException(topicPartition)
+    assertEquals(100, newLeaderServer.replicaManager.localLogOrException(topicPartition)
       .highWatermark)
     val newFollowerServer = servers.find(_.config.brokerId == 102).get
-    TestUtils.waitUntilTrue(() => newFollowerServer.replicaManager.localReplicaOrException(topicPartition)
+    TestUtils.waitUntilTrue(() => newFollowerServer.replicaManager.localLogOrException(topicPartition)
       .highWatermark == 100,
       "partition follower's highWatermark should be 100")
   }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 650dce4..bb1270d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -41,9 +41,9 @@ import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.mockito.Mockito.{doNothing, mock, when}
 import org.scalatest.Assertions.assertThrows
+import org.mockito.ArgumentMatchers
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-import org.mockito.ArgumentMatchers
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -125,7 +125,7 @@ class PartitionTest {
     assertEquals(4, log.logEndOffset)
 
     val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
-    assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset))
+    assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
 
     val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch),
       leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true)
@@ -153,7 +153,7 @@ class PartitionTest {
     assertEquals(4, log.logEndOffset)
 
     val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
-    assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset))
+    assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
     assertEquals(None, log.latestEpoch)
 
     val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch),
@@ -168,7 +168,7 @@ class PartitionTest {
     val latch = new CountDownLatch(1)
 
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
     partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
 
@@ -192,7 +192,7 @@ class PartitionTest {
     latch.countDown()
     thread1.join()
     thread2.join()
-    assertEquals(None, partition.futureLocalReplica)
+    assertEquals(None, partition.futureLog)
   }
 
   @Test
@@ -200,13 +200,12 @@ class PartitionTest {
   // active segment
   def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    val currentReplica = partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
     partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
-    val futureReplica = partition.futureLocalReplicaOrException
 
     // Write records with duplicate keys to current replica and roll at offset 6
-    val currentLog = currentReplica.log.get
+    val currentLog = partition.log.get
     currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k1".getBytes, "v2".getBytes),
@@ -231,7 +230,8 @@ class PartitionTest {
     builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
     builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
 
-    futureReplica.log.get.appendAsFollower(builder.build())
+    val futureLog = partition.futureLocalLogOrException
+    futureLog.appendAsFollower(builder.build())
 
     assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
   }
@@ -483,15 +483,14 @@ class PartitionTest {
     assertTrue("Expected first makeLeader() to return 'leader changed'",
       partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
-    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas)
 
     // after makeLeader(() call, partition should know about all the replicas
-    val leaderReplica = partition.getReplica(leader).get
-
     // append records with initial leader epoch
     partition.appendRecordsToLeader(batch1, isFromClient = true)
     partition.appendRecordsToLeader(batch2, isFromClient = true)
-    assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark)
+    assertEquals("Expected leader's HW not move", partition.localLogOrException.logStartOffset,
+      partition.localLogOrException.highWatermark)
 
     // let the follower in ISR move leader's HW to move further but below LEO
     def updateFollowerFetchState(followerId: Int, fetchOffsetMetadata: LogOffsetMetadata): Unit = {
@@ -500,7 +499,7 @@ class PartitionTest {
         followerFetchOffsetMetadata = fetchOffsetMetadata,
         followerStartOffset = 0L,
         followerFetchTimeMs = time.milliseconds(),
-        leaderEndOffset = leaderReplica.logEndOffset)
+        leaderEndOffset = partition.localLogOrException.logEndOffset)
     }
 
     def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
@@ -527,7 +526,7 @@ class PartitionTest {
     updateFollowerFetchState(follower2, LogOffsetMetadata(2))
 
     // At this point, the leader has gotten 5 writes, but followers have only fetched two
-    assertEquals(2, partition.localReplica.get.highWatermark)
+    assertEquals(2, partition.localLogOrException.highWatermark)
 
     // Get the LEO
     fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, None) match {
@@ -617,7 +616,7 @@ class PartitionTest {
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
                                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
-    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val controllerId = 0
     val controllerEpoch = 0
@@ -629,13 +628,12 @@ class PartitionTest {
         partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
           leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
-      assertEquals(Some(replica), partition.leaderReplicaIfLocal)
     } else {
       assertTrue("Expected become follower transition to succeed",
         partition.makeFollower(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId + 1,
           leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
-      assertEquals(None, partition.leaderReplicaIfLocal)
+      assertEquals(None, partition.leaderLogIfLocal)
     }
 
     partition
@@ -643,20 +641,22 @@ class PartitionTest {
 
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
-    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val log = partition.localLogOrException
+
     val initialLogStartOffset = 5L
     partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
     assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
-                 initialLogStartOffset, replica.logEndOffset)
+                 initialLogStartOffset, log.logEndOffset)
     assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:",
-                 initialLogStartOffset, replica.logStartOffset)
+                 initialLogStartOffset, log.logStartOffset)
 
     // verify that we cannot append records that do not contain log start offset even if the log is empty
     assertThrows[UnexpectedAppendOffsetException] {
       // append one record with offset = 3
       partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
     }
-    assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset)
+    assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, log.logEndOffset)
 
     // verify that we can append records that contain log start offset, even when first
     // offset < log start offset if the log is empty
@@ -666,13 +666,13 @@ class PartitionTest {
                                      new SimpleRecord("k3".getBytes, "v3".getBytes)),
                                 baseOffset = newLogStartOffset)
     partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
-    assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset)
-    assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+    assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, log.logEndOffset)
+    assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, log.logStartOffset)
 
     // and we can append more records after that
     partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
-    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset)
-    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, log.logEndOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, log.logStartOffset)
 
     // but we cannot append to offset < log start if the log is not empty
     assertThrows[UnexpectedAppendOffsetException] {
@@ -681,12 +681,12 @@ class PartitionTest {
                                    baseOffset = 3L)
       partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false)
     }
-    assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset)
+    assertEquals(s"Log end offset should not change after failure to append", 8L, log.logEndOffset)
 
     // we still can append to next offset
     partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
-    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset)
-    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, log.logEndOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, log.logStartOffset)
   }
 
   @Test
@@ -699,13 +699,12 @@ class PartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
-    assertEquals(Some(replica), partition.leaderReplicaIfLocal)
 
     val records = createTransactionalRecords(List(
       new SimpleRecord("k1".getBytes, "v1".getBytes),
@@ -736,7 +735,7 @@ class PartitionTest {
     assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
     assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
 
-    replica.highWatermark = 1L
+    partition.log.get.highWatermark = 1L
 
     assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
     assertEquals(1L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
@@ -749,14 +748,19 @@ class PartitionTest {
 
   @Test
   def testGetReplica(): Unit = {
-    assertEquals(None, partition.localReplica)
+    assertEquals(None, partition.log)
     assertThrows[ReplicaNotAvailableException] {
-      partition.localReplicaOrException
+      partition.localLogOrException
     }
 
-    val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
-    assertEquals(Some(replica), partition.localReplica)
-    assertEquals(replica, partition.localReplicaOrException)
+    // only remote brokers are tracked as replicas, so the local broker id must be rejected
+    assertThrows[IllegalArgumentException] {
+      partition.getOrCreateReplica(brokerId)
+    }
+    val remoteReplicaId = brokerId + 1
+    val replica = partition.getOrCreateReplica(remoteReplicaId)
+    assertEquals(remoteReplicaId, replica.brokerId)
+    assertEquals(partition.topicPartition, replica.topicPartition)
   }
 
   @Test
@@ -807,15 +811,14 @@ class PartitionTest {
     assertTrue("Expected first makeLeader() to return 'leader changed'",
                partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
-    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas)
 
     // after makeLeader(() call, partition should know about all the replicas
-    val leaderReplica = partition.getReplica(leader).get
-
     // append records with initial leader epoch
     val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
     partition.appendRecordsToLeader(batch2, isFromClient = true)
-    assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark)
+    assertEquals("Expected leader's HW not move", partition.localLogOrException.logStartOffset,
+      partition.log.get.highWatermark)
 
     // let the follower in ISR move leader's HW to move further but below LEO
     def updateFollowerFetchState(followerId: Int, fetchOffsetMetadata: LogOffsetMetadata): Unit = {
@@ -824,12 +827,12 @@ class PartitionTest {
         followerFetchOffsetMetadata = fetchOffsetMetadata,
         followerStartOffset = 0L,
         followerFetchTimeMs = time.milliseconds(),
-        leaderEndOffset = leaderReplica.logEndOffset)
+        leaderEndOffset = partition.localLogOrException.logEndOffset)
     }
 
     updateFollowerFetchState(follower2, LogOffsetMetadata(0))
     updateFollowerFetchState(follower2, LogOffsetMetadata(lastOffsetOfFirstBatch))
-    assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, leaderReplica.highWatermark)
+    assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, partition.log.get.highWatermark)
 
     // current leader becomes follower and then leader again (without any new records appended)
     val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1,
@@ -840,7 +843,7 @@ class PartitionTest {
       replicas, false)
     assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
                partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints))
-    val currentLeaderEpochStartOffset = leaderReplica.logEndOffset
+    val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
 
     // append records with the latest leader epoch
     partition.appendRecordsToLeader(batch3, isFromClient = true)
@@ -848,14 +851,14 @@ class PartitionTest {
     // fetch from follower not in ISR from log start offset should not add this follower to ISR
     updateFollowerFetchState(follower1, LogOffsetMetadata(0))
     updateFollowerFetchState(follower1, LogOffsetMetadata(lastOffsetOfFirstBatch))
-    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas)
 
     // fetch from the follower not in ISR from start offset of the current leader epoch should
     // add this follower to ISR
     when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
       List(leader, follower2, follower1), 1))).thenReturn(Some(2))
     updateFollowerFetchState(follower1, LogOffsetMetadata(currentLeaderEpochStartOffset))
-    assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas)
   }
 
   /**
@@ -876,11 +879,10 @@ class PartitionTest {
 
     val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
     val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
-    val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
     val partitions = ListBuffer.empty[Partition]
 
-    replicas.foreach { replica =>
-      val tp = replica.topicPartition
+    logs.foreach { log =>
+      val tp = log.topicPartition
       val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
       val partition = new Partition(tp,
         replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
@@ -902,7 +904,7 @@ class PartitionTest {
           }
         })
 
-      partition.addReplicaIfNotExists(replica)
+      partition.setLog(log, isFutureLog = false)
       val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicaIds, true)
       partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
@@ -1007,7 +1009,7 @@ class PartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val initializeTimeMs = time.milliseconds()
     assertTrue("Expected become leader transition to succeed",
@@ -1058,11 +1060,11 @@ class PartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId), partition.inSyncReplicas)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
@@ -1074,7 +1076,7 @@ class PartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 6L)
 
-    assertEquals(Set(brokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId), partition.inSyncReplicas)
     assertEquals(3L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
 
@@ -1092,7 +1094,7 @@ class PartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 6L)
 
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
   }
@@ -1111,11 +1113,11 @@ class PartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId), partition.inSyncReplicas)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
@@ -1136,7 +1138,7 @@ class PartitionTest {
       leaderEndOffset = 10L)
 
     // Follower state is updated, but the ISR has not expanded
-    assertEquals(Set(brokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId), partition.inSyncReplicas)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
   }
@@ -1156,12 +1158,12 @@ class PartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
-    assertEquals(0L, partition.localReplicaOrException.highWatermark)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
@@ -1170,7 +1172,7 @@ class PartitionTest {
 
     // On initialization, the replica is considered caught up and should not be removed
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
 
     // If enough time passes without a fetch update, the ISR should shrink
     time.sleep(10001)
@@ -1182,8 +1184,8 @@ class PartitionTest {
     when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))
 
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId), partition.inSyncReplicas.map(_.brokerId))
-    assertEquals(10L, partition.localReplicaOrException.highWatermark)
+    assertEquals(Set(brokerId), partition.inSyncReplicas)
+    assertEquals(10L, partition.localLogOrException.highWatermark)
   }
 
   @Test
@@ -1201,12 +1203,12 @@ class PartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
-    assertEquals(0L, partition.localReplicaOrException.highWatermark)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
@@ -1222,7 +1224,7 @@ class PartitionTest {
       followerFetchTimeMs = firstFetchTimeMs,
       leaderEndOffset = 10L)
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
-    assertEquals(5L, partition.localReplicaOrException.highWatermark)
+    assertEquals(5L, partition.localLogOrException.highWatermark)
     assertEquals(5L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
 
@@ -1236,14 +1238,14 @@ class PartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 15L)
     assertEquals(firstFetchTimeMs, remoteReplica.lastCaughtUpTimeMs)
-    assertEquals(10L, partition.localReplicaOrException.highWatermark)
+    assertEquals(10L, partition.localLogOrException.highWatermark)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
 
     // The ISR should not be shrunk because the follower has caught up with the leader at the
     // time of the first fetch.
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
   }
 
   @Test
@@ -1261,12 +1263,12 @@ class PartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
-    assertEquals(0L, partition.localReplicaOrException.highWatermark)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
@@ -1280,7 +1282,7 @@ class PartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 10L)
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
-    assertEquals(10L, partition.localReplicaOrException.highWatermark)
+    assertEquals(10L, partition.localLogOrException.highWatermark)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
 
@@ -1289,7 +1291,7 @@ class PartitionTest {
 
     // The ISR should not be shrunk because the follower is caught up to the leader's log end
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
   }
 
   @Test
@@ -1307,12 +1309,12 @@ class PartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
-    assertEquals(0L, partition.localReplicaOrException.highWatermark)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
@@ -1330,8 +1332,8 @@ class PartitionTest {
     when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(None)
 
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas.map(_.brokerId))
-    assertEquals(0L, partition.localReplicaOrException.highWatermark)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(0L, partition.localLogOrException.highWatermark)
   }
 
   @Test
@@ -1348,7 +1350,7 @@ class PartitionTest {
     val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
       6, replicas, 1, replicas, false)
     partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
-    assertEquals(4, partition.localReplicaOrException.highWatermark)
+    assertEquals(4, partition.localLogOrException.highWatermark)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 64e298f..31192f0 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 import kafka.log.{Log, LogConfig, LogManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
 import org.junit.{After, Before, Test}
@@ -34,7 +33,6 @@ class ReplicaTest {
   val time = new MockTime()
   val brokerTopicStats = new BrokerTopicStats
   var log: Log = _
-  var replica: Replica = _
 
   @Before
   def setup(): Unit = {
@@ -53,11 +51,6 @@ class ReplicaTest {
       maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
-
-    replica = new Replica(brokerId = 0,
-      topicPartition = new TopicPartition("foo", 0),
-      time = time,
-      log = Some(log))
   }
 
   @After
@@ -70,13 +63,9 @@ class ReplicaTest {
   @Test
   def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
     val initialHighWatermark = 25L
-    replica = new Replica(brokerId = 0,
-      topicPartition = new TopicPartition("foo", 0),
-      time = time,
-      initialHighWatermarkValue = initialHighWatermark,
-      log = Some(log))
+    log.highWatermark = initialHighWatermark
 
-    assertEquals(initialHighWatermark, replica.highWatermark)
+    assertEquals(initialHighWatermark, log.highWatermark)
 
     val expiredTimestamp = time.milliseconds() - 1000
     for (i <- 0 until 100) {
@@ -87,7 +76,7 @@ class ReplicaTest {
     val initialNumSegments = log.numberOfSegments
     log.deleteOldSegments()
     assertTrue(log.numberOfSegments < initialNumSegments)
-    assertTrue(replica.logStartOffset <= initialHighWatermark)
+    assertTrue(log.logStartOffset <= initialHighWatermark)
   }
 
   @Test
@@ -100,25 +89,25 @@ class ReplicaTest {
 
     // ensure we have at least a few segments so the test case is not trivial
     assertTrue(log.numberOfSegments > 5)
-    assertEquals(0L, replica.highWatermark)
-    assertEquals(0L, replica.logStartOffset)
-    assertEquals(100L, replica.logEndOffset)
+    assertEquals(0L, log.highWatermark)
+    assertEquals(0L, log.logStartOffset)
+    assertEquals(100L, log.logEndOffset)
 
     for (hw <- 0 to 100) {
-      replica.highWatermark = hw
-      assertEquals(hw, replica.highWatermark)
+      log.highWatermark = hw
+      assertEquals(hw, log.highWatermark)
       log.deleteOldSegments()
-      assertTrue(replica.logStartOffset <= hw)
+      assertTrue(log.logStartOffset <= hw)
 
       // verify that all segments up to the high watermark have been deleted
 
       log.logSegments.headOption.foreach { segment =>
         assertTrue(segment.baseOffset <= hw)
-        assertTrue(segment.baseOffset >= replica.logStartOffset)
+        assertTrue(segment.baseOffset >= log.logStartOffset)
       }
       log.logSegments.tail.foreach { segment =>
         assertTrue(segment.baseOffset > hw)
-        assertTrue(segment.baseOffset >= replica.logStartOffset)
+        assertTrue(segment.baseOffset >= log.logStartOffset)
       }
     }
 
@@ -134,8 +123,7 @@ class ReplicaTest {
       log.appendAsLeader(records, leaderEpoch = 0)
     }
 
-    replica.highWatermark = 25L
-    replica.maybeIncrementLogStartOffset(26L)
+    log.highWatermark = 25L
+    log.maybeIncrementLogStartOffset(26L)
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 7c83cde..15b47ca 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -218,7 +218,7 @@ class LogCleanerManagerTest extends Logging {
     log.appendAsLeader(records, leaderEpoch = 0)
     log.roll()
     log.appendAsLeader(records, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(2L)
+    log.highWatermarkMetadata = LogOffsetMetadata(2L)
 
     // simulate cleanup thread working on the log partition
     val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
@@ -403,7 +403,7 @@ class LogCleanerManagerTest extends Logging {
     log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 2,
       new SimpleRecord(time.milliseconds(), "3".getBytes, "c".getBytes)), leaderEpoch = 0)
     log.roll()
-    log.onHighWatermarkIncremented(3L)
+    log.highWatermarkMetadata = LogOffsetMetadata(3L)
 
     time.sleep(compactionLag + 1)
     // although the compaction lag has been exceeded, the undecided data should not be cleaned
@@ -415,7 +415,7 @@ class LogCleanerManagerTest extends Logging {
     log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
       new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0, isFromClient = false)
     log.roll()
-    log.onHighWatermarkIncremented(4L)
+    log.highWatermarkMetadata = LogOffsetMetadata(4L)
 
     // the first segment should now become cleanable immediately
     cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 61e3ea5..d0ef078 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import kafka.api.KAFKA_0_11_0_IV0
 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, LogOffsetMetadata}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -102,7 +102,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
       val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
       val startSize = log.size
 
-      log.onHighWatermarkIncremented(log.logEndOffset)
+      log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
 
       val firstDirty = log.activeSegment.baseOffset
       cleaner.startup()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 825fa90..42ad8a0 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -24,7 +24,7 @@ import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -126,8 +126,9 @@ class LogCleanerTest {
     val t = new Thread() {
       override def run(): Unit = {
         deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
+        log.highWatermark = log.activeSegment.baseOffset
         log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
-        log.onHighWatermarkIncremented(log.activeSegment.baseOffset)
+        log.highWatermarkMetadata = LogOffsetMetadata(log.activeSegment.baseOffset)
         log.deleteOldSegments()
         deleteCompleteLatch.countDown()
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ed1824d..94ae3ed 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.util.{Collections, Properties}
 
-import kafka.server.FetchDataInfo
+import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -119,7 +119,8 @@ class LogManagerTest {
       offset = info.lastOffset
     }
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
 
     log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
 
@@ -173,7 +174,8 @@ class LogManagerTest {
       offset = info.firstOffset.get
     }
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.segmentSize, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index dc9a423..8fb65ff 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -317,6 +317,7 @@ class LogTest {
 
     // Increment the log start offset
     val startOffset = 4
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(startOffset)
     assertTrue(log.logEndOffset > log.logStartOffset)
 
@@ -830,6 +831,7 @@ class LogTest {
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
     assertEquals(2, log.activeProducersWithLastSequence.size)
 
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(1L)
 
     assertEquals(1, log.activeProducersWithLastSequence.size)
@@ -862,8 +864,8 @@ class LogTest {
     assertEquals(2, log.logSegments.size)
     assertEquals(2, log.activeProducersWithLastSequence.size)
 
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(1L)
-    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(1, log.logSegments.size)
@@ -921,7 +923,7 @@ class LogTest {
     assertEquals(3, log.logSegments.size)
     assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
 
     assertEquals(2, log.logSegments.size)
@@ -1010,7 +1012,7 @@ class LogTest {
     log.appendAsLeader(records, leaderEpoch = 0)
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
+    log.highWatermark = commitAppendInfo.lastOffset + 1
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -1018,7 +1020,7 @@ class LogTest {
     log.close()
 
     val reopenedLog = createLog(logDir, logConfig)
-    reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
+    reopenedLog.highWatermark = commitAppendInfo.lastOffset + 1
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
 
@@ -1557,7 +1559,7 @@ class LogTest {
       assertEquals(currOffset, messagesToAppend)
 
       // time goes by; the log file is deleted
-      log.onHighWatermarkIncremented(currOffset)
+      log.highWatermark = currOffset
       log.deleteOldSegments()
 
       assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
@@ -2033,7 +2035,7 @@ class LogTest {
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.lazyOffsetIndex.file)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
 
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
@@ -2063,7 +2065,7 @@ class LogTest {
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // expire all segments
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     log.close()
     log = createLog(logDir, logConfig)
@@ -2818,7 +2820,7 @@ class LogTest {
 
     // only segments with offset before the current high watermark are eligible for deletion
     for (hw <- 25 to 30) {
-      log.onHighWatermarkIncremented(hw)
+      log.highWatermark = hw
       log.deleteOldSegments()
       assertTrue(log.logStartOffset <= hw)
       log.logSegments.foreach { segment =>
@@ -2831,7 +2833,7 @@ class LogTest {
     }
 
     // expire all segments
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
     assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
@@ -2875,7 +2877,7 @@ class LogTest {
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("should have 3 segments", 3, log.numberOfSegments)
     assertEquals(log.logStartOffset, 0)
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
 
     log.maybeIncrementLogStartOffset(1)
     log.deleteOldSegments()
@@ -2907,7 +2909,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("should have 2 segments", 2,log.numberOfSegments)
   }
@@ -2922,7 +2924,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("should have 3 segments", 3,log.numberOfSegments)
   }
@@ -2937,7 +2939,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
@@ -2952,7 +2954,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
   }
@@ -2971,7 +2973,7 @@ class LogTest {
     log.logSegments.head.lastModified = mockTime.milliseconds - 20000
 
     val segments = log.numberOfSegments
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", segments, log.numberOfSegments)
   }
@@ -2986,7 +2988,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
@@ -3004,12 +3006,13 @@ class LogTest {
 
     // Three segments should be created
     assertEquals(3, log.logSegments.count(_ => true))
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(recordsPerSegment)
 
     // The first segment, which is entirely before the log start offset, should be deleted
     // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
     // greater than the start offset
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals("There should be 2 segments remaining", 2, log.numberOfSegments)
     assertTrue(log.logSegments.head.baseOffset <= log.logStartOffset)
@@ -3081,7 +3084,7 @@ class LogTest {
     cache.assign(2, 10)
 
     //When first segment is removed
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
 
     //The oldest epoch entry should have been removed
@@ -3106,7 +3109,7 @@ class LogTest {
     cache.assign(2, 10)
 
     //When first segment removed (up to offset 5)
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
 
     //The first entry should have gone from (0,0) => (0,5)
@@ -3261,7 +3264,7 @@ class LogTest {
 
     // first unstable offset is not updated until the high watermark is advanced
     assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
-    log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
+    log.highWatermark = commitAppendInfo.lastOffset + 1
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -3568,6 +3571,7 @@ class LogTest {
 
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
 
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(5L)
 
     // the first unstable offset should be lower bounded by the log start offset
@@ -3592,8 +3596,9 @@ class LogTest {
 
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
 
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(8L)
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.deleteOldSegments()
     assertEquals(1, log.logSegments.size)
 
@@ -3630,7 +3635,7 @@ class LogTest {
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
 
     // Even if the high watermark is updated, the first unstable offset does not move
-    log.onHighWatermarkIncremented(12L)
+    log.highWatermark = 12L
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
 
     log.close()
@@ -3638,7 +3643,7 @@ class LogTest {
     val reopenedLog = createLog(logDir, logConfig)
     assertEquals(12L, reopenedLog.logEndOffset)
     assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
-    reopenedLog.onHighWatermarkIncremented(12L)
+    reopenedLog.highWatermark = 12L
     assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
   }
 
@@ -3680,7 +3685,7 @@ class LogTest {
     // now first producer's transaction is aborted
     val abortAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
+    log.highWatermark = abortAppendInfo.lastOffset + 1
 
     // LSO should now point to one less than the first offset of the second transaction
     assertEquals(Some(secondAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
@@ -3688,7 +3693,7 @@ class LogTest {
     // commit the second transaction
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
+    log.highWatermark = commitAppendInfo.lastOffset + 1
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -3725,7 +3730,7 @@ class LogTest {
     // now abort the transaction
     val appendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(appendInfo.lastOffset + 1)
+    log.highWatermark = appendInfo.lastOffset + 1
     assertEquals(None, log.firstUnstableOffset.map(_.messageOffset))
 
     // now check that a fetch includes the aborted transaction
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 6f4979b..3db3c22 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -211,7 +211,7 @@ class ProducerStateManagerTest {
     val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
 
-    val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
+    val logOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
     producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
     stateManager.update(producerAppendInfo)
@@ -283,7 +283,7 @@ class ProducerStateManagerTest {
 
     // use some other offset to simulate a follower append where the log offset metadata won't typically
     // match any of the transaction first offsets
-    val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
+    val logOffsetMetadata = LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
     producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
     stateManager.update(producerAppendInfo)
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 61c521b..92820d6 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -75,18 +75,17 @@ class HighwatermarkPersistenceTest {
       val partition0 = replicaManager.createPartition(tp0)
       // create leader and follower replicas
       val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig())
-      val leaderReplicaPartition0 = new Replica(configs.head.brokerId, tp0, time, 0, Some(log0))
-      partition0.addReplicaIfNotExists(leaderReplicaPartition0)
-      val followerReplicaPartition0 = new Replica(configs.last.brokerId, tp0, time)
+      partition0.setLog(log0, isFutureLog = false)
+      val followerReplicaPartition0 = new Replica(configs.last.brokerId, tp0)
       partition0.addReplicaIfNotExists(followerReplicaPartition0)
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-      assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
+      assertEquals(log0.highWatermark, fooPartition0Hw)
       // set the high watermark for local replica
-      partition0.localReplica.get.highWatermark = 5L
+      partition0.localLogOrException.highWatermark = 5L
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-      assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
+      assertEquals(log0.highWatermark, fooPartition0Hw)
       EasyMock.verify(zkClient)
     } finally {
       // shutdown the replica manager upon test completion
@@ -121,16 +120,15 @@ class HighwatermarkPersistenceTest {
       // create leader log
       val topic1Log0 = logManagers.head.getOrCreateLog(t1p0, LogConfig())
       // create a local replica for topic1
-      val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, t1p0, time, 0, Some(topic1Log0))
-      topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
+      topic1Partition0.setLog(topic1Log0, isFutureLog = false)
       replicaManager.checkpointHighWatermarks()
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-      assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
+      assertEquals(topic1Log0.highWatermark, topic1Partition0Hw)
       // set the high watermark for local replica
-      topic1Partition0.localReplica.get.highWatermark = 5L
+      topic1Partition0.localLogOrException.highWatermark = 5L
       replicaManager.checkpointHighWatermarks()
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-      assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
+      assertEquals(5L, topic1Log0.highWatermark)
       assertEquals(5L, topic1Partition0Hw)
       // add another partition and set highwatermark
       val t2p0 = new TopicPartition(topic2, 0)
@@ -138,17 +136,16 @@ class HighwatermarkPersistenceTest {
       // create leader log
       val topic2Log0 = logManagers.head.getOrCreateLog(t2p0, LogConfig())
       // create a local replica for topic2
-      val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, t2p0, time, 0, Some(topic2Log0))
-      topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
+      topic2Partition0.setLog(topic2Log0, isFutureLog = false)
       replicaManager.checkpointHighWatermarks()
       var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
-      assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
+      assertEquals(topic2Log0.highWatermark, topic2Partition0Hw)
       // set the highwatermark for local replica
-      topic2Partition0.localReplica.get.highWatermark = 15L
-      assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark)
+      topic2Partition0.localLogOrException.highWatermark = 15L
+      assertEquals(15L, topic2Log0.highWatermark)
       // change the highwatermark for topic1
-      topic1Partition0.localReplica.get.highWatermark = 10L
-      assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
+      topic1Partition0.localLogOrException.highWatermark = 10L
+      assertEquals(10L, topic1Log0.highWatermark)
       replicaManager.checkpointHighWatermarks()
       // verify checkpointed hw for topic 2
       topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
@@ -169,5 +166,4 @@ class HighwatermarkPersistenceTest {
     replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(
       new TopicPartition(topic, partition), 0L)
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 0ee0fa1..efdadbb 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -76,25 +76,24 @@ class IsrExpirationTest {
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
-    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
 
     // let the follower catch up to the Leader logEndOffset - 1
-    for (replica <- partition0.assignedReplicas - leaderReplica)
+    for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
+        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
-    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
 
     // let some time pass
     time.sleep(150)
 
     // now follower hasn't pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR)
     EasyMock.verify(log)
   }
 
@@ -107,14 +106,13 @@ class IsrExpirationTest {
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
-    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
 
     // Let enough time pass for the replica to be considered stuck
     time.sleep(150)
 
-    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR)
     EasyMock.verify(log)
   }
 
@@ -128,50 +126,48 @@ class IsrExpirationTest {
     val log = logMock
     // add one partition
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
-    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
-
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
     // Make the remote replica not read to the end of log. It should not be out of sync for at least 100 ms
-    for (replica <- partition0.assignedReplicas - leaderReplica)
+    for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 2),
+        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 2),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
-    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
 
     time.sleep(75)
 
-    (partition0.assignedReplicas - leaderReplica).foreach { r =>
+    partition0.remoteReplicas.foreach { r =>
       r.updateFetchState(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
+        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
     }
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
 
     time.sleep(75)
 
     // The replicas will no longer be in ISR
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR)
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
-    (partition0.assignedReplicas - leaderReplica).foreach { r =>
+    partition0.remoteReplicas.foreach { r =>
       r.updateFetchState(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
+        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
     }
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
 
     EasyMock.verify(log)
   }
@@ -185,26 +181,25 @@ class IsrExpirationTest {
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
-    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
 
     // let the follower catch up to the Leader logEndOffset
-    for (replica <- partition0.assignedReplicas - leaderReplica)
+    for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
+        followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = leaderLogEndOffset)
 
-    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
 
     // let some time pass
     time.sleep(150)
 
     // even though follower hasn't pulled any data for > replicaMaxLagTimeMs ms, the follower has already caught up. So it is not out-of-sync.
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
-    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
     EasyMock.verify(log)
   }
 
@@ -213,16 +208,16 @@ class IsrExpirationTest {
     val leaderId = config.brokerId
     val tp = new TopicPartition(topic, partitionId)
     val partition = replicaManager.createPartition(tp)
-    val leaderReplica = new Replica(leaderId, tp, time, 0, Some(localLog))
+    partition.setLog(localLog, isFutureLog = false)
 
-    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
+    val allReplicas = getFollowerReplicas(partition, leaderId, time)
     allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
     // set in sync replicas for this partition to all the assigned replicas
-    partition.inSyncReplicas = allReplicas.toSet
+    partition.inSyncReplicas = allReplicas.map(_.brokerId).toSet + leaderId
     // set lastCaughtUpTime to current time
-    for (replica <- partition.assignedReplicas - leaderReplica)
+    for (replica <- partition.remoteReplicas)
       replica.updateFetchState(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(0L),
+        followerFetchOffsetMetadata = LogOffsetMetadata(0L),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
         leaderEndOffset = 0L)
@@ -235,15 +230,15 @@ class IsrExpirationTest {
   private def logMock: Log = {
     val log: Log = EasyMock.createMock(classOf[Log])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
-    EasyMock.expect(log.onHighWatermarkIncremented(0L))
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
+    EasyMock.expect(log.logEndOffset).andReturn(leaderLogEndOffset).anyTimes()
     EasyMock.replay(log)
     log
   }
 
   private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
     configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition.topicPartition, time)
+      new Replica(config.brokerId, partition.topicPartition)
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index f83562c..03ce3a5 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -113,7 +113,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // so that ReplicaFetcherThread on the follower will get response from leader immediately
     val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i =>
       leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, i))
-        .flatMap(_.leaderReplicaIfLocal).isDefined
+        .flatMap(_.leaderLogIfLocal).isDefined
     }.get
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes)
     // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower
@@ -145,8 +145,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
     TestUtils.consumeRecords(consumer, 1)
 
     // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
-    val replica = leaderServer.replicaManager.localReplicaOrException(partition)
-    val logDir = replica.log.get.dir.getParentFile
+    val localLog = leaderServer.replicaManager.localLogOrException(partition)
+    val logDir = localLog.dir.getParentFile
     CoreUtils.swallow(Utils.delete(logDir), this)
     logDir.createNewFile()
     assertTrue(logDir.isFile)
@@ -164,7 +164,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
     TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
-    assertTrue(leaderServer.replicaManager.localReplica(partition).isEmpty)
+    assertTrue(leaderServer.replicaManager.localLog(partition).isEmpty)
 
     // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
     try {
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 7b52e7b..46fcee5 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -78,7 +78,8 @@ class LogOffsetTest extends BaseRequestTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
-    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
+    log.highWatermark = log.logEndOffset
     log.maybeIncrementLogStartOffset(3)
     log.deleteOldSegments()
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index e868f6c..ee7472f 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -104,7 +104,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // give some time for the follower 1 to record leader HW
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.localReplica(topicPartition).get.highWatermark == numMessages,
+      server2.replicaManager.localLogOrException(topicPartition).highWatermark == numMessages,
       "Failed to update high watermark for follower after timeout")
 
     servers.foreach(_.replicaManager.checkpointHighWatermarks())
@@ -166,7 +166,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // give some time for follower 1 to record leader HW of 60
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.localReplica(topicPartition).get.highWatermark == hw,
+      server2.replicaManager.localLogOrException(topicPartition).highWatermark == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(_.shutdown())
@@ -180,7 +180,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     val hw = 20L
     // give some time for follower 1 to record leader HW of 20
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.localReplica(topicPartition).get.highWatermark == hw,
+      server2.replicaManager.localLogOrException(topicPartition).highWatermark == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(_.shutdown())
@@ -199,7 +199,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.localReplica(topicPartition).get.highWatermark == hw,
+      server2.replicaManager.localLogOrException(topicPartition).highWatermark == hw,
       "Failed to update high watermark for follower after timeout")
     // kill the server hosting the preferred replica
     server1.shutdown()
@@ -226,11 +226,11 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     hw += 2
 
     // allow some time for the follower to create replica
-    TestUtils.waitUntilTrue(() => server1.replicaManager.localReplica(topicPartition).nonEmpty,
+    TestUtils.waitUntilTrue(() => server1.replicaManager.localLog(topicPartition).nonEmpty,
       "Failed to create replica in follower after timeout")
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
-      server1.replicaManager.localReplica(topicPartition).get.highWatermark == hw,
+      server1.replicaManager.localLogOrException(topicPartition).highWatermark == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(_.shutdown())
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index d149b8a..6607272 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import java.util.Optional
 
-import kafka.cluster.{BrokerEndPoint, Partition, Replica}
-import kafka.log.LogManager
+import kafka.cluster.{BrokerEndPoint, Partition}
+import kafka.log.{Log, LogManager}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.easymock.EasyMock._
-import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
+import org.easymock.{Capture, CaptureType, EasyMock, IAnswer, IExpectationSetters}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -153,11 +153,11 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replicaT1p0: Replica = createNiceMock(classOf[Replica])
-    val replicaT1p1: Replica = createNiceMock(classOf[Replica])
+    val logT1p0: Log = createNiceMock(classOf[Log])
+    val logT1p1: Log = createNiceMock(classOf[Log])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplicaT1p0: Replica = createNiceMock(classOf[Replica])
-    val futureReplicaT1p1: Replica = createNiceMock(classOf[Replica])
+    val futureLogT1p0: Log = createNiceMock(classOf[Log])
+    val futureLogT1p1: Log = createNiceMock(classOf[Log])
     val partitionT1p0: Partition = createMock(classOf[Partition])
     val partitionT1p1: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
@@ -173,33 +173,32 @@ class ReplicaAlterLogDirsThreadTest {
       .andStubReturn(partitionT1p0)
     expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
       .andStubReturn(partitionT1p1)
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplicaT1p0)
-    expect(replicaManager.futureLocalReplicaOrException(t1p1)).andStubReturn(futureReplicaT1p1)
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLogT1p0)
+    expect(replicaManager.futureLocalLogOrException(t1p1)).andStubReturn(futureLogT1p1)
     expect(partitionT1p0.truncateTo(capture(truncateCaptureT1p0), anyBoolean())).anyTimes()
     expect(partitionT1p1.truncateTo(capture(truncateCaptureT1p1), anyBoolean())).anyTimes()
 
-    expect(futureReplicaT1p0.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
-    expect(futureReplicaT1p1.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
+    expect(futureLogT1p0.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
+    expect(futureLogT1p1.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
 
-    expect(futureReplicaT1p0.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
-    expect(futureReplicaT1p0.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(futureLogT1p0.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(futureLogT1p0.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))).anyTimes()
     expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch, replicaT1p0LEO))
       .anyTimes()
 
-    expect(futureReplicaT1p1.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
-    expect(futureReplicaT1p1.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(futureLogT1p1.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(futureLogT1p1.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))).anyTimes()
     expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch, replicaT1p1LEO))
       .anyTimes()
 
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
-    stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplicaT1p0, partitionT1p0, replicaManager, responseCallback)
+    stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaManager, responseCallback)
 
-    replay(replicaManager, logManager, quotaManager, replicaT1p0, replicaT1p1,
-      futureReplicaT1p0, partitionT1p0, partitionT1p1)
+    replay(replicaManager, logManager, quotaManager, partitionT1p0, partitionT1p1, logT1p0, logT1p1, futureLogT1p0, futureLogT1p1)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -231,9 +230,9 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val futureLog: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -247,31 +246,31 @@ class ReplicaAlterLogDirsThreadTest {
     //Stubs
     expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
       .andStubReturn(partition)
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
 
     expect(partition.truncateTo(capture(truncateToCapture), EasyMock.eq(true))).anyTimes()
-    expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
-    expect(futureReplica.latestEpoch).andReturn(Some(leaderEpoch)).once()
-    expect(futureReplica.latestEpoch).andReturn(Some(leaderEpoch - 2)).once()
+    expect(futureLog.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
+    expect(futureLog.latestEpoch).andReturn(Some(leaderEpoch)).once()
+    expect(futureLog.latestEpoch).andReturn(Some(leaderEpoch - 2)).once()
 
     // leader replica truncated and fetched new offsets with new leader epoch
     expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch - 1, replicaLEO))
       .anyTimes()
     // but future replica does not know about this leader epoch, so returns a smaller leader epoch
-    expect(futureReplica.endOffsetForEpoch(leaderEpoch - 1)).andReturn(
+    expect(futureLog.endOffsetForEpoch(leaderEpoch - 1)).andReturn(
       Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2))).anyTimes()
     // finally, the leader replica knows about the leader epoch and returns end offset
     expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch - 2, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch - 2, replicaEpochEndOffset))
       .anyTimes()
-    expect(futureReplica.endOffsetForEpoch(leaderEpoch - 2)).andReturn(
+    expect(futureLog.endOffsetForEpoch(leaderEpoch - 2)).andReturn(
       Some(OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2))).anyTimes()
 
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
-    stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
+    stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
 
-    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, partition, log, futureLog)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -305,8 +304,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
-    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
+    val futureLog: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -318,16 +317,15 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
       .andStubReturn(partition)
     expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).anyTimes()
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
 
-    expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
 
     // pretend this is a completely new future replica, with no leader epochs recorded
-    expect(futureReplica.latestEpoch).andReturn(None).anyTimes()
+    expect(futureLog.latestEpoch).andReturn(None).anyTimes()
 
-    stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
-    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+    stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
+    replay(replicaManager, logManager, quotaManager, partition, log, futureLog)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -359,8 +357,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
-    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
+    val futureLog: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -374,14 +372,13 @@ class ReplicaAlterLogDirsThreadTest {
       .andStubReturn(partition)
     expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).once()
 
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
-    expect(futureReplica.latestEpoch).andStubReturn(Some(futureReplicaLeaderEpoch))
-    expect(futureReplica.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn(
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
+    expect(futureLog.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
+    expect(futureLog.latestEpoch).andStubReturn(Some(futureReplicaLeaderEpoch))
+    expect(futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn(
       Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
-    expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
-    expect(replicaManager.localReplica(t1p0)).andReturn(Some(replica)).anyTimes()
-    expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes()
+    expect(replicaManager.localLog(t1p0)).andReturn(Some(log)).anyTimes()
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andReturn(futureLog).anyTimes()
 
     // this will cause fetchEpochsFromLeader return an error with undefined offset
     expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), futureReplicaLeaderEpoch, fetchOnlyFromLeader = false))
@@ -406,7 +403,7 @@ class ReplicaAlterLogDirsThreadTest {
         }
       }).anyTimes()
 
-    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, partition, log, futureLog)
 
     //Create the thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -442,8 +439,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
-    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
+    val futureLog: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -458,15 +455,15 @@ class ReplicaAlterLogDirsThreadTest {
         .andReturn(new EpochEndOffset(leaderEpoch, replicaLEO))
     expect(partition.truncateTo(futureReplicaLEO, isFuture = true)).once()
 
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
-    expect(futureReplica.latestEpoch).andStubReturn(Some(leaderEpoch))
-    expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
-    expect(futureReplica.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
+    expect(futureLog.latestEpoch).andStubReturn(Some(leaderEpoch))
+    expect(futureLog.logEndOffset).andStubReturn(futureReplicaLEO)
+    expect(futureLog.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
-    stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
+    stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
 
-    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, partition, log, futureLog)
 
     //Create the fetcher thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -496,17 +493,16 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
-    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
+    val futureLog: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     //Stubs
-    expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
-    stub(replica, replica, futureReplica, partition, replicaManager)
+    stub(log, null, futureLog, partition, replicaManager)
 
-    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, partition, log)
 
     //Create the fetcher thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -546,17 +542,18 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
-    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
+    val futureLog: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     //Stubs
-    expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
+    val startOffset = 123
+    expect(futureLog.logStartOffset).andReturn(startOffset).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
-    stub(replica, replica, futureReplica, partition, replicaManager)
+    stub(log, null, futureLog, partition, replicaManager)
 
-    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+    replay(replicaManager, logManager, quotaManager, partition, log, futureLog)
 
     //Create the fetcher thread
     val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@@ -609,23 +606,21 @@ class ReplicaAlterLogDirsThreadTest {
     assertFalse(partitionsWithError3.nonEmpty)
   }
 
-  def stub(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
-    expect(replicaManager.localReplica(t1p0)).andReturn(Some(replicaT1p0)).anyTimes()
-    expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
-    expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
-    expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes()
+  def stub(logT1p0: Log, logT1p1: Log, futureLog: Log, partition: Partition,
+           replicaManager: ReplicaManager): IExpectationSetters[Option[Partition]] = {
+    expect(replicaManager.localLog(t1p0)).andReturn(Some(logT1p0)).anyTimes()
+    expect(replicaManager.localLogOrException(t1p0)).andReturn(logT1p0).anyTimes()
+    expect(replicaManager.futureLocalLogOrException(t1p0)).andReturn(futureLog).anyTimes()
     expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes()
-    expect(replicaManager.localReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
-    expect(replicaManager.futureLocalReplica(t1p1)).andReturn(Some(futureReplica)).anyTimes()
-    expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
-    expect(replicaManager.futureLocalReplicaOrException(t1p1)).andReturn(futureReplica).anyTimes()
+    expect(replicaManager.localLog(t1p1)).andReturn(Some(logT1p1)).anyTimes()
+    expect(replicaManager.localLogOrException(t1p1)).andReturn(logT1p1).anyTimes()
+    expect(replicaManager.futureLocalLogOrException(t1p1)).andReturn(futureLog).anyTimes()
     expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
   }
 
-  def stubWithFetchMessages(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica,
-                            partition: Partition, replicaManager: ReplicaManager,
-                            responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]) = {
-    stub(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager)
+  def stubWithFetchMessages(logT1p0: Log, logT1p1: Log, futureLog: Log, partition: Partition, replicaManager: ReplicaManager,
+          responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]): IExpectationSetters[Unit] = {
+    stub(logT1p0, logT1p1, futureLog, partition, replicaManager)
     expect(replicaManager.fetchMessages(
       EasyMock.anyLong(),
       EasyMock.anyInt(),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 5400f86..bb4cbda 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import java.util.Optional
 
-import kafka.cluster.{BrokerEndPoint, Partition, Replica}
-import kafka.log.LogManager
+import kafka.cluster.{BrokerEndPoint, Partition}
+import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
@@ -79,28 +79,29 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
 
     //Stubs
-    expect(replica.logEndOffset).andReturn(0).anyTimes()
-    expect(replica.highWatermark).andReturn(0L).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).once()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).once()
-    expect(replica.latestEpoch).andReturn(None).once()  // t2p1 doesnt support epochs
-    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.logEndOffset).andReturn(0).anyTimes()
+    expect(log.highWatermark).andReturn(0).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(leaderEpoch)).once()
+    expect(log.latestEpoch).andReturn(Some(leaderEpoch)).once()
+    expect(log.latestEpoch).andReturn(None).once()  // t2p1 doesnt support epochs
+    expect(log.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(0, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
     //Expectations
     expect(partition.truncateTo(anyLong(), anyBoolean())).times(3)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
@@ -210,26 +211,26 @@ class ReplicaFetcherThreadTest {
     //Setup all dependencies
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
 
     //Stubs
-    expect(replica.logEndOffset).andReturn(0).anyTimes()
-    expect(replica.highWatermark).andReturn(0L).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
-    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(0).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(log.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(0, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
     //Expectations
     expect(partition.truncateTo(anyLong(), anyBoolean())).times(2)
 
-    replay(replicaManager, logManager, replica, partition)
+    replay(replicaManager, logManager, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava
@@ -270,7 +271,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -279,23 +280,26 @@ class ReplicaFetcherThreadTest {
 
     //Stubs
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
-    expect(replica.logEndOffset).andReturn(initialLEO).anyTimes()
-    expect(replica.highWatermark).andReturn(initialLEO - 1).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
-    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(initialLEO - 1).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(log.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(initialLEO, leaderEpoch))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
     val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 156), t2p1 -> new EpochEndOffset(leaderEpoch, 172)).asJava
 
     //Create the thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs.head, failedPartitions, replicaManager,
+      new Metrics(), new SystemTime(), quota, Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t2p1 -> offsetAndEpoch(0L)))
 
     //Run it
@@ -318,7 +322,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -328,15 +332,17 @@ class ReplicaFetcherThreadTest {
 
     //Stubs
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
-    expect(replica.logEndOffset).andReturn(initialLEO).anyTimes()
-    expect(replica.highWatermark).andReturn(initialLEO - 3).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpochAtFollower)).anyTimes()
-    expect(replica.endOffsetForEpoch(leaderEpochAtLeader)).andReturn(None).anyTimes()
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(initialLEO - 3).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(leaderEpochAtFollower)).anyTimes()
+    expect(log.endOffsetForEpoch(leaderEpochAtLeader)).andReturn(None).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
     val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpochAtLeader, 156),
@@ -344,7 +350,8 @@ class ReplicaFetcherThreadTest {
 
     //Create the thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs.head, failedPartitions,
+      replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t2p1 -> offsetAndEpoch(0L)))
 
     //Run it
@@ -370,7 +377,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -378,18 +385,20 @@ class ReplicaFetcherThreadTest {
 
     // Stubs
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
-    expect(replica.logEndOffset).andReturn(initialLEO).anyTimes()
-    expect(replica.highWatermark).andReturn(initialLEO - 2).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(5)).anyTimes()
-    expect(replica.endOffsetForEpoch(4)).andReturn(
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(initialLEO - 2).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(5)).anyTimes()
+    expect(log.endOffsetForEpoch(4)).andReturn(
       Some(OffsetAndEpoch(120, 3))).anyTimes()
-    expect(replica.endOffsetForEpoch(3)).andReturn(
+    expect(log.endOffsetForEpoch(3)).andReturn(
       Some(OffsetAndEpoch(120, 3))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     // Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(4, 155), t1p1 -> new EpochEndOffset(4, 143)).asJava
@@ -441,7 +450,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -449,18 +458,20 @@ class ReplicaFetcherThreadTest {
 
     // Stubs
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
-    expect(replica.logEndOffset).andReturn(initialLEO).anyTimes()
-    expect(replica.highWatermark).andReturn(initialLEO - 2).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(5)).anyTimes()
-    expect(replica.endOffsetForEpoch(4)).andReturn(
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(initialLEO - 2).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(5)).anyTimes()
+    expect(log.endOffsetForEpoch(4)).andReturn(
       Some(OffsetAndEpoch(120, 3))).anyTimes()
-    expect(replica.endOffsetForEpoch(3)).andReturn(
+    expect(log.endOffsetForEpoch(3)).andReturn(
       Some(OffsetAndEpoch(120, 3))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     // Define the offsets for the OffsetsForLeaderEpochResponse with undefined epoch to simulate
     // older protocol version
@@ -502,7 +513,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -511,20 +522,20 @@ class ReplicaFetcherThreadTest {
 
     //Stubs
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
-    expect(replica.logEndOffset).andReturn(initialLeo).anyTimes()
-    expect(replica.highWatermark).andReturn(initialFetchOffset).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(5))
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(initialFetchOffset).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(5))
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
-    replay(replicaManager, logManager, quota, replica, partition)
+    stub(partition, replicaManager, log)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
     val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
 
     //Create the thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs.head, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> offsetAndEpoch(initialFetchOffset)))
 
     //Run it
@@ -545,7 +556,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -554,17 +565,19 @@ class ReplicaFetcherThreadTest {
     val initialLeo = 300
 
     //Stubs
-    expect(replica.highWatermark).andReturn(highWaterMark).anyTimes()
+    expect(log.highWatermark).andReturn(highWaterMark).anyTimes()
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
-    expect(replica.logEndOffset).andReturn(initialLeo).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
     // this is for the last reply with EpochEndOffset(5, 156)
-    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(log.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(initialLeo, leaderEpoch))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLeo).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
-    replay(replicaManager, logManager, quota, replica, partition)
+    stub(partition, replicaManager, log)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
     val offsetsReply = mutable.Map(
@@ -574,7 +587,7 @@ class ReplicaFetcherThreadTest {
 
     //Create the thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs.head, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t1p1 -> offsetAndEpoch(0L)))
 
     //Run thread 3 times
@@ -602,7 +615,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
@@ -610,16 +623,16 @@ class ReplicaFetcherThreadTest {
 
     //Stub return values
     expect(partition.truncateTo(0L, false)).times(2)
-    expect(replica.logEndOffset).andReturn(0).anyTimes()
-    expect(replica.highWatermark).andReturn(0).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
-    expect(replica.endOffsetForEpoch(leaderEpoch)).andReturn(
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(0).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
+    expect(log.endOffsetForEpoch(leaderEpoch)).andReturn(
       Some(OffsetAndEpoch(0, leaderEpoch))).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
@@ -655,21 +668,23 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica: Replica = createNiceMock(classOf[Replica])
+    val log: Log = createNiceMock(classOf[Log])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
     //Stub return values
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
-    expect(replica.logEndOffset).andReturn(initialLEO).anyTimes()
-    expect(replica.highWatermark).andReturn(initialLEO - 2).anyTimes()
-    expect(replica.latestEpoch).andReturn(Some(5)).anyTimes()
-    expect(replica.endOffsetForEpoch(5)).andReturn(Some(OffsetAndEpoch(initialLEO, 5))).anyTimes()
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(initialLEO - 2).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(5)).anyTimes()
+    expect(log.endOffsetForEpoch(5)).andReturn(Some(OffsetAndEpoch(initialLEO, 5))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
-    stub(replica, partition, replicaManager)
+    stub(partition, replicaManager, log)
 
-    replay(replicaManager, logManager, quota, replica, partition)
+    replay(replicaManager, logManager, quota, partition, log)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsetsReply = Map(
@@ -678,7 +693,8 @@ class ReplicaFetcherThreadTest {
 
     //Create the fetcher thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(),
+      new SystemTime(), quota, Some(mockNetwork))
 
     //When
     thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t1p1 -> offsetAndEpoch(0L)))
@@ -728,13 +744,12 @@ class ReplicaFetcherThreadTest {
     verify(mockBlockingSend)
   }
 
-  def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager): Unit = {
-    expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replica).anyTimes()
+  def stub(partition: Partition, replicaManager: ReplicaManager, log: Log): Unit = {
+    expect(replicaManager.localLogOrException(t1p0)).andReturn(log).anyTimes()
     expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes()
-    expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replica).anyTimes()
+    expect(replicaManager.localLogOrException(t1p1)).andReturn(log).anyTimes()
     expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
-    expect(replicaManager.localReplicaOrException(t2p1)).andReturn(replica).anyTimes()
+    expect(replicaManager.localLogOrException(t2p1)).andReturn(log).anyTimes()
     expect(replicaManager.nonOfflinePartition(t2p1)).andReturn(Some(partition)).anyTimes()
-    expect(partition.localReplicaOrException).andReturn(replica).anyTimes()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index d298003..e1a3e2c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -149,7 +149,7 @@ class ReplicaManagerQuotasTest {
   def testCompleteInDelayedFetchWithReplicaThrottling(): Unit = {
     // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
     def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
-      val endOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+      val endOffsetMetadata = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
       val partition: Partition = EasyMock.createMock(classOf[Partition])
 
       val offsetSnapshot = LogOffsetSnapshot(
@@ -170,7 +170,7 @@ class ReplicaManagerQuotasTest {
       EasyMock.replay(replicaManager, partition)
 
       val tp = new TopicPartition("t1", 0)
-      val fetchPartitionStatus = FetchPartitionStatus(new LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
+      val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
          relativePositionInSegment = 250), new PartitionData(50, 0, 1, Optional.empty()))
       val fetchMetadata = FetchMetadata(fetchMinBytes = 1,
         fetchMaxBytes = 1000,
@@ -198,7 +198,9 @@ class ReplicaManagerQuotasTest {
     val log: Log = createNiceMock(classOf[Log])
     expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
-    expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
+    expect(log.highWatermark).andReturn(5).anyTimes()
+    expect(log.lastStableOffset).andReturn(5).anyTimes()
+    expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(20L)).anyTimes()
 
     //if we ask for len 1 return a message
     expect(log.read(anyObject(),
@@ -207,7 +209,7 @@ class ReplicaManagerQuotasTest {
       minOneMessage = anyBoolean(),
       includeAbortedTxns = EasyMock.eq(false))).andReturn(
       FetchDataInfo(
-        new LogOffsetMetadata(0L, 0L, 0),
+        LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, record)
       )).anyTimes()
 
@@ -218,7 +220,7 @@ class ReplicaManagerQuotasTest {
       minOneMessage = anyBoolean(),
       includeAbortedTxns = EasyMock.eq(false))).andReturn(
       FetchDataInfo(
-        new LogOffsetMetadata(0L, 0L, 0),
+        LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.EMPTY
       )).anyTimes()
     replay(log)
@@ -231,25 +233,25 @@ class ReplicaManagerQuotasTest {
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
+    val leaderBrokerId = configs.head.brokerId
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
-      new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
+      new BrokerTopicStats, new MetadataCache(leaderBrokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {
       val partition = replicaManager.createPartition(p)
-      val leaderReplica = new Replica(configs.head.brokerId, p, time, 0, Some(log))
-      leaderReplica.highWatermark = 5
-      partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
-      val followerReplica = new Replica(configs.last.brokerId, p, time, 0, Some(log))
-      val allReplicas = Set(leaderReplica, followerReplica)
-      allReplicas.foreach(partition.addReplicaIfNotExists)
+      log.highWatermark = 5
+      partition.leaderReplicaIdOpt = Some(leaderBrokerId)
+      partition.setLog(log, isFutureLog = false)
+
+      val followerReplica = new Replica(configs.last.brokerId, p)
+      val allReplicas : Set[Int] = Set(leaderBrokerId, followerReplica.brokerId)
+      partition.addReplicaIfNotExists(followerReplica)
       if (bothReplicasInSync) {
         partition.inSyncReplicas = allReplicas
-        followerReplica.highWatermark = 5
       } else {
-        partition.inSyncReplicas = Set(leaderReplica)
-        followerReplica.highWatermark = 0
+        partition.inSyncReplicas = Set(leaderBrokerId)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 64a29ed..5778650 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -87,7 +87,8 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -106,7 +107,8 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -160,7 +162,8 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -168,7 +171,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
-          .localReplicaOrException
+          .localLogOrException
 
       val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
       val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
@@ -204,7 +207,8 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -213,7 +217,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
-        .localReplicaOrException
+        .localLogOrException
 
       val producerId = 234L
       val epoch = 5.toShort
@@ -255,7 +259,8 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -264,7 +269,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
-        .localReplicaOrException
+        .localLogOrException
 
       val producerId = 234L
       val epoch = 5.toShort
@@ -351,7 +356,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -360,7 +366,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
-        .localReplicaOrException
+        .localLogOrException
 
       val producerId = 234L
       val epoch = 5.toShort
@@ -417,7 +423,8 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -426,7 +433,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
-        .localReplicaOrException
+        .localLogOrException
 
       // Append a couple of messages.
       for(i <- 1 to 2) {
@@ -479,7 +486,6 @@ class ReplicaManagerTest {
 
       assertTrue(partition.getReplica(1).isDefined)
       val followerReplica = partition.getReplica(1).get
-      assertEquals(None, followerReplica.log)
       assertEquals(-1L, followerReplica.logStartOffset)
       assertEquals(-1L, followerReplica.logEndOffset)
 
@@ -553,8 +559,8 @@ class ReplicaManagerTest {
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-      replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
-      replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+      replicaManager.createPartition(tp1).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val partition1Replicas = Seq[Integer](0, 2).asJava
       val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -609,12 +615,12 @@ class ReplicaManagerTest {
         responseCallback = fetchCallback,
         isolationLevel = IsolationLevel.READ_UNCOMMITTED
       )
-      val tp0Replica = replicaManager.localReplica(tp0)
-      assertTrue(tp0Replica.isDefined)
-      assertEquals("hw should be incremented", 1, tp0Replica.get.highWatermark)
+      val tp0Log = replicaManager.localLog(tp0)
+      assertTrue(tp0Log.isDefined)
+      assertEquals("hw should be incremented", 1, tp0Log.get.highWatermark)
 
-      replicaManager.localReplica(tp1)
-      val tp1Replica = replicaManager.localReplica(tp1)
+      replicaManager.localLog(tp1)
+      val tp1Replica = replicaManager.localLog(tp1)
       assertTrue(tp1Replica.isDefined)
       assertEquals("hw should not be incremented", 0, tp1Replica.get.highWatermark)
 
@@ -646,7 +652,7 @@ class ReplicaManagerTest {
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    partition.getOrCreateReplica(followerBrokerId, isNew = false, offsetCheckpoints)
+    partition.createLogIfNotExists(followerBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
     partition.makeFollower(controllerId,
       leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
       correlationId, offsetCheckpoints)
@@ -713,6 +719,8 @@ class ReplicaManagerTest {
       override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
 
       override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
+
+      override def logEndOffset: Long = localLogOffset
     }
 
     // Expect to call LogManager.truncateTo exactly once
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index e7e6e8f..1f3179c 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -82,7 +82,9 @@ class SimpleFetchTest {
     EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
-    EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLEO)).anyTimes()
+    EasyMock.expect(log.highWatermark).andReturn(partitionHW).anyTimes()
+    EasyMock.expect(log.lastStableOffset).andReturn(partitionHW).anyTimes()
     EasyMock.expect(log.read(
       startOffset = 0,
       maxLength = fetchSize,
@@ -90,7 +92,7 @@ class SimpleFetchTest {
       minOneMessage = true,
       includeAbortedTxns = false))
       .andReturn(FetchDataInfo(
-        new LogOffsetMetadata(0L, 0L, 0),
+        LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
       )).anyTimes()
     EasyMock.expect(log.read(
@@ -100,7 +102,7 @@ class SimpleFetchTest {
       minOneMessage = true,
       includeAbortedTxns = false))
       .andReturn(FetchDataInfo(
-        new LogOffsetMetadata(0L, 0L, 0),
+        LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
       )).anyTimes()
     EasyMock.replay(log)
@@ -120,22 +122,22 @@ class SimpleFetchTest {
     val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
 
     // create the leader replica with the local log
-    val leaderReplica = new Replica(configs.head.brokerId, partition.topicPartition, time, 0, Some(log))
-    leaderReplica.highWatermark = partitionHW
-    partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
+    log.highWatermark = partitionHW
+    partition.leaderReplicaIdOpt = Some(configs.head.brokerId)
+    partition.setLog(log, false)
 
     // create the follower replica with defined log end offset
-    val followerReplica= new Replica(configs(1).brokerId, partition.topicPartition, time)
-    val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
+    val followerReplica= new Replica(configs(1).brokerId, partition.topicPartition)
+    val leo = LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
     followerReplica.updateFetchState(
       followerFetchOffsetMetadata = leo,
       followerStartOffset = 0L,
       followerFetchTimeMs= time.milliseconds,
       leaderEndOffset = leo.messageOffset)
+    partition.addReplicaIfNotExists(followerReplica)
 
     // add both of them to ISR
-    val allReplicas = List(leaderReplica, followerReplica)
-    allReplicas.foreach(partition.addReplicaIfNotExists)
+    val allReplicas = List(configs.head.brokerId, followerReplica.brokerId)
     partition.inSyncReplicas = allReplicas.toSet
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 8349541..3e48109 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -444,7 +444,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def awaitISR(tp: TopicPartition): Unit = {
     TestUtils.waitUntilTrue(() => {
-      leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
+      leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicas.size == 2
     }, "Timed out waiting for replicas to join ISR")
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cf78bac..ef3ac85 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -145,7 +145,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
     brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
 
-    def leo() = brokers(1).replicaManager.localReplica(tp).get.logEndOffset
+    def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset
 
     TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
     producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index eba4167..08be8a2 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -20,7 +20,6 @@ import java.io.File
 import java.util.Optional
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.cluster.Replica
 import kafka.log.{Log, LogManager}
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils}
@@ -58,8 +57,7 @@ class OffsetsForLeaderEpochTest {
       QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     val partition = replicaManager.createPartition(tp)
-    val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
-    partition.addReplicaIfNotExists(leaderReplica)
+    partition.setLog(mockLog, isFutureLog = false)
     partition.leaderReplicaIdOpt = Some(config.brokerId)
 
     //When
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5c9284f..98fbc4e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -848,7 +848,7 @@ object TestUtils extends Logging {
   }
 
   def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
-    server.replicaManager.nonOfflinePartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
+    server.replicaManager.nonOfflinePartition(new TopicPartition(topic, partitionId)).exists(_.leaderLogIfLocal.isDefined)
   }
 
   def findLeaderEpoch(brokerId: Int,
@@ -929,7 +929,7 @@ object TestUtils extends Logging {
     def newLeaderExists: Option[Int] = {
       servers.find { server =>
         server.config.brokerId != oldLeader &&
-          server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+          server.replicaManager.nonOfflinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
       }.map(_.config.brokerId)
     }
 
@@ -944,7 +944,7 @@ object TestUtils extends Logging {
                              timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
     def leaderIfExists: Option[Int] = {
       servers.find { server =>
-        server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+        server.replicaManager.nonOfflinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
       }.map(_.config.brokerId)
     }