Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/06 21:51:30 UTC

[2/3] kafka git commit: KIP-101: Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation
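
The core follower-side change is ReplicaFetcherThread.maybeTruncate (first file below): instead of truncating to its high watermark when it starts fetching, a follower asks the leader for the end offset of its latest leader epoch and truncates to that. A minimal, non-authoritative sketch of the rule, with plain Long parameters standing in for the EpochEndOffset and Replica objects the real code uses:

    // Sketch only: the per-partition decision maybeTruncate applies, reduced to plain offsets.
    object TruncationRule {
      val UndefinedEpochOffset: Long = -1L   // stands in for EpochEndOffset.UNDEFINED_EPOCH_OFFSET

      def truncationOffset(leaderEpochEndOffset: Long,
                           followerLogEndOffset: Long,
                           followerHighWatermark: Long): Long = {
        if (leaderEpochEndOffset == UndefinedEpochOffset)
          followerHighWatermark        // leader gave no epoch info: fall back to the old HW behaviour
        else if (leaderEpochEndOffset >= followerLogEndOffset)
          followerLogEndOffset         // follower is not ahead of the leader: nothing to truncate
        else
          leaderEpochEndOffset         // follower is ahead: truncate to the leader's epoch end offset
      }
    }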

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cce59ce..94ed66c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,30 +17,27 @@
 
 package kafka.server
 
-import java.net.SocketTimeoutException
 import java.util
 
 import kafka.admin.AdminUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.log.LogConfig
 import kafka.api.{FetchRequest => _, _}
+import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.common.KafkaStorageException
-import ReplicaFetcherThread._
+import kafka.log.LogConfig
+import kafka.server.ReplicaFetcherThread._
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.requests.EpochEndOffset._
 import kafka.utils.Exit
-import org.apache.kafka.clients._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
-import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
-import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
-import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.Map
 import scala.collection.JavaConverters._
+import scala.collection.{Map, mutable}
 
 class ReplicaFetcherThread(name: String,
                            fetcherId: Int,
@@ -49,16 +46,20 @@ class ReplicaFetcherThread(name: String,
                            replicaMgr: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
-                           quota: ReplicationQuotaManager)
+                           quota: ReplicationQuotaManager,
+                           leaderEndpointBlockingSend: Option[BlockingSend] = None)
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
-                                isInterruptible = false) {
+                                isInterruptible = false,
+                                includeLogTruncation = true) {
 
   type REQ = FetchRequest
   type PD = PartitionData
 
+  private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
+    new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId, s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId"))
   private val fetchRequestVersion: Short =
     if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
@@ -66,57 +67,18 @@ class ReplicaFetcherThread(name: String,
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
-  private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
   private val replicaId = brokerConfig.brokerId
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs
   private val minBytes = brokerConfig.replicaFetchMinBytes
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
+  private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
 
-  private def clientId = name
-
-  private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
-
-  // we need to include both the broker id and the fetcher id
-  // as the metrics tag to avoid metric name conflicts with
-  // more than one fetcher thread to the same broker
-  private val networkClient = {
-    val channelBuilder = ChannelBuilders.clientChannelBuilder(
-      brokerConfig.interBrokerSecurityProtocol,
-      JaasContext.Type.SERVER,
-      brokerConfig,
-      brokerConfig.interBrokerListenerName,
-      brokerConfig.saslMechanismInterBrokerProtocol,
-      brokerConfig.saslInterBrokerHandshakeRequestEnable
-    )
-    val selector = new Selector(
-      NetworkReceive.UNLIMITED,
-      brokerConfig.connectionsMaxIdleMs,
-      metrics,
-      time,
-      "replica-fetcher",
-      Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
-      false,
-      channelBuilder
-    )
-    new NetworkClient(
-      selector,
-      new ManualMetadataUpdater(),
-      clientId,
-      1,
-      0,
-      Selectable.USE_DEFAULT_BUFFER_SIZE,
-      brokerConfig.replicaSocketReceiveBufferBytes,
-      brokerConfig.requestTimeoutMs,
-      time,
-      false,
-      new ApiVersions
-    )
-  }
+  private def epochCache(tp: TopicPartition): LeaderEpochCache =  replicaMgr.getReplica(tp).get.epochs.get
 
   override def shutdown(): Unit = {
     super.shutdown()
-    networkClient.close()
+    leaderEndpoint.close()
   }
 
   // process fetched data
@@ -132,7 +94,10 @@ class ReplicaFetcherThread(name: String,
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
           .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+
+      // Append the leader's messages to the log
       replica.log.get.append(records, assignOffsets = false)
+
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
           .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
@@ -179,8 +144,7 @@ class ReplicaFetcherThread(name: String,
      *
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
-    val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP,
-      brokerConfig.brokerId)
+    val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
 
     if (leaderEndOffset < replica.logEndOffset.messageOffset) {
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
@@ -222,8 +186,7 @@ class ReplicaFetcherThread(name: String,
        * and the current leader's log start offset.
        *
        */
-      val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
-        brokerConfig.brokerId)
+      val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
         .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
       val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
@@ -240,32 +203,14 @@ class ReplicaFetcherThread(name: String,
   }
 
   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
-    val clientResponse = sendRequest(fetchRequest.underlying)
+    val clientResponse = leaderEndpoint.sendRequest(fetchRequest.underlying)
     val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
     fetchResponse.responseData.asScala.toSeq.map { case (key, value) =>
       key -> new PartitionData(value)
     }
   }
 
-  private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = {
-    try {
-      if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
-        throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
-      else {
-        val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
-          time.milliseconds(), true)
-        NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
-      }
-    }
-    catch {
-      case e: Throwable =>
-        networkClient.close(sourceBroker.id.toString)
-        throw e
-    }
-
-  }
-
-  private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, replicaId: Int): Long = {
+  private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long): Long = {
     val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
         val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
         ListOffsetRequest.Builder.forReplica(1, replicaId).setTargetTimes(partitions.asJava)
@@ -273,7 +218,7 @@ class ReplicaFetcherThread(name: String,
         val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
         ListOffsetRequest.Builder.forReplica(0, replicaId).setOffsetData(partitions.asJava)
       }
-    val clientResponse = sendRequest(requestBuilder)
+    val clientResponse = leaderEndpoint.sendRequest(requestBuilder)
     val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
     val partitionData = response.responseData.get(topicPartition)
     partitionData.error match {
@@ -286,12 +231,12 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+  override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
 
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       // We will not include a replica in the fetch request if it should be throttled.
-      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) {
+      if (partitionFetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
         val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
       }
@@ -303,6 +248,91 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
+    * - Truncate the log to the leader's offset for each partition's epoch.
+    * - If the leader's offset is greater than or equal to our Log End Offset, we keep
+    *   our Log End Offset; otherwise we truncate to the leader's offset.
+    * - If the leader replied with an undefined epoch offset, we must use the high watermark.
+    */
+  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = {
+    val truncationPoints = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
+    val partitionsWithError = mutable.Set[TopicPartition]()
+
+    fetchedEpochs.foreach { case (tp, epochOffset) =>
+      val replica = replicaMgr.getReplica(tp).get
+
+      if (epochOffset.hasError) {
+        info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}")
+        partitionsWithError += tp
+      } else {
+        val truncationOffset =
+          if (epochOffset.endOffset() == UNDEFINED_EPOCH_OFFSET)
+            highWatermark(replica, epochOffset)
+          else if (epochOffset.endOffset() >= replica.logEndOffset.messageOffset)
+            logEndOffset(replica, epochOffset)
+          else
+            epochOffset.endOffset
+
+        truncationPoints.put(tp, truncationOffset)
+      }
+    }
+    replicaMgr.logManager.truncateTo(truncationPoints)
+
+    // For partitions that encountered an error, delay them a bit before retrying the leader epoch request
+    delayPartitions(partitionsWithError, brokerConfig.replicaFetchBackoffMs.toLong)
+
+    truncationPoints
+  }
+
+  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = {
+    val result = allPartitions
+      .filter { case (_, state) => state.isTruncatingLog }
+      .map { case (tp, _) => tp -> epochCache(tp).latestUsedEpoch }.toMap
+
+    debug(s"Build leaderEpoch request $result for broker $sourceBroker")
+
+    result
+  }
+
+  override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+    var result: Map[TopicPartition, EpochEndOffset] = null
+    if (shouldSendLeaderEpochRequest) {
+      val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+      val epochRequest = new OffsetsForLeaderEpochRequest.Builder(partitionsAsJava)
+      try {
+        val response = leaderEndpoint.sendRequest(epochRequest)
+        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
+        debug(s"Receive leaderEpoch response $result from broker $sourceBroker")
+      } catch {
+        case t: Throwable =>
+          warn(s"Error when sending leader epoch request for $partitions", t)
+
+          // if we get any unexpected exception, mark all partitions with an error
+          result = partitions.map { case (tp, _) =>
+            tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET)
+          }
+      }
+    } else {
+      // just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using
+      // high watermark in maybeTruncate()
+      result = partitions.map { case (tp, _) =>
+        tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
+      }
+    }
+    result
+  }
+
+  private def logEndOffset(replica: Replica, epochOffset: EpochEndOffset): Long = {
+    val logEndOffset = replica.logEndOffset.messageOffset
+    info(s"Based on follower's leader epoch, leader replied with an offset ${epochOffset.endOffset()} >= the follower's log end offset $logEndOffset in ${replica.topicPartition}. No truncation needed.")
+    logEndOffset
+  }
+
+  private def highWatermark(replica: Replica, epochOffset: EpochEndOffset): Long = {
+    warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. High watermark ${replica.highWatermark.messageOffset} will be used for truncation.")
+    replica.highWatermark.messageOffset
+  }
+
+  /**
    *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.
    */
@@ -336,7 +366,5 @@ object ReplicaFetcherThread {
       case Errors.NONE => None
       case e => Some(e.exception)
     }
-
   }
-
-}
+}
\ No newline at end of file
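
One upgrade-compatibility note on the fetcher changes above: the new OffsetsForLeaderEpoch request is only sent once the inter-broker protocol is at least KAFKA_0_11_0_IV2 (shouldSendLeaderEpochRequest). Below that version, fetchEpochsFromLeader fabricates a per-partition response carrying UNDEFINED_EPOCH_OFFSET, which steers maybeTruncate onto the high-watermark branch, i.e. the pre-KIP-101 truncation behaviour during a rolling upgrade. A condensed sketch of that fallback, using the same classes the patch imports:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.EpochEndOffset
    import org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET

    // What the follower returns when the cluster cannot yet speak OffsetsForLeaderEpoch.
    def fallbackEpochOffsets(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] =
      partitions.map { case (tp, _) =>
        // NONE + UNDEFINED_EPOCH_OFFSET makes maybeTruncate fall back to the high watermark
        tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
      }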

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8f67425..b063b9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,28 +20,34 @@ import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import org.apache.kafka.common.errors._
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
-import kafka.common._
 import kafka.controller.KafkaController
 import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, PolicyViolationException}
-import org.apache.kafka.common.errors.{NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, DeleteRecordsRequest, DeleteRecordsResponse}
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import java.util.{Map => JMap}
+
+import kafka.common.{KafkaStorageException, Topic}
+import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.requests.EpochEndOffset._
 
 /*
  * Result metadata of a log append operation on the log
@@ -132,7 +138,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
@@ -316,9 +322,11 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def getReplica(topicPartition: TopicPartition, replicaId: Int = localBrokerId): Option[Replica] =
+  def getReplica(topicPartition: TopicPartition, replicaId: Int): Option[Replica] =
     getPartition(topicPartition).flatMap(_.getReplica(replicaId))
 
+  def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, localBrokerId)
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied
@@ -974,9 +982,6 @@ class ReplicaManager(val config: KafkaConfig,
           .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
       }
 
-      logManager.truncateTo(partitionsToMakeFollower.map { partition =>
-        (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
-      }.toMap)
       partitionsToMakeFollower.foreach { partition =>
         val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
         tryCompleteDelayedProduce(topicPartitionOperationKey)
@@ -1089,4 +1094,22 @@ class ReplicaManager(val config: KafkaConfig,
     new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
   }
 
+  def getResponseFor(requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
+    OffsetsForLeaderEpoch.getResponseFor(this, requestedEpochInfo)
+  }
 }
+
+object OffsetsForLeaderEpoch extends Logging {
+  def getResponseFor(replicaManager: ReplicaManager, requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
+    debug(s"Processing OffsetForEpochRequest: $requestedEpochInfo")
+    requestedEpochInfo.asScala.map { case (tp, epoch) =>
+      val offset = try {
+        new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch))
+      } catch {
+        case e: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+        case e: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+      }
+      (tp, offset)
+    }.toMap.asJava
+  }
+}
\ No newline at end of file
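
The ReplicaManager change above removes the old unconditional logManager.truncateTo(... highWatermark ...) when a broker becomes a follower, and adds getResponseFor so a leader can answer OffsetsForLeaderEpoch requests from its own epoch cache; truncation now happens in the fetcher thread once the epoch response arrives. A rough, illustrative trace of the round trip (all offsets made up):

    // 1. Follower, in truncating state: buildLeaderEpochRequest -> Map(tp -> 5), its latest cached epoch.
    // 2. Leader: OffsetsForLeaderEpoch.getResponseFor looks up endOffsetFor(5) in its own cache,
    //    say 200, and replies with EpochEndOffset(NONE, 200).
    // 3. Follower: maybeTruncate compares 200 with its own log end offset:
    //      LEO = 230 -> truncate tp to 200 (the divergent tail 200..229 is discarded)
    //      LEO = 180 -> keep 180, no truncation needed.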

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
new file mode 100644
index 0000000..890dde0
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -0,0 +1,114 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.checkpoints
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileSystems, Paths}
+import kafka.utils.{Exit, Logging}
+import org.apache.kafka.common.utils.Utils
+import scala.collection.{Seq, mutable}
+
+trait CheckpointFileFormatter[T]{
+  def toLine(entry: T): String
+
+  def fromLine(line: String): Option[T]
+}
+
+class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileFormatter[T]) extends Logging {
+  private val path = file.toPath.toAbsolutePath
+  private val tempPath = Paths.get(path.toString + ".tmp")
+  private val lock = new Object()
+  file.createNewFile()
+
+  def write(entries: Seq[T]) {
+    lock synchronized {
+      // write to temp file and then swap with the existing file
+      val fileOutputStream = new FileOutputStream(tempPath.toFile)
+      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+      try {
+        writer.write(version.toString)
+        writer.newLine()
+
+        writer.write(entries.size.toString)
+        writer.newLine()
+
+        entries.foreach { entry =>
+          writer.write(formatter.toLine(entry))
+          writer.newLine()
+        }
+
+        writer.flush()
+        fileOutputStream.getFD().sync()
+      } catch {
+        case e: FileNotFoundException =>
+          if (FileSystems.getDefault.isReadOnly) {
+            fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
+            Exit.halt(1)
+          }
+          throw e
+      } finally {
+        writer.close()
+      }
+
+      Utils.atomicMoveWithFallback(tempPath, path)
+    }
+  }
+
+  def read(): Seq[T] = {
+    def malformedLineException(line: String) =
+      new IOException(s"Malformed line in offset checkpoint file: $line'")
+
+    lock synchronized {
+      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
+      var line: String = null
+      try {
+        line = reader.readLine()
+        if (line == null)
+          return Seq.empty
+        val version = line.toInt
+        version match {
+          case version =>
+            line = reader.readLine()
+            if (line == null)
+              return Seq.empty
+            val expectedSize = line.toInt
+            val entries = mutable.Buffer[T]()
+            line = reader.readLine()
+            while (line != null) {
+              val entry = formatter.fromLine(line)
+              entry match {
+                case Some(e) =>
+                  entries += e
+                  line = reader.readLine()
+                case _ => throw malformedLineException(line)
+              }
+            }
+            if (entries.size != expectedSize)
+              throw new IOException(s"Expected $expectedSize entries but found only ${entries.size}")
+            entries
+          case _ =>
+            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
+        }
+      } catch {
+        case _: NumberFormatException => throw malformedLineException(line)
+      } finally {
+        reader.close()
+      }
+    }
+  }
+}
\ No newline at end of file
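
CheckpointFile above writes a small line-oriented text format (a version line, an entry count, then one formatted line per entry) to a .tmp file, fsyncs it and atomically swaps it into place. A hedged illustration with a toy formatter that is not part of the patch:

    import java.io.File
    import kafka.server.checkpoints.{CheckpointFile, CheckpointFileFormatter}

    // Toy formatter, illustrative only, for (name, offset) pairs.
    object ToyFormatter extends CheckpointFileFormatter[(String, Long)] {
      override def toLine(entry: (String, Long)): String = s"${entry._1} ${entry._2}"
      override def fromLine(line: String): Option[(String, Long)] = line.split(" ") match {
        case Array(name, offset) => Some(name -> offset.toLong)
        case _ => None
      }
    }

    val checkpoint = new CheckpointFile[(String, Long)](new File("/tmp/toy-checkpoint"), 0, ToyFormatter)
    checkpoint.write(Seq("a" -> 1L, "b" -> 2L))
    // The file now contains:
    //   0        <- version
    //   2        <- number of entries
    //   a 1
    //   b 2
    checkpoint.read()   // Seq(("a", 1), ("b", 2))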

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
new file mode 100644
index 0000000..9de7564
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.checkpoints
+
+import java.io._
+import java.util.regex.Pattern
+
+import kafka.server.checkpoints.LeaderEpochCheckpointConstants.{CurrentVersion, WhiteSpacesPattern}
+import kafka.server.epoch.EpochEntry
+
+import scala.collection._
+
+trait LeaderEpochCheckpoint {
+  def write(epochs: Seq[EpochEntry])
+  def read(): Seq[EpochEntry]
+}
+
+object LeaderEpochFile {
+  private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
+  def newFile(dir: File) = {new File(dir, LeaderEpochCheckpointFilename)}
+}
+
+private object LeaderEpochCheckpointConstants {
+  val WhiteSpacesPattern = Pattern.compile("\\s+")
+  val CurrentVersion = 0
+}
+
+/**
+  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+  */
+class LeaderEpochCheckpointFile(val file: File) extends CheckpointFileFormatter[EpochEntry] with LeaderEpochCheckpoint {
+  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, this)
+
+  override def toLine(entry: EpochEntry): String = {
+    s"${entry.epoch} ${entry.startOffset}"
+  }
+
+  override def fromLine(line: String): Option[EpochEntry] = {
+    WhiteSpacesPattern.split(line) match {
+      case Array(epoch, offset) =>
+        Some(EpochEntry(epoch.toInt, offset.toLong))
+      case _ => None
+    }
+  }
+
+  def write(epochs: Seq[EpochEntry]) = {
+    checkpoint.write(epochs)
+  }
+
+  def read(): Seq[EpochEntry] = {
+    checkpoint.read()
+  }
+}
\ No newline at end of file
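
Concretely, the leader-epoch-checkpoint file written by the class above uses the same version/count layout, with one "epoch startOffset" line per entry. An illustrative write (directory and values made up):

    import java.io.File
    import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
    import kafka.server.epoch.EpochEntry

    val dir = new File("/tmp/demo-partition-dir")   // illustrative partition directory
    dir.mkdirs()
    val checkpoint = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir))
    checkpoint.write(Seq(EpochEntry(5, 100), EpochEntry(7, 200)))
    // /tmp/demo-partition-dir/leader-epoch-checkpoint now contains:
    //   0
    //   2
    //   5 100
    //   7 200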

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
new file mode 100644
index 0000000..12ec986
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -0,0 +1,60 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.checkpoints
+
+import java.io._
+import java.util.regex.Pattern
+import kafka.server.epoch.EpochEntry
+import org.apache.kafka.common.TopicPartition
+import scala.collection._
+
+private object OffsetCheckpointConstants {
+  val WhiteSpacesPattern = Pattern.compile("\\s+")
+  val CurrentVersion = 0
+}
+
+trait OffsetCheckpoint {
+  def write(epochs: Seq[EpochEntry])
+  def read(): Seq[EpochEntry]
+}
+
+/**
+  * This class persists a map of (Partition => Offsets) to a file (for a certain replica)
+  */
+class OffsetCheckpointFile(val f: File) extends CheckpointFileFormatter[(TopicPartition, Long)] {
+  val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointConstants.CurrentVersion, this)
+
+  override def toLine(entry: (TopicPartition, Long)): String = {
+    s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
+  }
+
+  override def fromLine(line: String): Option[(TopicPartition, Long)] = {
+    OffsetCheckpointConstants.WhiteSpacesPattern.split(line) match {
+      case Array(topic, partition, offset) =>
+        Some(new TopicPartition(topic, partition.toInt), offset.toLong)
+      case _ => None
+    }
+  }
+
+  def write(offsets: Map[TopicPartition, Long]) = {
+    checkpoint.write(offsets.toSeq)
+  }
+
+  def read(): Map[TopicPartition, Long] = {
+    checkpoint.read().toMap
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
new file mode 100644
index 0000000..4a4727e
--- /dev/null
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -0,0 +1,224 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.epoch
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.server.LogOffsetMetadata
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.EpochEndOffset
+
+import scala.collection.mutable.ListBuffer
+
+trait LeaderEpochCache {
+  def cacheLatestEpoch(leaderEpoch: Int)
+  def maybeAssignLatestCachedEpochToLeo()
+  def assign(leaderEpoch: Int, offset: Long)
+  def latestUsedEpoch(): Int
+  def endOffsetFor(epoch: Int): Long
+  def clearLatest(offset: Long)
+  def clearEarliest(offset: Long)
+  def clear()
+}
+
+/**
+  * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
+  *
+  * Leader Epoch = epoch assigned to each leader by the controller.
+  * Offset = offset of the first message in each epoch.
+  *
+  * @param leo a function that determines the log end offset
+  * @param checkpoint the checkpoint file
+  */
+class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
+  private val lock = new ReentrantReadWriteLock()
+  private var epochs: ListBuffer[EpochEntry] = lock synchronized { ListBuffer(checkpoint.read(): _*) }
+  private var cachedLatestEpoch: Option[Int] = None //epoch which has yet to be assigned to a message.
+
+  /**
+    * Assigns the supplied Leader Epoch to the supplied Offset
+    * Once the epoch is assigned it cannot be reassigned
+    *
+    * @param epoch
+    * @param offset
+    */
+  override def assign(epoch: Int, offset: Long): Unit = {
+    inWriteLock(lock) {
+      if (epoch >= 0 && epoch > latestUsedEpoch && offset >= latestOffset) {
+        info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
+        epochs += EpochEntry(epoch, offset)
+        flush()
+      } else {
+        maybeWarn(epoch, offset)
+      }
+    }
+  }
+
+  /**
+    * Returns the current Leader Epoch. This is the latest epoch
+    * which has messages assigned to it.
+    *
+    * @return
+    */
+  override def latestUsedEpoch(): Int = {
+    inReadLock(lock) {
+      if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
+    }
+  }
+
+  /**
+    * Returns the End Offset for a requested Leader Epoch.
+    *
+    * This is defined as the start offset of the first Leader Epoch larger than the
+    * Leader Epoch requested, or else the Log End Offset if the latest epoch was requested.
+    *
+    * @param requestedEpoch
+    * @return offset
+    */
+  override def endOffsetFor(requestedEpoch: Int): Long = {
+    inReadLock(lock) {
+      val offset =
+        if (requestedEpoch == latestUsedEpoch) {
+          leo().messageOffset
+        }
+        else {
+          val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
+          if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
+            UNDEFINED_EPOCH_OFFSET
+          else
+            subsequentEpochs.head.startOffset
+        }
+      debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch list of size ${epochs.size}")
+      offset
+    }
+  }
+
+  /**
+    * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
+    *
+    * @param offset
+    */
+  override def clearLatest(offset: Long): Unit = {
+    inWriteLock(lock) {
+      val before = epochs
+      if (offset >= 0 && offset <= latestOffset()) {
+        epochs = epochs.filter(entry => entry.startOffset < offset)
+        flush()
+        info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+      }
+    }
+  }
+
+  /**
+    * Clears old epoch entries. This method searches for the oldest epoch < offset, updates the saved epoch offset to
+    * be offset, then clears any previous epoch entries.
+    *
+    * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
+    *
+    * @param offset the offset to clear up to
+    */
+  override def clearEarliest(offset: Long): Unit = {
+    inWriteLock(lock) {
+      val before = epochs
+      if (offset >= 0 && earliestOffset() < offset) {
+        val earliest = epochs.filter(entry => entry.startOffset < offset)
+        if (earliest.size > 0) {
+          epochs = epochs --= earliest
+          //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset
+          if (offset < earliestOffset() || epochs.isEmpty)
+            new EpochEntry(earliest.last.epoch, offset) +=: epochs
+          flush()
+          info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+        }
+      }
+    }
+  }
+
+  /**
+    * Delete all entries.
+    */
+  override def clear() = {
+    inWriteLock(lock) {
+      epochs.clear()
+      flush()
+    }
+  }
+
+  def epochEntries(): ListBuffer[EpochEntry] = {
+    epochs
+  }
+
+  private def earliestOffset(): Long = {
+    if (epochs.isEmpty) -1 else epochs.head.startOffset
+  }
+
+  private def latestOffset(): Long = {
+    if (epochs.isEmpty) -1 else epochs.last.startOffset
+  }
+
+  private def flush(): Unit = {
+    checkpoint.write(epochs)
+  }
+
+  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestUsedEpoch, offset:$latestOffset} for Partition: $topicPartition"
+
+  def maybeWarn(epoch: Int, offset: Long) = {
+    if (epoch < latestUsedEpoch())
+      warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
+        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+    else if (epoch < 0)
+      warn(s"Received an PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
+    else if (offset < latestOffset() && epoch >= 0)
+      warn(s"Received an PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
+        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+  }
+
+  /**
+    * Registers a PartitionLeaderEpoch (typically in response to a leadership change).
+    * This will be cached until {@link #maybeAssignLatestCachedEpochToLeo} is called.
+    *
+    * This allows us to register an epoch in response to a leadership change, but not persist
+    * that epoch until a message arrives and is stamped. This aligns the assignment of leadership
+    * between leader and follower, which eases debuggability.
+    *
+    * @param epoch
+    */
+  override def cacheLatestEpoch(epoch: Int) = {
+    inWriteLock(lock) {
+      cachedLatestEpoch = Some(epoch)
+    }
+  }
+
+  /**
+    * If there is a cached epoch, associate its start offset with the current log end offset if it's not in the epoch list yet.
+    */
+  override def maybeAssignLatestCachedEpochToLeo() = {
+    inWriteLock(lock) {
+      if (cachedLatestEpoch == None) error("Attempt to assign log end offset to epoch before epoch has been set. This should never happen.")
+      cachedLatestEpoch.foreach { epoch =>
+        assign(epoch, leo().messageOffset)
+      }
+    }
+  }
+}
+
+// Mapping of epoch to the first offset of the subsequent epoch
+case class EpochEntry(epoch: Int, startOffset: Long)
\ No newline at end of file
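
The most subtle method above is endOffsetFor: the answer for a requested epoch is the start offset of the first strictly larger epoch in the cache, the log end offset if the latest used epoch itself is requested, or UNDEFINED_EPOCH_OFFSET if the request is older than anything cached. A small worked example with an in-memory checkpoint stub so nothing touches disk (and assuming LogOffsetMetadata can be built from a bare message offset):

    import kafka.server.LogOffsetMetadata
    import kafka.server.checkpoints.LeaderEpochCheckpoint
    import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
    import org.apache.kafka.common.TopicPartition

    // In-memory stand-in for the checkpoint file.
    val stubCheckpoint = new LeaderEpochCheckpoint {
      private var saved: Seq[EpochEntry] = Seq.empty
      override def write(epochs: Seq[EpochEntry]): Unit = saved = epochs
      override def read(): Seq[EpochEntry] = saved
    }

    // Log end offset fixed at 250 for the example.
    val cache = new LeaderEpochFileCache(new TopicPartition("demo", 0), () => LogOffsetMetadata(250), stubCheckpoint)
    cache.assign(5, 100)   // epoch 5 starts at offset 100
    cache.assign(7, 200)   // epoch 7 starts at offset 200

    cache.endOffsetFor(7)  // 250: the latest used epoch is answered with the log end offset
    cache.endOffsetFor(5)  // 200: bounded by the start offset of the next larger epoch (7)
    cache.endOffsetFor(6)  // 200: an epoch never used on this replica is bounded the same way
    cache.endOffsetFor(3)  // UNDEFINED_EPOCH_OFFSET: older than the earliest cached epoch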

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e4cece9..ec3eb88 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -102,7 +102,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
       ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse],
       ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
-      ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse]
+      ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse]
   )
 
   val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors](
@@ -122,7 +123,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
     ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error),
     ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error),
-    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2)
+    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2),
+    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.asScala.get(tp).get.error())
   )
 
   val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -142,7 +144,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.STOP_REPLICA -> ClusterAcl,
     ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ClusterAcl,
     ApiKeys.CREATE_TOPICS -> ClusterCreateAcl,
-    ApiKeys.DELETE_TOPICS -> TopicDeleteAcl
+    ApiKeys.DELETE_TOPICS -> TopicDeleteAcl,
+    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ClusterAcl
   )
 
   @Before
@@ -201,6 +204,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
+  private def offsetsForLeaderEpochRequest = {
+    new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
+  }
+
   private def createOffsetFetchRequest = {
     new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
   }
@@ -285,7 +292,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.STOP_REPLICA -> createStopReplicaRequest,
       ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
-      ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
+      ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index d885d9b..6a4c552 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -112,6 +112,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp
+    MockDeserializer.resetStaticVariables
     // create the consumer offset topic
     TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
   }
@@ -163,10 +164,10 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
     isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get.clusterId)
     assertEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId)
 
-    assertNotEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE, MockDeserializer.NO_CLUSTER_ID)
-    assertNotNull(MockDeserializer.CLUSTER_META)
-    isValidClusterId(MockDeserializer.CLUSTER_META.get.clusterId)
-    assertEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId)
+    assertNotEquals(MockDeserializer.clusterIdBeforeDeserialize, MockDeserializer.noClusterId)
+    assertNotNull(MockDeserializer.clusterMeta)
+    isValidClusterId(MockDeserializer.clusterMeta.get.clusterId)
+    assertEquals(MockDeserializer.clusterIdBeforeDeserialize.get.clusterId, MockDeserializer.clusterMeta.get.clusterId)
 
     assertNotNull(MockConsumerMetricsReporter.CLUSTER_META)
     isValidClusterId(MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
@@ -175,7 +176,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
     assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockSerializer.CLUSTER_META.get.clusterId)
     assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
     assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId)
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId)
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.clusterMeta.get.clusterId)
     assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
     assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 874637b..b165918 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,15 +17,14 @@
 
 package kafka.api.test
 
-import java.util.{Properties, Collection, ArrayList}
+import java.util.{ArrayList, Collection, Properties}
 
 import org.junit.runners.Parameterized
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters
 import org.junit.{After, Before, Test}
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.junit.Assert._
-
 import kafka.api.FetchRequestBuilder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.consumer.SimpleConsumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 2104842..5aeea89 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,8 +20,9 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0, KAFKA_0_11_0_IV0}
-import kafka.server.OffsetCheckpoint
+import kafka.api.KAFKA_0_11_0_IV0
+import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -87,7 +88,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     // and make sure its gone from checkpoint file
     cleaner.logs.remove(topics(0))
     cleaner.updateCheckpoints(logDir)
-    val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
     // we expect partition 0 to be gone
     assertFalse(checkpoints.contains(topics(0)))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1400615..1248d1a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.util.Properties
 
 import kafka.common._
-import kafka.server.OffsetCheckpoint
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
@@ -102,7 +102,9 @@ class LogManagerTest {
     time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
+
+    //There should be a log file, two indexes, the leader epoch checkpoint and the pid snapshot dir
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
 
     try {
@@ -148,7 +150,9 @@ class LogManagerTest {
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
+
+    //There should be a log file, two indexes, the leader epoch checkpoint and the pid snapshot dir
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
     try {
       log.read(0, 1024)
@@ -288,7 +292,7 @@ class LogManagerTest {
     })
 
     logManager.checkpointRecoveryPointOffsets()
-    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
 
     topicPartitions.zip(logs).foreach {
       case(tp, log) => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 25b3480..3f531d9 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -346,4 +346,29 @@ class LogSegmentTest {
     assertEquals(oldSize, size)
     assertEquals(size, fileSize)
   }
+
+  @Test
+  def shouldTruncateEvenIfOffsetPointsToAGapInTheLog() {
+    val seg = createSegment(40)
+    val offset = 40
+
+    def records(offset: Long, record: String): MemoryRecords =
+      MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, CompressionType.NONE, TimestampType.CREATE_TIME,
+        new SimpleRecord(offset * 1000, record.getBytes))
+
+    //Given two messages with a gap between them (e.g. mid offset compacted away)
+    val ms1 = records(offset, "first message")
+    seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+    val ms2 = records(offset + 3, "message after gap")
+    seg.append(offset + 3, offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+
+    // When we truncate to an offset without a corresponding log entry
+    seg.truncateTo(offset + 1)
+
+    //Then we should still truncate the record that was present (i.e. offset + 3 is gone)
+    val log = seg.read(offset, None, 10000)
+    assertEquals(offset, log.records.batches.iterator.next().baseOffset())
+    assertEquals(1, log.records.batches.asScala.size)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a7af24e..4fcf1c3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,10 +28,14 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.KafkaConfig
-import org.apache.kafka.common.record._
+import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
+import org.apache.kafka.common.record.{RecordBatch, _}
 import org.apache.kafka.common.utils.Utils
+import org.easymock.EasyMock
+import org.easymock.EasyMock._
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 class LogTest extends JUnitSuite {
 
@@ -1235,7 +1239,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val recoveryPoint = 50L
-    for (_ <- 0 until 50) {
+    for (_ <- 0 until 10) {
       // create a log and write some messages to it
       logDir.mkdirs()
       var log = new Log(logDir,
@@ -1441,9 +1445,14 @@ class LogTest extends JUnitSuite {
     for (_ <- 0 until 100)
       log.append(set)
 
+    log.leaderEpochCache.assign(0, 40)
+    log.leaderEpochCache.assign(1, 90)
+
     // expire all segments
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entry should be the latest epoch and the leo.", new EpochEntry(1, 100), epochCache(log).epochEntries().head)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1452,6 +1461,8 @@ class LogTest extends JUnitSuite {
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+
   }
 
   @Test
@@ -1480,6 +1491,10 @@ class LogTest extends JUnitSuite {
     assertEquals(log.logStartOffset, 15)
   }
 
+  def epochCache(log: Log): LeaderEpochFileCache = {
+    log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache]
+  }
+
   @Test
   def shouldDeleteSizeBasedSegments() {
     val set = TestUtils.singletonRecords("test".getBytes)
@@ -1566,6 +1581,205 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
 
+  @Test
+  def shouldApplyEpochToMessageOnAppendIfLeader() {
+    val messageIds = (0 until 50).toArray
+    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
+
+    //Given this partition is on leader epoch 72
+    val epoch = 72
+    val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+
+    //When appending messages as a leader (i.e. assignOffsets = true)
+    for (i <- records.indices)
+      log.append(
+        MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)),
+        leaderEpochCache = mockCache(epoch),
+        assignOffsets = true
+      )
+
+    //Then leader epoch should be set on messages
+    for (i <- records.indices) {
+      val read = log.read(i, 100, Some(i+1)).records.batches().iterator.next()
+      assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch())
+    }
+  }
+
+  @Test
+  def followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() {
+    val messageIds = (0 until 50).toArray
+    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
+
+    val cache = createMock(classOf[LeaderEpochCache])
+
+    //Given each message has an offset & epoch, as messages from the leader would
+    def recordsForEpoch(i: Int): MemoryRecords = {
+      val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i))
+      recs.batches().asScala.foreach{record =>
+        record.setPartitionLeaderEpoch(42)
+        record.setLastOffset(i)
+      }
+      recs
+    }
+
+    //Verify we save the epoch to the cache.
+    expect(cache.assign(EasyMock.eq(42), anyInt())).times(records.size)
+    replay(cache)
+
+    val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+
+    //When appending as follower (assignOffsets = false)
+    for (i <- records.indices)
+      log.append(recordsForEpoch(i), assignOffsets = false, leaderEpochCache = cache)
+
+    verify(cache)
+  }
+
+  @Test
+  def shouldTruncateLeaderEpochsWhenDeletingSegments() {
+    val set = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+    val cache = epochCache(log)
+
+    // Given three segments of 5 messages each
+    for (e <- 0 until 15) {
+      log.append(set)
+    }
+
+    //Given epochs
+    cache.assign(0, 0)
+    cache.assign(1, 5)
+    cache.assign(2, 10)
+
+    //When first segment is removed
+    log.deleteOldSegments
+
+    //The oldest epoch entry should have been removed
+    assertEquals(ListBuffer(EpochEntry(1, 5), EpochEntry(2, 10)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
+    val set = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+    val cache = epochCache(log)
+
+    // Given three segments of 5 messages each
+    for (e <- 0 until 15) {
+      log.append(set)
+    }
+
+    //Given epochs
+    cache.assign(0, 0)
+    cache.assign(1, 7)
+    cache.assign(2, 10)
+
+    //When first segment removed (up to offset 5)
+    log.deleteOldSegments
+
+    //Then the first entry should have gone from (0,0) => (0,5)
+    assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * set.sizeInBytes).toString)
+    val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val cache = epochCache(log)
+
+    //Given 2 segments, 10 messages per segment
+    for (epoch <- 1 to 20)
+      log.append(set)
+
+    //Simulate some leader changes at specific offsets
+    cache.assign(0, 0)
+    cache.assign(1, 10)
+    cache.assign(2, 16)
+
+    assertEquals(2, log.numberOfSegments)
+    assertEquals(20, log.logEndOffset)
+
+    //When truncate to LEO (no op)
+    log.truncateTo(log.logEndOffset)
+
+    //Then no change
+    assertEquals(3, cache.epochEntries().size)
+
+    //When truncate
+    log.truncateTo(11)
+
+    //Then the entry for epoch 2 (which starts at offset 16) should be removed
+    assertEquals(2, cache.epochEntries().size)
+
+    //When truncate
+    log.truncateTo(10)
+
+    //Then only the earliest entry should remain
+    assertEquals(1, cache.epochEntries().size)
+
+    //When truncate all
+    log.truncateTo(0)
+
+    //Then the cache should be empty
+    assertEquals(0, cache.epochEntries().size)
+  }
+
+  /**
+    * Append a bunch of messages to a log and then re-open it with recovery and check that the leader epochs are recovered properly.
+    */
+  @Test
+  def testLogRecoversForLeaderEpoch() {
+    val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val leaderEpochCache = epochCache(log)
+    val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
+    log.append(records = firstBatch, assignOffsets = false)
+
+    val secondBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 1)
+    log.append(records = secondBatch, assignOffsets = false)
+
+    val thirdBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 2)
+    log.append(records = thirdBatch, assignOffsets = false)
+
+    val fourthBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 3, offset = 3)
+    log.append(records = fourthBatch, assignOffsets = false)
+
+    assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+
+    // deliberately remove some of the epoch entries
+    leaderEpochCache.clearLatest(2)
+    assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+    log.close()
+
+    // reopen the log and recover from the beginning
+    val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val recoveredLeaderEpochCache = epochCache(recoveredLog)
+
+    // epoch entries should be recovered
+    assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
+    recoveredLog.close()
+  }
+
+  /**
+    * Wrap a single record log buffer with leader epoch.
+    */
+  private def singletonRecordsWithLeaderEpoch(value: Array[Byte],
+                                      key: Array[Byte] = null,
+                                      leaderEpoch: Int,
+                                      offset: Long,
+                                      codec: CompressionType = CompressionType.NONE,
+                                      timestamp: Long = RecordBatch.NO_TIMESTAMP,
+                                      magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): MemoryRecords = {
+    val records = Seq(new SimpleRecord(timestamp, key, value))
+
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, offset,
+      System.currentTimeMillis, leaderEpoch)
+    records.foreach(builder.append)
+    builder.build()
+  }
+
+
   def createLog(messageSizeInBytes: Int, retentionMs: Int = -1,
                 retentionBytes: Int = -1, cleanupPolicy: String = "delete"): Log = {
     val logProps = new Properties()
@@ -1583,4 +1797,11 @@ class LogTest extends JUnitSuite {
       time = time)
     log
   }
+
+  private def mockCache(epoch: Int) = {
+    val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+    EasyMock.expect(cache.latestUsedEpoch()).andReturn(epoch).anyTimes()
+    EasyMock.replay(cache)
+    cache
+  }
 }
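
The LogTest assertions above pin down how the leader epoch cache tracks (epoch, start offset) pairs as the log is appended to, truncated, and has old segments deleted. As a rough mental model only (the real LeaderEpochFileCache also persists entries to a checkpoint file and covers more edge cases; the names ToyEpochCache and Entry below are hypothetical), the semantics exercised by these tests are roughly:

    import scala.collection.mutable.ListBuffer

    case class Entry(epoch: Int, startOffset: Long)

    class ToyEpochCache {
      val entries = ListBuffer.empty[Entry]

      // A new epoch is recorded only if it is newer than the latest one already known.
      def assign(epoch: Int, startOffset: Long): Unit =
        if (entries.isEmpty || entries.last.epoch < epoch)
          entries += Entry(epoch, startOffset)

      // Log truncation: drop every epoch that starts at or beyond the truncation offset.
      def clearLatest(offset: Long): Unit = {
        val kept = entries.toList.filter(_.startOffset < offset)
        entries.clear()
        entries ++= kept
      }

      // Old-segment deletion: drop epochs that started before the new log start offset,
      // but keep the most recent of them with its start offset moved up to that point,
      // unless a surviving epoch already starts exactly there.
      def clearEarliest(offset: Long): Unit = {
        val (older, rest) = entries.toList.partition(_.startOffset < offset)
        entries.clear()
        if (older.nonEmpty && rest.headOption.forall(_.startOffset > offset))
          entries += older.last.copy(startOffset = offset)
        entries ++= rest
      }
    }

For example, assigning (0,0), (1,7), (2,10) and then clearing up to offset 5 leaves (0,5), (1,7), (2,10), which is the shape asserted in shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments.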

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 3babfc8..00cda21 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -24,11 +24,12 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.EpochEndOffset
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, Set, mutable}
 
 class AbstractFetcherThreadTest {
 
@@ -104,7 +105,7 @@ class AbstractFetcherThreadTest {
                            clientId: String,
                            sourceBroker: BrokerEndPoint,
                            fetchBackOffMs: Int = 0)
-    extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
+    extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs, isInterruptible = true, includeLogTruncation = false) {
 
     type REQ = DummyFetchRequest
     type PD = PartitionData
@@ -122,6 +123,12 @@ class AbstractFetcherThreadTest {
 
     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
       new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap)
+
+    override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() }
+
+    override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
+
+    override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
   }
 
 
@@ -137,7 +144,7 @@ class AbstractFetcherThreadTest {
     fetcherThread.addPartitions(Map(partition -> 0L))
 
     // Wait until fetcherThread finishes the work
-    TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread tp finish the work")
+    TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread to finish the work")
 
     fetcherThread.shutdown()
 
@@ -198,7 +205,7 @@ class AbstractFetcherThreadTest {
       val requestMap = new mutable.HashMap[TopicPartition, Long]
       partitionMap.foreach { case (topicPartition, partitionFetchState) =>
         // Add backoff delay check
-        if (partitionFetchState.isActive)
+        if (partitionFetchState.isReadyForFetch)
           requestMap.put(topicPartition, partitionFetchState.fetchOffset)
       }
       new DummyFetchRequest(requestMap)
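
The three hooks stubbed out in DummyFetcherThread above (buildLeaderEpochRequest, fetchEpochsFromLeader, maybeTruncate) carry the KIP-101 handshake in real fetchers. A hedged sketch of how they compose on a fetch pass is shown below; the hooks are passed in as functions to keep the sketch self-contained, markTruncationComplete is a placeholder name, and the actual AbstractFetcherThread control flow may differ in detail:

    import kafka.server.PartitionFetchState
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.EpochEndOffset

    def truncateBeforeFetching(
        truncatingPartitions: Seq[(TopicPartition, PartitionFetchState)],
        buildLeaderEpochRequest: Seq[(TopicPartition, PartitionFetchState)] => Map[TopicPartition, Int],
        fetchEpochsFromLeader: Map[TopicPartition, Int] => Map[TopicPartition, EpochEndOffset],
        maybeTruncate: Map[TopicPartition, EpochEndOffset] => Map[TopicPartition, Long],
        markTruncationComplete: Map[TopicPartition, Long] => Unit): Unit = {
      // 1. For partitions still in the TruncatingLog state, collect the latest leader epoch each one knows.
      val epochRequests = buildLeaderEpochRequest(truncatingPartitions)
      if (epochRequests.nonEmpty) {
        // 2. Ask the leader for its end offset for each of those epochs (OffsetsForLeaderEpoch).
        val leaderOffsets = fetchEpochsFromLeader(epochRequests)
        // 3. Truncate the local logs accordingly, then move the truncated partitions on to normal fetching.
        val truncationPoints = maybeTruncate(leaderOffsets)
        markTruncationComplete(truncationPoints)
      }
    }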

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 3898d2b..0c62a50 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -205,6 +207,9 @@ class IsrExpirationTest {
 
   private def logMock: Log = {
     val log = EasyMock.createMock(classOf[kafka.log.Log])
+    val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+    EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
+    EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.replay(log)
     log
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 40ac7ec..54cee6b 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,6 +23,7 @@ import TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import java.io.File
 
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
@@ -56,8 +57,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   val message = "hello"
 
   var producer: KafkaProducer[Integer, String] = null
-  def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
-  def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
+  def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
+  def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
   var servers = Seq.empty[KafkaServer]
 
   // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
new file mode 100644
index 0000000..d4118c1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -0,0 +1,402 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.log.LogManager
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.requests.EpochEndOffset._
+import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.utils.SystemTime
+import org.easymock.EasyMock._
+import org.easymock.{Capture, CaptureType}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.{Map, mutable}
+
+class ReplicaFetcherThreadTest {
+
+  private val t1p0 = new TopicPartition("topic1", 0)
+  private val t1p1 = new TopicPartition("topic1", 1)
+  private val t2p1 = new TopicPartition("topic2", 1)
+
+  @Test
+  def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
+    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
+    val config = KafkaConfig.fromProps(props)
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = None)
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertEquals("results from leader epoch request should have undefined offset", expected, result)
+  }
+
+  @Test
+  def shouldHandleExceptionFromBlockingSend(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockBlockingSend = createMock(classOf[BlockingSend])
+
+    expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
+    replay(mockBlockingSend)
+
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(mockBlockingSend))
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.UNKNOWN, UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.UNKNOWN, UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertEquals("results from leader epoch request should have undefined offset", expected, result)
+    verify(mockBlockingSend)
+  }
+
+  @Test
+  def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all dependencies
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replicaManager)
+
+
+    //Expectations
+    expect(logManager.truncateTo(anyObject())).once
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsets = Map(t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)).asJava
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    //Loop 1
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(1, mockNetwork.fetchCount)
+
+    //Loop 2 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(2, mockNetwork.fetchCount)
+
+    //Loop 3 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(3, mockNetwork.fetchCount)
+
+    //Assert that truncateTo is called exactly once (despite three fetch loops)
+    verify(logManager)
+  }
+
+  @Test
+  def shouldTruncateToOffsetSpecifiedInEpochOffsetResponse(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val initialLEO = 200
+
+    //Stubs
+    expect(logManager.truncateTo(capture(truncateToCapture))).once
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
+    expect(leaderEpochs.latestUsedEpoch).andReturn(5).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replicaManager)
+
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse; these are used for truncation
+    val offsetsReply = Map(t1p0 -> new EpochEndOffset(156), t2p1 -> new EpochEndOffset(172)).asJava
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated to the offsets in the response
+    assertEquals(156, truncateToCapture.getValue.get(t1p0).get)
+    assertEquals(172, truncateToCapture.getValue.get(t2p1).get)
+  }
+
+  @Test
+  def shouldTruncateToHighWatermarkIfLeaderReturnsUndefinedOffset(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncated: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val highWaterMark = 100
+    val initialLeo = 300
+
+    //Stubs
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(highWaterMark)).anyTimes()
+    expect(logManager.truncateTo(capture(truncated))).once
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
+    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replicaManager)
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse; these are used for truncation
+    val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated partition t1p0 to the high watermark
+    assertEquals(highWaterMark, truncated.getValue.get(t1p0).get)
+  }
+
+  @Test
+  def shouldPollIndefinitelyIfLeaderReturnsAnyException(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncated: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
+    val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[kafka.log.LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+
+    val highWaterMark = 100
+    val initialLeo = 300
+
+    //Stubs
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(highWaterMark)).anyTimes()
+    expect(logManager.truncateTo(capture(truncated))).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
+    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replicaManager)
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse; these are used for truncation
+    val offsetsReply = mutable.Map(
+      t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(UNKNOWN, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+    ).asJava
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
+
+    //Run the thread several times (4 passes) while the leader keeps returning errors
+    (0 to 3).foreach { _ =>
+      thread.doWork()
+    }
+
+    //Then should loop continuously while there is no leader
+    for (invocation <- truncated.getValues().asScala)
+      assertEquals(0, invocation.size)
+
+    //New leader elected and replies
+    offsetsReply.put(t1p0, new EpochEndOffset(156))
+
+    thread.doWork()
+
+    //Now the final call should have actually done a truncation (to offset 156)
+    assertEquals(156, truncated.getValues.asScala.last.get(t1p0).get)
+  }
+
+  @Test
+  def shouldMovePartitionsOutOfTruncatingLogState(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all stubs
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createNiceMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val replicaManager = createNiceMock(classOf[ReplicaManager])
+
+    //Stub return values
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replicaManager)
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsetsReply = Map(
+      t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)
+    ).asJava
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+    //When
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    //Then all partitions should start in a TruncatingLog state
+    assertTrue(thread.partitionStates.partitionStates().asScala.forall(_.value().truncatingLog))
+
+    //When
+    thread.doWork()
+
+    //Then none should be TruncatingLog anymore
+    assertFalse(thread.partitionStates.partitionStates().asScala.forall(_.value().truncatingLog))
+  }
+
+  @Test
+  def shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest(): Unit ={
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val truncateToCapture: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+    val initialLEO = 100
+
+    //Setup all stubs
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createNiceMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val replicaManager = createNiceMock(classOf[ReplicaManager])
+
+    //Stub return values
+    expect(logManager.truncateTo(capture(truncateToCapture))).once
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
+    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replicaManager)
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsetsReply = Map(
+      t1p0 -> new EpochEndOffset(52), t1p1 -> new EpochEndOffset(49)
+    ).asJava
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+    //When
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    //When the epoch request is outstanding, remove one of the partitions to simulate a leader change. We do this via a callback passed to the mock thread
+    val partitionThatBecameLeader = t1p0
+    mockNetwork.setEpochRequestCallback(() => {
+      thread.removePartitions(Set(partitionThatBecameLeader))
+    })
+
+    //When
+    thread.doWork()
+
+    //Then we should not have truncated the partition that became leader
+    assertEquals(None, truncateToCapture.getValue.get(partitionThatBecameLeader))
+    assertEquals(49, truncateToCapture.getValue.get(t1p1).get)
+  }
+
+  def stub(replica: Replica, replicaManager: ReplicaManager) = {
+    expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
+    expect(replicaManager.getReplica(t1p1)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p1)).andReturn(replica).anyTimes()
+    expect(replicaManager.getReplica(t2p1)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t2p1)).andReturn(replica).anyTimes()
+  }
+}
\ No newline at end of file
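
Taken together, the ReplicaFetcherThreadTest cases above pin down the truncation decision the follower makes from each EpochEndOffset it receives: truncate to the leader's epoch end offset, fall back to the local high watermark when the leader cannot answer (UNDEFINED_EPOCH_OFFSET, e.g. an older inter-broker protocol), and keep retrying without truncating while the leader returns an error. A minimal sketch of that decision, assuming the error/endOffset accessors on EpochEndOffset and leaving out the fetch-state, metrics and logging work the real maybeTruncate does:

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.EpochEndOffset

    // Decide where (if anywhere) to truncate one partition, mirroring the cases asserted above.
    def truncationOffset(reply: EpochEndOffset, localLogEndOffset: Long, highWatermark: Long): Option[Long] =
      if (reply.error != Errors.NONE)
        None                                               // leader error: retry next round, no truncation yet
      else if (reply.endOffset == EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
        Some(highWatermark)                                // leader could not answer: old high-watermark behaviour
      else
        Some(math.min(reply.endOffset, localLogEndOffset)) // never truncate beyond the local log end offset

For example, a reply of 156 against a local log end offset of 200 truncates to 156, matching shouldTruncateToOffsetSpecifiedInEpochOffsetResponse.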

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a720a6a..d6b1649 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -146,7 +146,7 @@ class ReplicaManagerQuotasTest {
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
     //Create log which handles both a regular read and a 0 bytes read
-    val log = createMock(classOf[Log])
+    val log = createNiceMock(classOf[Log])
     expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()