Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/04 21:03:00 UTC

[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW

    [ https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638878#comment-16638878 ] 

ASF GitHub Bot commented on KAFKA-7415:
---------------------------------------

hachikuji closed pull request #5678: KAFKA-7415; Persist leader epoch and start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5678

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2036bb09da8..307fb81447b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,8 +301,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
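
The new block in Partition.makeLeader above caches the elected epoch together with its start offset before the leader serves any OffsetsForLeaderEpoch requests. Below is a toy, standalone Scala sketch (not Kafka code; the names and numbers are illustrative) of why that cached entry matters. The simplified lookup only mirrors the cache semantics described in this PR, with the undefined epoch/offset pair modeled as (-1, -1):

object LeaderElectionEpochCacheSketch extends App {
  case class EpochEntry(epoch: Int, startOffset: Long)

  // Simplified OffsetsForLeaderEpoch lookup: the latest epoch maps to the log end offset;
  // otherwise return the largest cached epoch <= requestedEpoch together with the start
  // offset of the first later epoch; (-1, -1) stands in for the undefined epoch/offset.
  def endOffsetFor(entries: Seq[EpochEntry], requestedEpoch: Int, logEndOffset: Long): (Int, Long) = {
    val latestEpoch = entries.lastOption.map(_.epoch).getOrElse(-1)
    if (requestedEpoch == latestEpoch) (requestedEpoch, logEndOffset)
    else {
      val (subsequent, previous) = entries.partition(_.epoch > requestedEpoch)
      if (subsequent.isEmpty) (-1, -1L)
      else if (previous.isEmpty) (requestedEpoch, subsequent.head.startOffset)
      else (previous.last.epoch, subsequent.head.startOffset)
    }
  }

  // A new leader is elected at epoch 8 with log end offset 100; a follower still has entries
  // written under epoch 5 by a failed leader and asks for the end offset of epoch 5.
  val withoutElectionEntry = Seq(EpochEntry(0, 0L), EpochEntry(3, 90L))
  val withElectionEntry = withoutElectionEntry :+ EpochEntry(8, 100L) // cached in makeLeader

  // (-1, -1): undefined response, so the follower falls back to truncating to its high watermark.
  println(endOffsetFor(withoutElectionEntry, requestedEpoch = 5, logEndOffset = 100L))
  // (3, 100): a defined answer, so the follower truncates to at most offset 100 instead of its HW.
  println(endOffsetFor(withElectionEntry, requestedEpoch = 5, logEndOffset = 100L))
}
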
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index d729dadcb48..22860c71475 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index afe151d69b6..4e335ccc33b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -229,7 +229,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+  @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()
 
   locally {
     val startMs = time.milliseconds
@@ -239,12 +239,12 @@ class Log(@volatile var dir: File,
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
-    _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+    _leaderEpochCache.truncateFromStart(logStartOffset)
 
     // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
     // from scratch.
@@ -296,11 +296,11 @@ class Log(@volatile var dir: File,
 
   def leaderEpochCache = _leaderEpochCache
 
-  private def initializeLeaderEpochCache(): LeaderEpochCache = {
+  private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
-    new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
-      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
+    val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
+    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
   }
 
   /**
@@ -422,7 +422,7 @@ class Log(@volatile var dir: File,
    * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
    */
   private def recoverSegment(segment: LogSegment,
-                             leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+                             leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
     val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
@@ -941,7 +941,7 @@ class Log(@volatile var dir: File,
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
-          _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+          _leaderEpochCache.truncateFromStart(logStartOffset)
           producerStateManager.truncateHead(logStartOffset)
           updateFirstUnstableOffset()
         }
@@ -1645,7 +1645,7 @@ class Log(@volatile var dir: File,
             updateLogEndOffset(targetOffset)
             this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
             this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-            _leaderEpochCache.clearAndFlushLatest(targetOffset)
+            _leaderEpochCache.truncateFromEnd(targetOffset)
             loadProducerState(targetOffset, reloadFromCleanShutdown = false)
           }
           true
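
Log.scala now calls the renamed truncateFromEnd and truncateFromStart when the log is truncated or its start offset advances. The following standalone sketch (not Kafka code; the entry values are made up for illustration) mirrors the partition logic of those two methods as shown in the LeaderEpochFileCache diff further below:

object EpochCacheTruncationSketch extends App {
  case class EpochEntry(epoch: Int, startOffset: Long)

  // Log truncated back to `endOffset`: drop every entry whose start offset is at or beyond it.
  def truncateFromEnd(entries: Vector[EpochEntry], endOffset: Long): Vector[EpochEntry] =
    entries.filter(_.startOffset < endOffset)

  // Log start offset advanced to `startOffset`: drop earlier entries, but keep the epoch that was
  // in effect at `startOffset` by rewriting its start offset (the operation is exclusive).
  def truncateFromStart(entries: Vector[EpochEntry], startOffset: Long): Vector[EpochEntry] = {
    val (subsequent, previous) = entries.partition(_.startOffset > startOffset)
    previous.lastOption match {
      case Some(last) => EpochEntry(last.epoch, startOffset) +: subsequent
      case None       => entries
    }
  }

  val entries = Vector(EpochEntry(0, 0L), EpochEntry(1, 10L), EpochEntry(2, 16L))

  println(truncateFromEnd(entries, endOffset = 16L))     // Vector(EpochEntry(0,0), EpochEntry(1,10))
  println(truncateFromStart(entries, startOffset = 12L)) // Vector(EpochEntry(1,12), EpochEntry(2,16))
}
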
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0c00e5588f2..80763a8d797 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -330,7 +330,7 @@ class LogSegment private[log] (val log: FileRecords,
    * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
    */
   @nonthreadsafe
-  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
     offsetIndex.reset()
     timeIndex.reset()
     txnIndex.reset()
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 88f5d6bd8e3..cee6bb66bdf 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -18,53 +18,69 @@ package kafka.server.epoch
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.server.LogOffsetMetadata
 import kafka.server.checkpoints.LeaderEpochCheckpoint
 import org.apache.kafka.common.requests.EpochEndOffset._
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import scala.collection.mutable.ListBuffer
 
-trait LeaderEpochCache {
-  def assign(leaderEpoch: Int, offset: Long)
-  def latestEpoch: Int
-  def endOffsetFor(epoch: Int): (Int, Long)
-  def clearAndFlushLatest(offset: Long)
-  def clearAndFlushEarliest(offset: Long)
-  def clearAndFlush()
-  def clear()
-}
+import scala.collection.mutable.ListBuffer
 
 /**
-  * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
-  *
-  * Leader Epoch = epoch assigned to each leader by the controller.
-  * Offset = offset of the first message in each epoch.
-  *
-  * @param leo a function that determines the log end offset
-  * @param checkpoint the checkpoint file
-  */
-class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
+ *
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ *
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ * @param logEndOffset function to fetch the current log end offset
+ */
+class LeaderEpochFileCache(topicPartition: TopicPartition,
+                           logEndOffset: () => Long,
+                           checkpoint: LeaderEpochCheckpoint) extends Logging {
+  this.logIdent = s"[LeaderEpochCache $topicPartition] "
+
   private val lock = new ReentrantReadWriteLock()
   private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
 
   /**
     * Assigns the supplied Leader Epoch to the supplied Offset
     * Once the epoch is assigned it cannot be reassigned
-    *
-    * @param epoch
-    * @param offset
     */
-  override def assign(epoch: Int, offset: Long): Unit = {
+  def assign(epoch: Int, startOffset: Long): Unit = {
     inWriteLock(lock) {
-      if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) {
-        info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
-        epochs += EpochEntry(epoch, offset)
-        flush()
+      val updateNeeded = if (epochs.isEmpty) {
+        true
       } else {
-        validateAndMaybeWarn(epoch, offset)
+        val lastEntry = epochs.last
+        lastEntry.epoch != epoch || startOffset < lastEntry.startOffset
       }
+
+      if (updateNeeded) {
+        truncateAndAppend(EpochEntry(epoch, startOffset))
+        flush()
+      }
+    }
+  }
+
+  /**
+   * Remove any entries which violate monotonicity following the insertion of an assigned epoch.
+   */
+  private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
+    validateAndMaybeWarn(entryToAppend)
+
+    val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
+      entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
+    }
+
+    epochs = retainedEpochs :+ entryToAppend
+
+    if (removedEpochs.isEmpty) {
+      debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
+    } else {
+      warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
+        s"Cache now contains ${epochs.size} entries.")
     }
   }
 
@@ -74,7 +90,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * @return
     */
-  override def latestEpoch(): Int = {
+  def latestEpoch: Int = {
     inReadLock(lock) {
       if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
     }
@@ -93,45 +109,59 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     * so that the follower falls back to High Water Mark.
     *
     * @param requestedEpoch requested leader epoch
-    * @return leader epoch and offset
+    * @return found leader epoch and end offset
     */
-  override def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
+  def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
     inReadLock(lock) {
       val epochAndOffset =
         if (requestedEpoch == UNDEFINED_EPOCH) {
-          // this may happen if a bootstrapping follower sends a request with undefined epoch or
+          // This may happen if a bootstrapping follower sends a request with undefined epoch or
           // a follower is on the older message format where leader epochs are not recorded
           (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
         } else if (requestedEpoch == latestEpoch) {
-          (requestedEpoch, leo().messageOffset)
+          // For the leader, the latest epoch is always the current leader epoch that is still being written to.
+          // Followers should not have any reason to query for the end offset of the current epoch, but a consumer
+          // might if it is verifying its committed offset following a group rebalance. In this case, we return
+          // the current log end offset which makes the truncation check work as expected.
+          (requestedEpoch, logEndOffset())
         } else {
           val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch}
-          if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
-            // no epochs recorded or requested epoch < the first epoch cached
+          if (subsequentEpochs.isEmpty) {
+            // The requested epoch is larger than any known epoch. This case should never be hit because
+            // the latest cached epoch is always the largest.
             (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
-          else {
-            // we must get at least one element in previous epochs list, because if we are here,
-            // it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is
+          } else if (previousEpochs.isEmpty) {
+            // The requested epoch is smaller than any known epoch, so we return the start offset of the first
+            // known epoch which is larger than it. This may be inaccurate as there could have been
+            // epochs in between, but the point is that the data has already been removed from the log
+            // and we want to ensure that the follower can replicate correctly beginning from the leader's
+            // start offset.
+            (requestedEpoch, subsequentEpochs.head.startOffset)
+          } else {
+            // We have at least one previous epoch and one subsequent epoch. The result is the first
+            // prior epoch and the starting offset of the first subsequent epoch.
             (previousEpochs.last.epoch, subsequentEpochs.head.startOffset)
           }
         }
-      debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning epoch ${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size ${epochs.size}")
+      debug(s"Processed end offset request for epoch $requestedEpoch and returning epoch ${epochAndOffset._1} " +
+        s"with end offset ${epochAndOffset._2} from epoch cache of size ${epochs.size}")
       epochAndOffset
     }
   }
 
   /**
     * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
-    *
-    * @param offset
     */
-  override def clearAndFlushLatest(offset: Long): Unit = {
+  def truncateFromEnd(endOffset: Long): Unit = {
     inWriteLock(lock) {
-      val before = epochs
-      if (offset >= 0 && offset <= latestOffset()) {
-        epochs = epochs.filter(entry => entry.startOffset < offset)
+      if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) {
+        val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset)
+        epochs = previousEntries
+
         flush()
-        info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+
+        debug(s"Cleared entries $subsequentEntries from epoch cache after " +
+          s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.")
       }
     }
   }
@@ -142,20 +172,21 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
     *
-    * @param offset the offset to clear up to
+    * @param startOffset the offset to clear up to
     */
-  override def clearAndFlushEarliest(offset: Long): Unit = {
+  def truncateFromStart(startOffset: Long): Unit = {
     inWriteLock(lock) {
-      val before = epochs
-      if (offset >= 0 && earliestOffset() < offset) {
-        val earliest = epochs.filter(entry => entry.startOffset < offset)
-        if (earliest.nonEmpty) {
-          epochs = epochs --= earliest
-          //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset
-          if (offset < earliestOffset() || epochs.isEmpty)
-            new EpochEntry(earliest.last.epoch, offset) +=: epochs
+      if (epochs.nonEmpty) {
+        val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset)
+
+        previousEntries.lastOption.foreach { firstBeforeStartOffset =>
+          val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset)
+          epochs = updatedFirstEntry +: subsequentEntries
+
           flush()
-          info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+
+          debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " +
+            s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.")
         }
       }
     }
@@ -164,47 +195,55 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
   /**
     * Delete all entries.
     */
-  override def clearAndFlush() = {
+  def clearAndFlush() = {
     inWriteLock(lock) {
       epochs.clear()
       flush()
     }
   }
 
-  override def clear() = {
+  def clear() = {
     inWriteLock(lock) {
       epochs.clear()
     }
   }
 
-  def epochEntries(): ListBuffer[EpochEntry] = {
+  // Visible for testing
+  def epochEntries: ListBuffer[EpochEntry] = {
     epochs
   }
 
-  private def earliestOffset(): Long = {
-    if (epochs.isEmpty) -1 else epochs.head.startOffset
-  }
-
-  private def latestOffset(): Long = {
-    if (epochs.isEmpty) -1 else epochs.last.startOffset
-  }
+  private def latestEntry: Option[EpochEntry] = epochs.lastOption
 
   private def flush(): Unit = {
     checkpoint.write(epochs)
   }
 
-  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset:$latestOffset} for Partition: $topicPartition"
-
-  def validateAndMaybeWarn(epoch: Int, offset: Long) = {
-    assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
-    if (epoch < latestEpoch())
-      warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
-        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
-    else if (offset < latestOffset())
-      warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
-        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+  private def validateAndMaybeWarn(entry: EpochEntry) = {
+    if (entry.epoch < 0) {
+      throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
+    } else {
+      // If the latest append violates the monotonicity of epochs or starting offsets, our choices
+      // are either to raise an error, ignore the append, or allow the append and truncate the
+      // conflicting entries from the cache. Raising an error risks killing the fetcher threads in
+      // pathological cases (i.e. cases we are not yet aware of). We instead take the final approach
+      // and assume that the latest append is always accurate.
+
+      latestEntry.foreach { latest =>
+        if (entry.epoch < latest.epoch)
+          warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " +
+            s"of the latest entry $latest. This implies messages have arrived out of order.")
+        else if (entry.startOffset < latest.startOffset)
+          warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " +
+            s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.")
+      }
+    }
   }
 }
 
 // Mapping of epoch to the first offset of the subsequent epoch
-case class EpochEntry(epoch: Int, startOffset: Long)
+case class EpochEntry(epoch: Int, startOffset: Long) {
+  override def toString: String = {
+    s"EpochEntry(epoch=$epoch, startOffset=$startOffset)"
+  }
+}
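
The reworked assign/truncateAndAppend above no longer ignores a non-monotonic assignment: it truncates the conflicting entries and keeps the newest assignment as the source of truth. A standalone sketch (not Kafka code; epochs and offsets chosen to match the shouldTruncateIfMatchingEpochButEarlierStartingOffset test later in this diff):

object EpochCacheAssignSketch extends App {
  case class EpochEntry(epoch: Int, startOffset: Long)

  def assign(entries: Vector[EpochEntry], epoch: Int, startOffset: Long): Vector[EpochEntry] = {
    // Skip the update only if the last entry already has this epoch and an equal or earlier start offset.
    val updateNeeded = entries.lastOption.forall(last => last.epoch != epoch || startOffset < last.startOffset)
    if (!updateNeeded) entries
    else {
      // Keep only entries that stay strictly below the new entry in both epoch and start offset,
      // then append the new entry.
      val retained = entries.filter(e => e.epoch < epoch && e.startOffset < startOffset)
      retained :+ EpochEntry(epoch, startOffset)
    }
  }

  var cache = Vector.empty[EpochEntry]
  cache = assign(cache, epoch = 5, startOffset = 11)
  cache = assign(cache, epoch = 6, startOffset = 12)
  cache = assign(cache, epoch = 7, startOffset = 13)
  cache = assign(cache, epoch = 7, startOffset = 12) // epoch 7 re-assigned with an earlier start offset
  // Vector(EpochEntry(5,11), EpochEntry(7,12)): the conflicting (6,12) and (7,13) entries were truncated.
  println(cache)
}
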
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 343693e82c7..7cdc7789963 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -97,6 +97,39 @@ class PartitionTest {
     replicaManager.shutdown(checkpointHW = false)
   }
 
+  @Test
+  def testMakeLeaderUpdatesEpochCache(): Unit = {
+    val controllerEpoch = 3
+    val leader = brokerId
+    val follower = brokerId + 1
+    val controllerId = brokerId + 3
+    val replicas = List[Integer](leader, follower).asJava
+    val isr = List[Integer](leader, follower).asJava
+    val leaderEpoch = 8
+
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k2".getBytes, "v2".getBytes)
+    ), leaderEpoch = 0)
+    log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5,
+      new SimpleRecord("k3".getBytes, "v3".getBytes),
+      new SimpleRecord("k4".getBytes, "v4".getBytes)
+    ), leaderEpoch = 5)
+    assertEquals(4, log.logEndOffset)
+
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertTrue("Expected makeLeader to succeed",
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch,
+        isr, 1, replicas, true), 0))
+
+    assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
+
+    val epochEndOffset = partition.lastOffsetForLeaderEpoch(leaderEpoch)
+    assertEquals(4, epochEndOffset.endOffset)
+    assertEquals(leaderEpoch, epochEndOffset.leaderEpoch)
+  }
+
   @Test
   // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
   def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9a9bc613585..e584b8cedbf 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -25,7 +25,7 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
-import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -287,7 +287,7 @@ class LogTest {
             }
 
             override def recover(producerStateManager: ProducerStateManager,
-                                 leaderEpochCache: Option[LeaderEpochCache]): Int = {
+                                 leaderEpochCache: Option[LeaderEpochFileCache]): Int = {
               recoveredSegments += this
               super.recover(producerStateManager, leaderEpochCache)
             }
@@ -2589,8 +2589,8 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
-    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries().head)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
+    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -2599,7 +2599,7 @@ class LogTest {
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
-    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
   }
 
   @Test
@@ -2612,12 +2612,12 @@ class LogTest {
     log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
 
     log.close()
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
   }
 
   @Test
@@ -2790,7 +2790,7 @@ class LogTest {
     for (i <- records.indices)
       log.appendAsFollower(recordsForEpoch(i))
 
-    assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch())
+    assertEquals(42, log.leaderEpochCache.latestEpoch)
   }
 
   @Test
@@ -2845,19 +2845,24 @@ class LogTest {
 
   @Test
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
+    def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
+      TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
+        baseOffset = startOffset, partitionLeaderEpoch = epoch)
+    }
+
+    val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
     val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
-    //Given 2 segments, 10 messages per segment
-    for (epoch <- 1 to 20)
-      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    def append(epoch: Int, startOffset: Long, count: Int): Unit = {
+      for (i <- 0 until count)
+        log.appendAsFollower(createRecords(startOffset + i, epoch))
+    }
 
-    //Simulate some leader changes at specific offsets
-    cache.assign(0, 0)
-    cache.assign(1, 10)
-    cache.assign(2, 16)
+    //Given 2 segments, 10 messages per segment
+    append(epoch = 0, startOffset = 0, count = 10)
+    append(epoch = 1, startOffset = 10, count = 6)
+    append(epoch = 2, startOffset = 16, count = 4)
 
     assertEquals(2, log.numberOfSegments)
     assertEquals(20, log.logEndOffset)
@@ -2909,7 +2914,7 @@ class LogTest {
     assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
 
     // deliberately remove some of the epoch entries
-    leaderEpochCache.clearAndFlushLatest(2)
+    leaderEpochCache.truncateFromEnd(2)
     assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
     log.close()
 
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index c90a5b97a1a..3dff709bef6 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -253,7 +253,7 @@ class IsrExpirationTest {
 
   private def logMock: Log = {
     val log = EasyMock.createMock(classOf[kafka.log.Log])
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+    val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 8fb5ab6feba..2e28ee13f2d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,7 +21,7 @@ import kafka.api.Request
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.LogManager
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{KafkaStorageException, ReplicaNotAvailableException}
@@ -46,7 +46,7 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
     val partition = createMock(classOf[Partition])
@@ -87,7 +87,7 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val replica = createNiceMock(classOf[Replica])
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[ReplicaManager])
@@ -133,9 +133,9 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochsT1p0 = createMock(classOf[LeaderEpochCache])
-    val leaderEpochsT1p1 = createMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
+    val leaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaT1p0 = createNiceMock(classOf[Replica])
     val replicaT1p1 = createNiceMock(classOf[Replica])
@@ -195,8 +195,8 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
@@ -265,8 +265,8 @@ class ReplicaAlterLogDirsThreadTest {
     val logManager = createMock(classOf[LogManager])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -319,8 +319,8 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[kafka.log.LogManager])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
@@ -401,8 +401,8 @@ class ReplicaAlterLogDirsThreadTest {
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 520801c9bab..9440c29ee7b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -20,7 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogManager
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.ClientResponse
@@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -278,7 +278,7 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -339,7 +339,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -388,7 +388,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -442,7 +442,7 @@ class ReplicaFetcherThreadTest {
 
     // Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -513,7 +513,7 @@ class ReplicaFetcherThreadTest {
 
     // Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -574,7 +574,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -619,7 +619,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[kafka.log.LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -677,7 +677,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -728,7 +728,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 08440528638..90d488dd37a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -641,7 +641,7 @@ class ReplicaManagerTest {
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
-    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
+    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
@@ -661,7 +661,7 @@ class ReplicaManagerTest {
         new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000),
       logDirFailureChannel = mockLogDirFailureChannel) {
 
-      override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache
+      override def leaderEpochCache: LeaderEpochFileCache = mockLeaderEpochCache
 
       override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
     }
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
index e7c6a9785bc..0c47f15a09f 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
@@ -24,7 +24,6 @@ import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 
-
 class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 48590190b57..5c37891c937 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -90,23 +90,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     assertEquals(0, latestRecord(follower).partitionLeaderEpoch())
 
     //Both leader and follower should have recorded Epoch 0 at Offset 0
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
 
     //Bounce the follower
     bounce(follower)
     awaitISR(tp)
 
     //Nothing happens yet as we haven't sent any new messages.
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
 
     //Send a message
     producer.send(new ProducerRecord(topic, 0, null, msg)).get
 
     //Epoch1 should now propagate to the follower with the written message
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
 
     //The new message should have epoch 1 stamped
     assertEquals(1, latestRecord(leader).partitionLeaderEpoch())
@@ -117,8 +117,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     awaitISR(tp)
 
     //Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication.
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
 
     //Send a message
     producer.send(new ProducerRecord(topic, 0, null, msg)).get
@@ -128,8 +128,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     assertEquals(2, latestRecord(follower).partitionLeaderEpoch())
 
     //The leader epoch files should now match on leader and follower
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries)
   }
 
   @Test
@@ -377,8 +377,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
     info(s"Bounce complete for follower ${follower.config.brokerId}")
-    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
-    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries())
+    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries)
+    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries)
   }
 
   private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index d1f93900ccf..7ac606a2dc5 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -16,6 +16,7 @@
   */
 
 package kafka.server.epoch
+
 import java.io.File
 
 import kafka.server.LogOffsetMetadata
@@ -24,7 +25,7 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.Test
 
 import scala.collection.mutable.ListBuffer
 
@@ -33,54 +34,44 @@ import scala.collection.mutable.ListBuffer
   */
 class LeaderEpochFileCacheTest {
   val tp = new TopicPartition("TestTopic", 5)
-  var checkpoint: LeaderEpochCheckpoint = _
+  private var logEndOffset = 0L
+  private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+    private var epochs: Seq[EpochEntry] = Seq()
+    override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
+    override def read(): Seq[EpochEntry] = this.epochs
+  }
+  private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
 
   @Test
   def shouldAddEpochAndMessageOffsetToCache() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 10)
-    leo = 11
+    cache.assign(epoch = 2, startOffset = 10)
+    logEndOffset = 11
 
     //Then
-    assertEquals(2, cache.latestEpoch())
-    assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
-    assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo
+    assertEquals(2, cache.latestEpoch)
+    assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //should match logEndOffset
   }
 
   @Test
   def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When just one epoch
-    cache.assign(epoch = 2, offset = 11)
-    cache.assign(epoch = 2, offset = 12)
-    leo = 14
+    cache.assign(epoch = 2, startOffset = 11)
+    cache.assign(epoch = 2, startOffset = 12)
+    logEndOffset = 14
 
     //Then
-    assertEquals((2, leo), cache.endOffsetFor(2))
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2))
   }
 
   @Test
   def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
-    def leoFinder() = new LogOffsetMetadata(0)
     val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
-    //Given cache with some data on leader
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     // assign couple of epochs
-    cache.assign(epoch = 2, offset = 11)
-    cache.assign(epoch = 3, offset = 12)
+    cache.assign(epoch = 2, startOffset = 11)
+    cache.assign(epoch = 3, startOffset = 12)
 
     //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
     val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -92,68 +83,51 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
     //Given
-    leo = 9
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 9
 
-    cache.assign(2, leo)
+    cache.assign(2, logEndOffset)
 
     //When called again later
     cache.assign(2, 10)
 
     //Then the offset should NOT have been updated
-    assertEquals(leo, cache.epochEntries()(0).startOffset)
+    assertEquals(logEndOffset, cache.epochEntries(0).startOffset)
+    assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries)
   }
 
   @Test
-  def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
+  def shouldEnforceMonotonicallyIncreasingStartOffsets() = {
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
     cache.assign(2, 9)
 
     //When update epoch new epoch but same offset
     cache.assign(3, 9)
 
     //Then epoch should have been updated
-    assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries())
+    assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries)
   }
   
   @Test
   def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = {
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint)
     cache.assign(2, 6)
 
     //When called again later with a greater offset
     cache.assign(2, 10)
 
     //Then later update should have been ignored
-    assertEquals(6, cache.epochEntries()(0).startOffset)
+    assertEquals(6, cache.epochEntries(0).startOffset)
   }
 
   @Test
   def shouldReturnUnsupportedIfNoEpochRecorded(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0))
   }
 
   @Test
   def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
-    val leo = 73
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 73
 
     //When (say a follower on older message format version) sends request for UNDEFINED_EPOCH
     val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -164,39 +138,41 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
-  def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
-    cache.assign(epoch = 5, offset = 11)
-    cache.assign(epoch = 6, offset = 12)
-    cache.assign(epoch = 7, offset = 13)
+  def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(){
+    cache.assign(epoch = 5, startOffset = 11)
+    cache.assign(epoch = 6, startOffset = 12)
+    cache.assign(epoch = 7, startOffset = 13)
 
     //When
-    val epochAndOffset = cache.endOffsetFor(5 - 1)
+    val epochAndOffset = cache.endOffsetFor(4)
 
     //Then
-    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset)
+    assertEquals((4, 11), epochAndOffset)
   }
 
   @Test
-  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
+  def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = {
+    cache.assign(epoch = 5, startOffset = 11)
+    cache.assign(epoch = 6, startOffset = 12)
+    cache.assign(epoch = 7, startOffset = 13)
 
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    // epoch 7 starts at an earlier offset
+    cache.assign(epoch = 7, startOffset = 12)
 
+    assertEquals((5, 12), cache.endOffsetFor(5))
+    assertEquals((5, 12), cache.endOffsetFor(6))
+  }
+
+  @Test
+  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
     //When several epochs
-    cache.assign(epoch = 1, offset = 11)
-    cache.assign(epoch = 1, offset = 12)
-    cache.assign(epoch = 2, offset = 13)
-    cache.assign(epoch = 2, offset = 14)
-    cache.assign(epoch = 3, offset = 15)
-    cache.assign(epoch = 3, offset = 16)
-    leo = 17
+    cache.assign(epoch = 1, startOffset = 11)
+    cache.assign(epoch = 1, startOffset = 12)
+    cache.assign(epoch = 2, startOffset = 13)
+    cache.assign(epoch = 2, startOffset = 14)
+    cache.assign(epoch = 3, startOffset = 15)
+    cache.assign(epoch = 3, startOffset = 16)
+    logEndOffset = 17
 
     //Then get the start offset of the next epoch
     assertEquals((2, 15), cache.endOffsetFor(2))
@@ -204,15 +180,10 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 0, offset = 10)
-    cache.assign(epoch = 2, offset = 13)
-    cache.assign(epoch = 4, offset = 17)
+    cache.assign(epoch = 0, startOffset = 10)
+    cache.assign(epoch = 2, startOffset = 13)
+    cache.assign(epoch = 4, startOffset = 17)
 
     //Then
     assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1))
@@ -222,14 +193,9 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 2, offset = 7)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 2, startOffset = 7)
 
     //Then
     assertEquals(1, cache.epochEntries.size)
@@ -238,14 +204,10 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
-    val leo = 100
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 100
 
     //When
-    cache.assign(epoch = 2, offset = 100)
+    cache.assign(epoch = 2, startOffset = 100)
 
     //Then
     assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3))
@@ -253,35 +215,28 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 6)
-    leo = 7
+    cache.assign(epoch = 2, startOffset = 6)
+    logEndOffset = 7
 
     //Then
-    assertEquals((2, leo), cache.endOffsetFor(2))
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2))
     assertEquals(1, cache.epochEntries.size)
-    assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
+    assertEquals(EpochEntry(2, 6), cache.epochEntries(0))
   }
 
   @Test
   def shouldPersistEpochsBetweenInstances(){
-    def leoFinder() = new LogOffsetMetadata(0)
     val checkpointPath = TestUtils.tempFile().getAbsolutePath
-    checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
+    val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
 
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
+    val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+    cache.assign(epoch = 2, startOffset = 6)
 
     //When
     val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
-    val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2)
+    val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2)
 
     //Then
     assertEquals(1, cache2.epochEntries.size)
@@ -289,81 +244,68 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
-  def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
+  def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
     //Given
-    cache.assign(epoch = 1, offset = 5); leo = 6
-    cache.assign(epoch = 2, offset = 6); leo = 7
-
-    //When we update an epoch in the past with an earlier offset
-    cache.assign(epoch = 1, offset = 7); leo = 8
+    cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
+    cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
 
-    //Then epoch should not be changed
-    assertEquals(2, cache.latestEpoch())
+    //When we update an epoch in the past with a different offset, the log has already reached
+    //an inconsistent state. Our options are either to raise an error, ignore the new append,
+    //or truncate the cached epochs to the point of conflict. We take this latter approach in
+    //order to guarantee that epochs and offsets in the cache increase monotonically, which makes
+    //the search logic simpler to reason about.
+    cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8
 
-    //Then end offset for epoch 1 shouldn't have changed
-    assertEquals((1, 6), cache.endOffsetFor(1))
+    //Then later epochs will be removed
+    assertEquals(1, cache.latestEpoch)
 
-    //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option)
-    assertEquals((2, 8), cache.endOffsetFor(2))
+    //Then end offset for epoch 1 will have changed
+    assertEquals((1, 8), cache.endOffsetFor(1))
 
-    //Epoch history shouldn't have changed
-    assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
-    assertEquals(EpochEntry(2, 6), cache.epochEntries()(1))
+    //Then end offset for epoch 2 is now undefined
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2))
+    assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
   }
 
   @Test
-  def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
+  def shouldEnforceOffsetsIncreaseMonotonically() = {
     //When epoch goes forward but offset goes backwards
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 5)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 5)
 
-    //Then latter assign should be ignored
-    assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
+    //The last assignment wins and the conflicting one is removed from the log
+    assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0))
   }
 
   @Test
   def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0) //leo=0
+    cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0
 
     //When
-    cache.assign(epoch = 1, offset = 0) //leo=0
+    cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0
 
     //Then epoch should go up
-    assertEquals(1, cache.latestEpoch())
+    assertEquals(1, cache.latestEpoch)
     //offset for 1 should still be 0
     assertEquals((1, 0), cache.endOffsetFor(1))
     //offset for epoch 0 should still be 0
     assertEquals((0, 0), cache.endOffsetFor(0))
 
     //When we write 5 messages as epoch 1
-    leo = 5
+    logEndOffset = 5
 
-    //Then end offset for epoch(1) should be leo => 5
+    //Then end offset for epoch(1) should be logEndOffset => 5
     assertEquals((1, 5), cache.endOffsetFor(1))
     //Epoch 0 should still be at offset 0
     assertEquals((0, 0), cache.endOffsetFor(0))
 
     //When
-    cache.assign(epoch = 2, offset = 5) //leo=5
+    cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5
 
-    leo = 10 //write another 5 messages
+    logEndOffset = 10 //write another 5 messages
 
-    //Then end offset for epoch(2) should be leo => 10
+    //Then end offset for epoch(2) should be logEndOffset => 10
     assertEquals((2, 10), cache.endOffsetFor(2))
 
     //end offset for epoch(1) should be the start offset of epoch(2) => 5
@@ -375,36 +317,30 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //When new
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When Messages come in
-    cache.assign(epoch = 0, offset = 0); leo = 1
-    cache.assign(epoch = 0, offset = 1); leo = 2
-    cache.assign(epoch = 0, offset = 2); leo = 3
+    cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1
+    cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2
+    cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3
 
     //Then epoch should stay, offsets should grow
-    assertEquals(0, cache.latestEpoch())
-    assertEquals((0, leo), cache.endOffsetFor(0))
+    assertEquals(0, cache.latestEpoch)
+    assertEquals((0, logEndOffset), cache.endOffsetFor(0))
 
     //When messages arrive with greater epoch
-    cache.assign(epoch = 1, offset = 3); leo = 4
-    cache.assign(epoch = 1, offset = 4); leo = 5
-    cache.assign(epoch = 1, offset = 5); leo = 6
+    cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4
+    cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5
+    cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
 
-    assertEquals(1, cache.latestEpoch())
-    assertEquals((1, leo), cache.endOffsetFor(1))
+    assertEquals(1, cache.latestEpoch)
+    assertEquals((1, logEndOffset), cache.endOffsetFor(1))
 
     //When
-    cache.assign(epoch = 2, offset = 6); leo = 7
-    cache.assign(epoch = 2, offset = 7); leo = 8
-    cache.assign(epoch = 2, offset = 8); leo = 9
+    cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
+    cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8
+    cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9
 
-    assertEquals(2, cache.latestEpoch())
-    assertEquals((2, leo), cache.endOffsetFor(2))
+    assertEquals(2, cache.latestEpoch)
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2))
 
     //Older epochs should return the start offset of the first message in the subsequent epoch.
     assertEquals((0, 3), cache.endOffsetFor(0))
@@ -413,16 +349,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When clear latest on epoch boundary
-    cache.clearAndFlushLatest(offset = 8)
+    cache.truncateFromEnd(endOffset = 8)
 
     //Then should remove two latest epochs (remove is inclusive)
     assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
@@ -430,16 +363,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset ON epoch boundary
-    cache.clearAndFlushEarliest(offset = 8)
+    cache.truncateFromStart(startOffset = 8)
 
     //Then should preserve (3, 8)
     assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -447,16 +377,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
 
     //Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -464,16 +391,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset before first epoch offset
-    cache.clearAndFlushEarliest(offset = 1)
+    cache.truncateFromStart(startOffset = 1)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -481,16 +405,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on earliest epoch boundary
-    cache.clearAndFlushEarliest(offset = 6)
+    cache.truncateFromStart(startOffset = 6)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -498,16 +419,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When
-    cache.clearAndFlushEarliest(offset = 11)
+    cache.truncateFromStart(startOffset = 11)
 
     //Then retain the last
     assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -515,16 +433,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When we clear from a position between offset 8 & offset 11
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
 
     //Then we should update the middle epoch entry's offset
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -532,16 +447,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0)
-    cache.assign(epoch = 1, offset = 7)
-    cache.assign(epoch = 2, offset = 10)
+    cache.assign(epoch = 0, startOffset = 0)
+    cache.assign(epoch = 1, startOffset = 7)
+    cache.assign(epoch = 2, startOffset = 10)
 
     //When we clear from a position between offset 0 & offset 7
-    cache.clearAndFlushEarliest(offset = 5)
+    cache.truncateFromStart(startOffset = 5)
 
     //Then we should keep epoch 0 but update the offset appropriately
     assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -549,16 +461,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset beyond last epoch
-    cache.clearAndFlushEarliest(offset = 15)
+    cache.truncateFromStart(startOffset = 15)
 
     //Then update the last
     assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -566,51 +475,42 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushLatest(offset = 9)
+    cache.truncateFromEnd(endOffset = 9)
 
     //Then should keep the preceding epochs
-    assertEquals(3, cache.latestEpoch())
+    assertEquals(3, cache.latestEpoch)
     assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
   }
 
   @Test
   def shouldClearAllEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
-    //When 
+    //When
     cache.clearAndFlush()
 
-    //Then 
+    //Then
     assertEquals(0, cache.epochEntries.size)
   }
 
   @Test
   def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -618,16 +518,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -635,54 +532,26 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldFetchLatestEpochOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(-1, cache.latestEpoch)
   }
 
   @Test
   def shouldFetchEndOffsetOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7))
   }
 
   @Test
   def shouldClearEarliestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushEarliest(7)
+    cache.truncateFromStart(7)
   }
 
   @Test
   def shouldClearLatestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushLatest(7)
+    cache.truncateFromEnd(7)
   }
 
-  @Before
-  def setUp() {
-    checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 5ad641f11a0..efc07177037 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -16,7 +16,7 @@
   */
 package kafka.server.epoch
 
-import java.util.{Optional, Map => JMap}
+import java.util.Optional
 
 import kafka.server.KafkaConfig._
 import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
@@ -37,9 +37,10 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.mutable.ListBuffer
 
 class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
-  var brokers: Seq[KafkaServer] = null
+  var brokers: ListBuffer[KafkaServer] = ListBuffer()
   val topic1 = "foo"
   val topic2 = "bar"
   val t1p0 = new TopicPartition(topic1, 0)
@@ -60,7 +61,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
-    brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
@@ -94,7 +95,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
 
     //3 brokers, put partition on 100/101 and then pretend to be 102
-    brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
     TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
@@ -138,9 +139,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
-
     //Setup: we are only interested in the single partition on broker 101
-    brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers += createServer(fromProps(createBrokerConfig(100, zkConnect)))
+    assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
+
+    brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
+
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
     TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
     producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
@@ -150,10 +154,10 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
     //Then epoch should be 0 and leo: 1
-    var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
-    assertEquals(leo(), offset)
-
+    var epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(0, epochEndOffset.leaderEpoch)
+    assertEquals(1, epochEndOffset.endOffset)
+    assertEquals(1, leo())
 
     //2. When broker is bounced
     brokers(1).shutdown()
@@ -162,15 +166,23 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
-
     //Then epoch 0 should still be the start offset of epoch 1
-    offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
-
-    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - This is because we have to first change leader to -1 and then change it again to the live replica)
-    assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
-    assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
-
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
+    assertEquals(0, epochEndOffset.leaderEpoch)
+
+    //No data written in epoch 1
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 1))(tp)
+    assertEquals(0, epochEndOffset.leaderEpoch)
+    assertEquals(1, epochEndOffset.endOffset)
+
+    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 -
+    //This is because we have to first change leader to -1 and then change it again to the live replica)
+    //Note that the expected leader changes depend on the controller being on broker 100, which is not restarted
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 2))(tp)
+    assertEquals(2, epochEndOffset.leaderEpoch)
+    assertEquals(2, epochEndOffset.endOffset)
+    assertEquals(2, leo())
 
     //3. When broker is bounced again
     brokers(1).shutdown()
@@ -179,7 +191,6 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
-
     //Then Epoch 0 should still map to offset 1
     assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset())
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 4fdc4d26992..86a087b480d 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -47,7 +47,7 @@ class OffsetsForLeaderEpochTest {
 
     //Stubs
     val mockLog = createNiceMock(classOf[kafka.log.Log])
-    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4dc822b52c9..df9902f7003 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -373,10 +373,11 @@ object TestUtils extends Logging {
               producerId: Long = RecordBatch.NO_PRODUCER_ID,
               producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
               sequence: Int = RecordBatch.NO_SEQUENCE,
-              baseOffset: Long = 0L): MemoryRecords = {
+              baseOffset: Long = 0L,
+              partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
-      System.currentTimeMillis, producerId, producerEpoch, sequence)
+      System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch)
     records.foreach(builder.append)
     builder.build()
   }


 

> OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7415
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7415
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 2.0.0
>            Reporter: Anna Povzner
>            Assignee: Jason Gustafson
>            Priority: Major
>
> If the follower's last appended epoch is ahead of the leader's last appended epoch, the OffsetsForLeaderEpoch response will incorrectly send (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), and the follower will truncate to HW. This may lead to data loss in some rare cases where 2 back-to-back leader elections happen (failure of one leader, followed by quick re-election of the next leader due to preferred leader election, so that all replicas are still in the ISR, and then failure of the 3rd leader).
> The bug is in LeaderEpochFileCache.endOffsetFor(), which returns (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) if the requested leader epoch is ahead of the last leader epoch in the cache. The method should return (last leader epoch in the cache, LEO) in this scenario.
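> For reference, the updated LeaderEpochFileCacheTest above keeps the UNDEFINED reply when the requested epoch is ahead of the latest cached epoch, and instead relies on the cache always containing the leader's current epoch (see the `makeLeader` suggestion in the next paragraph). A rough, self-contained sketch of the lookup rules those tests encode (illustrative only; the -1 sentinels mirror EpochEndOffset, the helper object and signature are made up):
> {code:scala}
> object EpochLookupSketch {
>   // Illustrative sketch of the lookup rules exercised by the updated tests; not the Kafka implementation.
>   case class EpochEntry(epoch: Int, startOffset: Long)
>   val UNDEFINED_EPOCH = -1
>   val UNDEFINED_EPOCH_OFFSET = -1L
>
>   // `entries` is assumed to be sorted by epoch, as the cache keeps it
>   def endOffsetFor(entries: Seq[EpochEntry], requestedEpoch: Int, logEndOffset: Long): (Int, Long) =
>     if (requestedEpoch == UNDEFINED_EPOCH || entries.isEmpty || requestedEpoch > entries.last.epoch)
>       (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)        // nothing cached, or asked for an epoch we have not seen
>     else if (requestedEpoch == entries.last.epoch)
>       (requestedEpoch, logEndOffset)                   // the latest epoch extends to the log end offset
>     else {
>       // an older epoch "ends" where the next cached epoch begins; a request below the
>       // first cached epoch is answered with the first entry's start offset
>       val floorEpoch = entries.takeWhile(_.epoch <= requestedEpoch).lastOption.map(_.epoch).getOrElse(requestedEpoch)
>       (floorEpoch, entries.find(_.epoch > requestedEpoch).get.startOffset)
>     }
> }
> {code}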
> We don't create an entry in the leader epoch cache until a message is appended with the new leader epoch; every append to the log calls LeaderEpochFileCache.assign(). However, it would be much cleaner if `makeLeader` created an entry in the cache as soon as a replica becomes the leader, which would fix the bug. In case the leader never appends any messages and the next leader epoch starts at the same offset, we already have clearAndFlushLatest(), which clears entries whose start offsets are greater than or equal to the passed offset. LeaderEpochFileCache.assign() could be merged with clearAndFlushLatest(), so that assigning a new epoch also clears cache entries whose offsets are greater than or equal to the new epoch's start offset, and the two methods no longer need to be called separately.
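> The merged tests above also pin down how assign() behaves when epochs or start offsets conflict: re-assigning the latest epoch with a later offset is ignored, while a conflicting entry causes the cache to drop everything that would break monotonicity before appending. A minimal sketch of those rules (an approximation inferred from the tests, not the actual implementation, which also takes a lock and flushes the checkpoint file):
> {code:scala}
> import scala.collection.mutable.ListBuffer
>
> object EpochAssignSketch {
>   // Illustrative sketch only; names other than EpochEntry/assign are invented.
>   case class EpochEntry(epoch: Int, startOffset: Long)
>
>   private var epochs = ListBuffer.empty[EpochEntry]
>
>   def assign(epoch: Int, startOffset: Long): Unit = {
>     // Re-assigning the latest epoch with the same or a later start offset is a no-op,
>     // so the first start offset recorded for an epoch is never overwritten.
>     val redundant = epochs.lastOption.exists(last => last.epoch == epoch && last.startOffset <= startOffset)
>     if (!redundant) {
>       // Drop every cached entry that would violate monotonicity of epochs or start
>       // offsets, then append the new entry; the last assignment wins on conflict.
>       epochs = epochs.filterNot(e => e.epoch >= epoch || e.startOffset >= startOffset)
>       epochs += EpochEntry(epoch, startOffset)
>     }
>   }
> }
> {code}
> With these rules, assigning the epoch in `makeLeader` stays safe even when the previous leader never appended anything in its epoch: the redundant or conflicting entry is simply ignored or dropped.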
>  
> Here is an example of a scenario where the issue leads to data loss.
> Suppose we have three replicas: r1, r2, and r3. Initially, the ISR consists of (r1, r2, r3) and the leader is r1. The data up to offset 10 has been committed to the ISR. Here is the initial state:
> {code:java}
> Leader: r1
> leader epoch: 0
> ISR(r1, r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=10]
> r3: [hw=5, leo=10]
> {code}
> Replica 1 fails and leaves the ISR, which makes Replica 2 the new leader with leader epoch = 1. The leader appends a batch, but it is not yet replicated to the followers.
> {code:java}
> Leader: r2
> leader epoch: 1
> ISR(r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=11]
> r3: [hw=5, leo=10]
> {code}
> Replica 3 is elected leader with leader epoch 2 (due to preferred leader election) before it has a chance to truncate.
> {code:java}
> Leader: r3
> leader epoch: 2
> ISR(r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=11]
> r3: [hw=5, leo=10]
> {code}
> Replica 2 sends OffsetsForLeaderEpoch(leader epoch = 1) to Replica 3. Replica 3 incorrectly replies with UNDEFINED_EPOCH_OFFSET, and Replica 2 truncates to HW. If Replica 3 fails before Replica 2 re-fetches the data, this may lead to data loss.
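> With the leader's current epoch cached up front (per the `makeLeader` idea above), the same request gets a defined answer and Replica 2 only has to truncate its un-replicated epoch-1 record. A rough before/after, assuming epoch 0 started at offset 0 (the exact cache contents are illustrative):
> {code:java}
> OffsetsForLeaderEpoch(leader epoch = 1) sent by r2 to r3:
>
> r3 cache (buggy):  [(epoch=0, start=0)]
>                    -> (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET); r2 truncates to hw=8, dropping committed offsets 8-9
> r3 cache (fixed):  [(epoch=0, start=0), (epoch=2, start=10)]
>                    -> (leader epoch=0, end offset=10); r2 truncates at offset 10, keeping all committed data
> {code}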


