Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/18 00:24:51 UTC

[2/2] kafka git commit: KAFKA-5036; Second part (Points 2 -> 5): Refactor caching of Latest Epoch

KAFKA-5036; Second part (Points 2 -> 5): Refactor caching of Latest Epoch

This PR covers point (2) and point (5) from KAFKA-5036:

**Commit 1:**
2. Currently, we update the leader epoch in epochCache after log append in the follower but before log append in the leader. It would be more consistent to always do this after log append. This also avoids issues related to failure in log append.
5. The constructor of LeaderEpochFileCache has the following:
lock synchronized { ListBuffer(checkpoint.read(): _*) }
Everywhere else, however, a read or write lock is used. We should use consistent locking.
This is a refactoring of the way epochs are cached: the code that cached the latest epoch inside LeaderEpochFileCache is removed, and the value already cached in Partition is reused instead. There is no functional change.
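
For orientation, here is a minimal Scala sketch of the entry-point split described above, paraphrased from the Log.scala hunk further down. The class name LogSketch is made up for the sketch, return types are reduced to Unit (the real methods return LogAppendInfo), and the body of the shared private append is elided.

import org.apache.kafka.common.record.MemoryRecords

// Minimal sketch (not the real kafka.log.Log) of the new entry points:
// callers no longer pass assignOffsets themselves, they pick the leader
// or follower path explicitly.
class LogSketch {

  // Leader path: offsets are assigned and the partition's current leader
  // epoch is stamped onto the batches as they are appended.
  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int): Unit =
    append(records, assignOffsets = true, leaderEpoch = leaderEpoch)

  // Follower path: offsets and leader epochs were already stamped by the
  // leader, so they are applied as-is.
  def appendAsFollower(records: MemoryRecords): Unit =
    append(records, assignOffsets = false, leaderEpoch = -1)

  // The shared implementation is now private. In the real Log it updates the
  // leader-epoch cache from the epochs stamped on the validated batches, i.e.
  // as part of the append itself rather than before it.
  private def append(records: MemoryRecords, assignOffsets: Boolean, leaderEpoch: Int): Unit = {
    // ... validation, offset assignment, epoch-cache update, segment write ...
  }
}

In the actual diff, Partition calls log.appendAsLeader(records, leaderEpoch = this.leaderEpoch) and ReplicaFetcherThread calls log.appendAsFollower(records), so on both the leader and the follower the epoch is recorded as part of the log append rather than before it.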

**Commit 2:**
Adds an assert(epoch >= 0) as epochs are written, and refactors the tests so they never hit this assert.
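
As a rough standalone illustration of that assert, here is a sketch paraphrasing the validateAndMaybeWarn method from the LeaderEpochFileCache hunk below. In the real method, latestEpoch and latestOffset come from the cache's own state and warnings go through the Logging trait's warn; here they are passed in as parameters and printed, purely to keep the sketch self-contained.

// Hypothetical standalone object, for illustration only.
object EpochValidationSketch {
  def validateAndMaybeWarn(epoch: Int, offset: Long, latestEpoch: Int, latestOffset: Long): Unit = {
    // Epochs written to the cache must never be negative; treat a negative
    // epoch as a programming error and fail fast (the assert added in commit 2).
    assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0: epoch=$epoch, offset=$offset")
    if (epoch < latestEpoch)
      println(s"warn: epoch $epoch is lower than the latest cached epoch $latestEpoch; messages may have arrived out of order")
    else if (offset < latestOffset)
      println(s"warn: offset $offset is lower than the latest cached offset $latestOffset; messages may have arrived out of order")
  }
}

Scala's assert is elidable at compile time (e.g. via scalac's -Xdisable-assertions), so the check documents an invariant and catches programmer error during testing rather than guarding live traffic; accordingly, the tests were refactored so they never trip it.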

Author: Ben Stopford <be...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2831 from benstopford/KAFKA-5036-part2-second-try


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/020ca790
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/020ca790
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/020ca790

Branch: refs/heads/trunk
Commit: 020ca7903669306862950260b8d63177402322c3
Parents: 17ce2a7
Author: Ben Stopford <be...@gmail.com>
Authored: Mon Apr 17 17:24:45 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 17 17:24:45 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      |  10 +
 .../main/scala/kafka/cluster/Partition.scala    |   8 +-
 core/src/main/scala/kafka/log/Log.scala         |  60 +++--
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../server/epoch/LeaderEpochFileCache.scala     |  55 +----
 .../test/scala/other/kafka/StressTestLog.scala  |   2 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |   2 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   4 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  10 +-
 .../log/LogCleanerLagIntegrationTest.scala      |   2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   8 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 104 ++++-----
 .../scala/unit/kafka/log/LogManagerTest.scala   |  16 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 219 +++++++++----------
 .../scala/unit/kafka/server/LogOffsetTest.scala |   8 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala |  12 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala | 102 ++-------
 19 files changed, 278 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 9a20a97..a6e804b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -349,6 +349,11 @@ public class MemoryRecords extends AbstractRecords {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, compressionType, records);
     }
 
+    public static MemoryRecords withRecords(CompressionType compressionType, int partitionLeaderEpoch, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records);
+    }
+
     public static MemoryRecords withRecords(byte magic, CompressionType compressionType, SimpleRecord... records) {
         return withRecords(magic, 0L, compressionType, TimestampType.CREATE_TIME, records);
     }
@@ -357,6 +362,11 @@ public class MemoryRecords extends AbstractRecords {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
     }
 
+    public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Integer partitionLeaderEpoch, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records);
+    }
+
     public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
                                             long producerId, short producerEpoch, int baseSequence,
                                             int partitionLeaderEpoch, SimpleRecord... records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4dd96c3..72e505d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -172,11 +172,7 @@ class Partition(val topic: String,
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.leaderEpoch
-      allReplicas.map(id => getOrCreateReplica(id))
-        .filter(_.isLocal)
-        .foreach { replica =>
-          replica.epochs.get.cacheLatestEpoch(leaderEpoch)
-        }
+      allReplicas.foreach(id => getOrCreateReplica(id))
 
       zkVersion = partitionStateInfo.zkVersion
       val isNewLeader =
@@ -474,7 +470,7 @@ class Partition(val topic: String,
               .format(topicPartition, inSyncSize, minIsr))
           }
 
-          val info = log.append(records, assignOffsets = true)
+          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch)
           // probably unblock some follower fetch requests since log end offset has been updated
           replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d97dfa4..450e9f6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -436,19 +436,40 @@ class Log(@volatile var dir: File,
     }
   }
 
+
   /**
-   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
-   *
-   * This method will generally be responsible for assigning offsets to the messages,
-   * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
-   *
-   * @param records The log records to append
-   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
-   * @param leaderEpochCache Optional cache of Leader Epoch Offsets.
-   * @throws KafkaStorageException If the append fails due to an I/O error.
-   * @return Information about the appended messages including the first and last offset.
-   */
-  def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpochCache: LeaderEpochCache = leaderEpochCache): LogAppendInfo = {
+    * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
+    * @param records The records to append
+    * @throws KafkaStorageException If the append fails due to an I/O error.
+    * @return Information about the appended messages including the first and last offset.
+    */
+  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int): LogAppendInfo = {
+    append(records, assignOffsets = true, leaderEpoch)
+  }
+
+  /**
+    * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs
+    * @param records The records to append
+    * @throws KafkaStorageException If the append fails due to an I/O error.
+    * @return Information about the appended messages including the first and last offset.
+    */
+  def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
+    append(records, assignOffsets = false, leaderEpoch = -1)
+  }
+
+  /**
+    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+    *
+    * This method will generally be responsible for assigning offsets to the messages,
+    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
+    *
+    * @param records The log records to append
+    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+    * @throws KafkaStorageException If the append fails due to an I/O error.
+    * @return Information about the appended messages including the first and last offset.
+    */
+  private def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
     val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
 
     // return if we have no valid messages or if this is a duplicate of the last appended entry
@@ -468,7 +489,6 @@ class Log(@volatile var dir: File,
           appendInfo.firstOffset = offset.value
           val now = time.milliseconds
           val validateAndOffsetAssignResult = try {
-            leaderEpochCache.maybeAssignLatestCachedEpochToLeo()
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
                                                           offset,
                                                           now,
@@ -478,7 +498,7 @@ class Log(@volatile var dir: File,
                                                           config.messageFormatVersion.messageFormatVersion,
                                                           config.messageTimestampType,
                                                           config.messageTimestampDifferenceMaxMs,
-                                                          leaderEpochCache.latestUsedEpoch())
+                                                          leaderEpoch)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
@@ -504,17 +524,17 @@ class Log(@volatile var dir: File,
             }
           }
         } else {
-          //Update the epoch cache with the epoch stamped by the leader
-          records.batches().asScala.map { batch =>
-            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-              leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
-          }
-
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
         }
 
+        //Update the epoch cache with the epoch stamped onto the message by the leader
+        validRecords.batches().asScala.map { batch =>
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+            leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+        }
+
         // check messages set size may be exceed config.segmentSize
         if (validRecords.sizeInBytes > config.segmentSize) {
           throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4f055a6..df3c372 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -249,7 +249,7 @@ class LogSegment(val log: FileRecords,
 
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
-            if (batch.partitionLeaderEpoch > cache.latestUsedEpoch()) // this is to avoid unnecessary warning in cache.assign()
+            if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
               cache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
           }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 94ed66c..9016fcf 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -96,7 +96,7 @@ class ReplicaFetcherThread(name: String,
           .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
       // Append the leader's messages to the log
-      replica.log.get.append(records, assignOffsets = false)
+      replica.log.get.appendAsFollower(records)
 
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
@@ -286,7 +286,7 @@ class ReplicaFetcherThread(name: String,
   override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = {
     val result = allPartitions
       .filter { case (_, state) => state.isTruncatingLog }
-      .map { case (tp, _) => tp -> epochCache(tp).latestUsedEpoch }.toMap
+      .map { case (tp, _) => tp -> epochCache(tp).latestEpoch }.toMap
 
     debug(s"Build leaderEpoch request $result for broker $sourceBroker")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 25acfc7..56b1e55 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -24,14 +24,11 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-
 import scala.collection.mutable.ListBuffer
 
 trait LeaderEpochCache {
-  def cacheLatestEpoch(leaderEpoch: Int)
-  def maybeAssignLatestCachedEpochToLeo()
   def assign(leaderEpoch: Int, offset: Long)
-  def latestUsedEpoch(): Int
+  def latestEpoch(): Int
   def endOffsetFor(epoch: Int): Long
   def clearLatest(offset: Long)
   def clearEarliest(offset: Long)
@@ -50,7 +47,6 @@ trait LeaderEpochCache {
 class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
   private val lock = new ReentrantReadWriteLock()
   private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
-  private var cachedLatestEpoch: Option[Int] = None //epoch which has yet to be assigned to a message.
 
   /**
     * Assigns the supplied Leader Epoch to the supplied Offset
@@ -61,12 +57,12 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     */
   override def assign(epoch: Int, offset: Long): Unit = {
     inWriteLock(lock) {
-      if (epoch >= 0 && epoch > latestUsedEpoch && offset >= latestOffset) {
+      if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) {
         info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
         epochs += EpochEntry(epoch, offset)
         flush()
       } else {
-        maybeWarn(epoch, offset)
+        validateAndMaybeWarn(epoch, offset)
       }
     }
   }
@@ -77,7 +73,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * @return
     */
-  override def latestUsedEpoch(): Int = {
+  override def latestEpoch(): Int = {
     inReadLock(lock) {
       if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
     }
@@ -95,7 +91,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
   override def endOffsetFor(requestedEpoch: Int): Long = {
     inReadLock(lock) {
       val offset =
-        if (requestedEpoch == latestUsedEpoch) {
+        if (requestedEpoch == latestEpoch) {
           leo().messageOffset
         }
         else {
@@ -177,46 +173,17 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     checkpoint.write(epochs)
   }
 
-  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestUsedEpoch, offset$latestOffset} for Partition: $topicPartition"
+  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition"
 
-  def maybeWarn(epoch: Int, offset: Long) = {
-    if (epoch < latestUsedEpoch())
+  def validateAndMaybeWarn(epoch: Int, offset: Long) = {
+    assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
+    if (epoch < latestEpoch())
       warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
         s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
-    else if (epoch < 0)
-      warn(s"Received an PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
-    else if (offset < latestOffset() && epoch >= 0)
-      warn(s"Received an PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
+    else if (offset < latestOffset())
+      warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
         s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
   }
-
-  /**
-    * Registers a PartitionLeaderEpoch (typically in response to a leadership change).
-    * This will be cached until {@link #maybeAssignLatestCachedEpochToLeo} is called.
-    *
-    * This allows us to register an epoch in response to a leadership change, but not persist
-    * that epoch until a message arrives and is stamped. This asigns the aassignment of leadership
-    * on leader and follower, for eases debugability.
-    *
-    * @param epoch
-    */
-  override def cacheLatestEpoch(epoch: Int) = {
-    inWriteLock(lock) {
-      cachedLatestEpoch = Some(epoch)
-    }
-  }
-
-  /**
-    * If there is a cached epoch, associate its start offset with the current log end offset if it's not in the epoch list yet.
-    */
-  override def maybeAssignLatestCachedEpochToLeo() = {
-    inWriteLock(lock) {
-      if (cachedLatestEpoch == None) error("Attempt to assign log end offset to epoch before epoch has been set. This should never happen.")
-      cachedLatestEpoch.foreach { epoch =>
-        assign(epoch, leo().messageOffset)
-      }
-    }
-  }
 }
 
 // Mapping of epoch to the first offset of the subsequent epoch

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 0c1297f..806702d 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -85,7 +85,7 @@ object StressTestLog {
   class WriterThread(val log: Log) extends WorkerThread {
     @volatile var offset = 0
     override def work() {
-      val logAppendInfo = log.append(TestUtils.singletonRecords(offset.toString.getBytes))
+      val logAppendInfo = log.appendAsFollower(TestUtils.singletonRecords(offset.toString.getBytes))
       require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset)
       offset += 1
       if(offset % 1000 == 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 34c6775..69152e3 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -208,7 +208,7 @@ object TestLinearWriteSpeed {
     Utils.delete(dir)
     val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM)
     def write(): Int = {
-      log.append(messages, true)
+      log.appendAsLeader(messages, leaderEpoch = 0)
       messages.sizeInBytes
     }
     def close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ba270ad..caac222 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -302,7 +302,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     var counter = 0
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, key = key.toString.getBytes), leaderEpoch = 0)
       counter += 1
       (key, count)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 49faa85..cfd66de 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -56,8 +56,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append two messages */
-    log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
-      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
 
     def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5aeea89..7ef7559 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -73,7 +73,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     checkLogAfterAppendingDups(log, startSize, appends)
 
-    val appendInfo = log.append(largeMessageSet, assignOffsets = true)
+    val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
     val largeMessageOffset = appendInfo.firstOffset
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
@@ -178,7 +178,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-      val appendInfo = log.append(largeMessageSet, assignOffsets = true)
+      val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
       val largeMessageOffset = appendInfo.firstOffset
 
       // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
@@ -264,8 +264,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
                         startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
     for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val value = counter.toString
-      val appendInfo = log.append(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
-        key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true)
+      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
       counter += 1
       (key, value, appendInfo.firstOffset)
     }
@@ -283,7 +283,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       new SimpleRecord(key.toString.getBytes, payload.toString.getBytes)
     }
 
-    val appendInfo = log.append(MemoryRecords.withRecords(magicValue, codec, records: _*), assignOffsets = true)
+    val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
     val offsets = appendInfo.firstOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 2cfcc07..6b9f4ea 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -116,7 +116,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long): Seq[(Int, Int)] = {
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+      log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
       counter += 1
       (key, count)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index e933c87..c2ae155 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     while(log.numberOfSegments < 8)
-      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()))
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
     val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
@@ -123,7 +123,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val t0 = time.milliseconds
     while(log.numberOfSegments < 4)
-      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0))
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), leaderEpoch = 0)
 
     val activeSegAtT0 = log.activeSegment
 
@@ -131,7 +131,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val t1 = time.milliseconds
 
     while (log.numberOfSegments < 8)
-      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1))
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
     val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
@@ -155,7 +155,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val t0 = time.milliseconds
     while (log.numberOfSegments < 8)
-      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0))
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), leaderEpoch = 0)
 
     time.sleep(compactionLag + 1)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 3261626..44d47c9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -68,7 +68,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
     val keysFound = keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
@@ -102,7 +102,7 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     while(log.numberOfSegments < 2)
-      log.append(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
+      log.appendAsLeader(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
     val keysFound = keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
@@ -128,16 +128,16 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages with the keys 0 through N
     while(log.numberOfSegments < 2)
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
 
     // delete all even keys between 0 and N
     val leo = log.logEndOffset
     for(key <- 0 until leo.toInt by 2)
-      log.append(tombstoneRecord(key))
+      log.appendAsLeader(tombstoneRecord(key), leaderEpoch = 0)
 
     // append some new unique keys to pad out to a new active segment
     while(log.numberOfSegments < 4)
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
     val keys = keysInLog(log).toSet
@@ -153,11 +153,11 @@ class LogCleanerTest extends JUnitSuite {
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
-    log.append(record(0,0)) // offset 0
-    log.append(record(1,1)) // offset 1
-    log.append(record(0,0)) // offset 2
-    log.append(record(1,1)) // offset 3
-    log.append(record(0,0)) // offset 4
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0
+    log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2
+    log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4
     // roll the segment, so we can clean the messages already appended
     log.roll()
 
@@ -180,11 +180,11 @@ class LogCleanerTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-    log.append(record(0, 0)) // offset 0
-    log.append(record(0, 1, pid = 1, epoch = 0, sequence = 0)) // offset 1
-    log.append(record(0, 2, pid = 2, epoch = 0, sequence = 0)) // offset 2
-    log.append(record(0, 3, pid = 3, epoch = 0, sequence = 0)) // offset 3
-    log.append(record(1, 1, pid = 2, epoch = 0, sequence = 1)) // offset 4
+    log.appendAsLeader(record(0, 0), leaderEpoch = 0) // offset 0
+    log.appendAsLeader(record(0, 1, pid = 1, epoch = 0, sequence = 0), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(0, 2, pid = 2, epoch = 0, sequence = 0), leaderEpoch = 0) // offset 2
+    log.appendAsLeader(record(0, 3, pid = 3, epoch = 0, sequence = 0), leaderEpoch = 0) // offset 3
+    log.appendAsLeader(record(1, 1, pid = 2, epoch = 0, sequence = 1), leaderEpoch = 0) // offset 4
 
     // roll the segment, so we can clean the messages already appended
     log.roll()
@@ -203,11 +203,11 @@ class LogCleanerTest extends JUnitSuite {
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
-    log.append(record(0,0)) // offset 0
-    log.append(record(1,1)) // offset 1
-    log.append(record(0,0)) // offset 2
-    log.append(record(1,1)) // offset 3
-    log.append(record(0,0)) // offset 4
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0
+    log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2
+    log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4
     // roll the segment, so we can clean the messages already appended
     log.roll()
 
@@ -241,14 +241,14 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages with the keys 0 through N-1, values equal offset
     while(log.numberOfSegments <= numCleanableSegments)
-      log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt), leaderEpoch = 0)
 
     // at this point one message past the cleanable segments has been added
     // the entire segment containing the first uncleanable offset should not be cleaned.
     val firstUncleanableOffset = log.logEndOffset + 1  // +1  so it is past the baseOffset
 
     while(log.numberOfSegments < numTotalSegments - 1)
-      log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt), leaderEpoch = 0)
 
     // the last (active) segment has just one message
 
@@ -278,7 +278,7 @@ class LogCleanerTest extends JUnitSuite {
     // create 6 segments with only one message in each segment
     def createRecorcs = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.append(createRecorcs, assignOffsets = true)
+      log.appendAsLeader(createRecorcs, leaderEpoch = 0)
 
     val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
 
@@ -296,7 +296,7 @@ class LogCleanerTest extends JUnitSuite {
     // create 6 segments with only one message in each segment
     def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.append(createRecords, assignOffsets = true)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
     val segs = log.logSegments.toSeq
@@ -328,14 +328,14 @@ class LogCleanerTest extends JUnitSuite {
 
     // append unkeyed messages
     while(log.numberOfSegments < 2)
-      log.append(unkeyedRecord(log.logEndOffset.toInt))
+      log.appendAsLeader(unkeyedRecord(log.logEndOffset.toInt), leaderEpoch = 0)
     val numInvalidMessages = unkeyedMessageCountInLog(log)
 
     val sizeWithUnkeyedMessages = log.size
 
     // append keyed messages
     while(log.numberOfSegments < 3)
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
 
     val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
     val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
@@ -373,7 +373,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
 
     val keys = keysInLog(log)
     val map = new FakeOffsetMap(Int.MaxValue)
@@ -398,7 +398,7 @@ class LogCleanerTest extends JUnitSuite {
     // append some messages to the log
     var i = 0
     while(log.numberOfSegments < 10) {
-      log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
       i += 1
     }
 
@@ -451,12 +451,12 @@ class LogCleanerTest extends JUnitSuite {
 
     // fill up first segment
     while (log.numberOfSegments == 1)
-      log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
     // forward offset and append message to next segment at offset Int.MaxValue
     val records = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
-    log.append(records, assignOffsets = false)
-    log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+    log.appendAsFollower(records)
+    log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
     assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
 
     // grouping should result in a single group with maximum relative offset of Int.MaxValue
@@ -464,7 +464,7 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(1, groups.size)
 
     // append another message, making last offset of second segment > Int.MaxValue
-    log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+    log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
     // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
@@ -473,7 +473,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // append more messages, creating new segments, further grouping should still occur
     while (log.numberOfSegments < 4)
-      log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
     assertEquals(log.numberOfSegments - 1, groups.size)
@@ -555,7 +555,7 @@ class LogCleanerTest extends JUnitSuite {
     var log = makeLog(config = config)
     var messageCount = 0
     while(log.numberOfSegments < 10) {
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
     val allKeys = keysInLog(log)
@@ -591,7 +591,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // add some more messages and clean the log again
     while(log.numberOfSegments < 10) {
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
     for (k <- 1 until messageCount by 2)
@@ -606,7 +606,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // add some more messages and clean the log again
     while(log.numberOfSegments < 10) {
-      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
     for (k <- 1 until messageCount by 2)
@@ -651,11 +651,11 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog()
     val cleaner = makeCleaner(2)
 
-    log.append(record(0,0))
-    log.append(record(1,1))
-    log.append(record(2,2))
-    log.append(record(3,3))
-    log.append(record(4,4))
+    log.appendAsLeader(record(0,0), leaderEpoch = 0)
+    log.appendAsLeader(record(1,1), leaderEpoch = 0)
+    log.appendAsLeader(record(2,2), leaderEpoch = 0)
+    log.appendAsLeader(record(3,3), leaderEpoch = 0)
+    log.appendAsLeader(record(4,4), leaderEpoch = 0)
     log.roll()
 
     val stats = new CleanerStats()
@@ -695,8 +695,8 @@ class LogCleanerTest extends JUnitSuite {
     val noDupSetOffset = 50
     val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size)
 
-    log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false)
-    log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false)
+    log.appendAsFollower(invalidCleanedMessage(dupSetOffset, dupSet, codec))
+    log.appendAsFollower(invalidCleanedMessage(noDupSetOffset, noDupSet, codec))
 
     log.roll()
 
@@ -739,22 +739,22 @@ class LogCleanerTest extends JUnitSuite {
     val cleaner = makeCleaner(10)
 
     // Append a message with a large timestamp.
-    log.append(TestUtils.singletonRecords(value = "0".getBytes,
+    log.appendAsLeader(TestUtils.singletonRecords(value = "0".getBytes,
                                           key = "0".getBytes,
-                                          timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000))
+                                          timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000), leaderEpoch = 0)
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
     // Append a tombstone with a small timestamp and roll out a new log segment.
-    log.append(TestUtils.singletonRecords(value = null,
+    log.appendAsLeader(TestUtils.singletonRecords(value = null,
                                           key = "0".getBytes,
-                                          timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
+                                          timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0)
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
     assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
     // Append a message and roll out another log segment.
-    log.append(TestUtils.singletonRecords(value = "1".getBytes,
+    log.appendAsLeader(TestUtils.singletonRecords(value = "1".getBytes,
                                           key = "1".getBytes,
-                                          timestamp = time.milliseconds()))
+                                          timestamp = time.milliseconds()), leaderEpoch = 0)
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
     assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
@@ -762,7 +762,7 @@ class LogCleanerTest extends JUnitSuite {
 
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
-      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).lastOffset
+      yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
   }
 
   private def invalidCleanedMessage(initialOffset: Long,
@@ -790,7 +790,7 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   private def messageWithOffset(key: Array[Byte], value: Array[Byte], offset: Long): MemoryRecords =
-    MemoryRecords.withRecords(offset, CompressionType.NONE, new SimpleRecord(key, value))
+    MemoryRecords.withRecords(offset, CompressionType.NONE, 0, new SimpleRecord(key, value))
 
   private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
@@ -812,7 +812,7 @@ class LogCleanerTest extends JUnitSuite {
 
   def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for((key, value) <- seq)
-      yield log.append(record(key, value)).firstOffset
+      yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
   }
 
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1248d1a..b0eb29a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -69,7 +69,7 @@ class LogManagerTest {
     val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
-    log.append(TestUtils.singletonRecords("test".getBytes()))
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
   /**
@@ -92,7 +92,7 @@ class LogManagerTest {
     var offset = 0L
     for(_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
-      val info = log.append(set)
+      val info = log.appendAsLeader(set, leaderEpoch = 0)
       offset = info.lastOffset
     }
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
@@ -114,7 +114,7 @@ class LogManagerTest {
       case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
-    log.append(TestUtils.singletonRecords("test".getBytes()))
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
   /**
@@ -140,7 +140,7 @@ class LogManagerTest {
     val numMessages = 200
     for (_ <- 0 until numMessages) {
       val set = TestUtils.singletonRecords("test".getBytes())
-      val info = log.append(set)
+      val info = log.appendAsLeader(set, leaderEpoch = 0)
       offset = info.firstOffset
     }
 
@@ -161,7 +161,7 @@ class LogManagerTest {
       case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
-    log.append(TestUtils.singletonRecords("test".getBytes()))
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
   }
 
   /**
@@ -176,7 +176,7 @@ class LogManagerTest {
     var offset = 0L
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
-      val info = log.append(set)
+      val info = log.appendAsLeader(set, leaderEpoch = 0)
       offset = info.lastOffset
     }
 
@@ -205,7 +205,7 @@ class LogManagerTest {
     val lastFlush = log.lastFlushTime
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
-      log.append(set)
+      log.appendAsLeader(set, leaderEpoch = 0)
     }
     time.sleep(logManager.InitialTaskDelayMs)
     assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
@@ -286,7 +286,7 @@ class LogManagerTest {
     val logs = topicPartitions.map(this.logManager.createLog(_, logConfig))
     logs.foreach(log => {
       for (_ <- 0 until 50)
-        log.append(TestUtils.singletonRecords("test".getBytes()))
+        log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
 
       log.flush()
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b61f261..d42abd4 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -85,41 +85,41 @@ class LogTest {
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     // Test the segment rolling behavior when messages do not have a timestamp.
     time.sleep(log.config.segmentMs + 1)
-    log.append(createRecords)
+    log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
 
-    log.append(createRecords)
+    log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments)
 
     for (numSegments <- 3 until 5) {
       time.sleep(log.config.segmentMs + 1)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
     // Append a message with timestamp to a segment whose first message do not have a timestamp.
     val timestamp = time.milliseconds + log.config.segmentMs + 1
     def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp)
-    log.append(createRecordsWithTimestamp)
+    log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments)
 
     // Test the segment rolling behavior when messages have timestamps.
     time.sleep(log.config.segmentMs + 1)
-    log.append(createRecordsWithTimestamp)
+    log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
 
     // move the wall clock beyond log rolling time
     time.sleep(log.config.segmentMs + 1)
-    log.append(createRecordsWithTimestamp)
+    log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments)
 
     val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    log.append(recordWithExpiredTimestamp)
+    log.appendAsLeader(recordWithExpiredTimestamp, leaderEpoch = 0)
     assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments)
 
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)
-    log.append(MemoryRecords.withRecords(CompressionType.NONE))
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE), leaderEpoch = 0)
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
@@ -138,10 +138,10 @@ class LogTest {
     val epoch: Short = 0
 
     val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 0)
-    log.append(records, assignOffsets = true)
+    log.appendAsLeader(records, leaderEpoch = 0)
 
     val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 2)
-    log.append(nextRecords, assignOffsets = true)
+    log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
   @Test
@@ -163,7 +163,7 @@ class LogTest {
     for (_ <- 0 to 5) {
       val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
         pid = pid, epoch = epoch, sequence = seq)
-      log.append(record, assignOffsets = true)
+      log.appendAsLeader(record, leaderEpoch = 0)
       seq = seq + 1
     }
     // Append an entry with multiple log records.
@@ -172,11 +172,11 @@ class LogTest {
       new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
       new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
     ), pid = pid, epoch = epoch, sequence = seq)
-    val multiEntryAppendInfo = log.append(createRecords, assignOffsets = true)
+    val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
 
     // Append a Duplicate of the tail, when the entry at the tail has multiple records.
-    val dupMultiEntryAppendInfo = log.append(createRecords, assignOffsets = true)
+    val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
       multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
     assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
@@ -191,7 +191,7 @@ class LogTest {
           new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
           new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
         pid = pid, epoch = epoch, sequence = seq - 2)
-      log.append(records, assignOffsets = true)
+      log.appendAsLeader(records, leaderEpoch = 0)
       fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
         "in the middle of the log.")
     } catch {
@@ -203,7 +203,7 @@ class LogTest {
       val records = TestUtils.records(
         List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
         pid = pid, epoch = epoch, sequence = 1)
-      log.append(records, assignOffsets = true)
+      log.appendAsLeader(records, leaderEpoch = 0)
       fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
         "in the middle of the log.")
     } catch {
@@ -213,8 +213,8 @@ class LogTest {
     // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
     def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
       pid = pid, epoch = epoch, sequence = seq)
-    val origAppendInfo = log.append(createRecordsWithDuplicate, assignOffsets = true)
-    val newAppendInfo = log.append(createRecordsWithDuplicate, assignOffsets = true)
+    val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
+    val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
     assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
     assertEquals("Inserted a duplicate records into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
   }
@@ -234,29 +234,29 @@ class LogTest {
 
     val buffer = ByteBuffer.allocate(512)
 
-    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0)
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // Append a record with other pids.
-    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0)
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // Append a record with other pids.
-    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0)
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // Append a record with other pids.
-    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0)
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     buffer.flip()
     val memoryRecords = MemoryRecords.readableRecords(buffer)
 
-    log.append(memoryRecords, assignOffsets = false)
+    log.appendAsFollower(memoryRecords)
     log.flush()
 
     val fetchedData = log.read(0, Int.MaxValue)
@@ -310,7 +310,7 @@ class LogTest {
 
     buffer.flip()
 
-    log.append(MemoryRecords.readableRecords(buffer), assignOffsets = false)
+    log.appendAsFollower(MemoryRecords.readableRecords(buffer))
     // Should throw a duplicate sequence exception here.
     fail("should have thrown a DuplicateSequenceNumberException.")
   }
@@ -331,10 +331,10 @@ class LogTest {
     val oldEpoch: Short = 0
 
     val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = newEpoch, sequence = 0)
-    log.append(records, assignOffsets = true)
+    log.appendAsLeader(records, leaderEpoch = 0)
 
     val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = oldEpoch, sequence = 0)
-    log.append(nextRecords, assignOffsets = true)
+    log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
   /**
@@ -357,15 +357,15 @@ class LogTest {
       scheduler = time.scheduler,
       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
-    log.append(set)
+    log.appendAsLeader(set, leaderEpoch = 0)
 
     time.sleep(log.config.segmentMs - maxJitter)
     set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    log.append(set)
+    log.appendAsLeader(set, leaderEpoch = 0)
     assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments)
     time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
     set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    log.append(set)
+    log.appendAsLeader(set, leaderEpoch = 0)
     assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
   }
 
@@ -389,7 +389,7 @@ class LogTest {
 
     // segments expire in size
     for (_ <- 1 to (msgPerSeg + 1))
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
   }
 
@@ -400,7 +400,7 @@ class LogTest {
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
     val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
-    log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
+    log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds), leaderEpoch = 0)
   }
 
   /**
@@ -416,7 +416,7 @@ class LogTest {
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
-      log.append(TestUtils.singletonRecords(value = value))
+      log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0)
 
     for(i <- values.indices) {
       val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
@@ -442,7 +442,7 @@ class LogTest {
 
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- records.indices)
-      log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
+      log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
       val read = log.read(i, 100, None).records.records.iterator.next()
@@ -465,7 +465,7 @@ class LogTest {
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(TestUtils.singletonRecords(value = "42".getBytes))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
 
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
@@ -484,7 +484,7 @@ class LogTest {
 
     // now test the case that we give the offsets and use non-sequential offsets
     for (i <- records.indices)
-      log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
+      log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
 
     for (i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
@@ -512,7 +512,7 @@ class LogTest {
 
     // now test the case that we give the offsets and use non-sequential offsets
     for (i <- records.indices)
-      log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
+      log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i)))
 
     for (i <- 50 until messageIds.max) {
       assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records)
@@ -543,7 +543,7 @@ class LogTest {
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
-    log.append(TestUtils.singletonRecords(value = "42".getBytes))
+    log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
 
@@ -577,7 +577,7 @@ class LogTest {
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
-    messageSets.foreach(log.append(_))
+    messageSets.foreach(log.appendAsLeader(_, leaderEpoch = 0))
     log.flush()
 
     /* do successive reads to ensure all our messages are there */
@@ -614,8 +614,8 @@ class LogTest {
     val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)))
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)), leaderEpoch = 0)
 
     def read(offset: Int) = log.read(offset, 4096).records.records
 
@@ -639,7 +639,7 @@ class LogTest {
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
       val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
-        log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
+        log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10), leaderEpoch = 0)
 
       val currOffset = log.logEndOffset
       assertEquals(currOffset, messagesToAppend)
@@ -653,7 +653,7 @@ class LogTest {
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
-                   log.append(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = time.milliseconds)).firstOffset)
+                   log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = time.milliseconds), leaderEpoch = 0).firstOffset)
 
       // cleanup the log
       log.delete()
@@ -676,7 +676,7 @@ class LogTest {
     val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
-      log.append(messageSet)
+      log.appendAsLeader(messageSet, leaderEpoch = 0)
       fail("message set should throw RecordBatchTooLargeException.")
     } catch {
       case _: RecordBatchTooLargeException => // this is good
@@ -703,28 +703,28 @@ class LogTest {
     val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
-      log.append(messageSetWithUnkeyedMessage)
+      log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
       case _: CorruptRecordException => // this is good
     }
     try {
-      log.append(messageSetWithOneUnkeyedMessage)
+      log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
       case _: CorruptRecordException => // this is good
     }
     try {
-      log.append(messageSetWithCompressedUnkeyedMessage)
+      log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
       case _: CorruptRecordException => // this is good
     }
 
     // the following should succeed without any InvalidMessageException
-    log.append(messageSetWithKeyedMessage)
-    log.append(messageSetWithKeyedMessages)
-    log.append(messageSetWithCompressedKeyedMessage)
+    log.appendAsLeader(messageSetWithKeyedMessage, leaderEpoch = 0)
+    log.appendAsLeader(messageSetWithKeyedMessages, leaderEpoch = 0)
+    log.appendAsLeader(messageSetWithCompressedKeyedMessage, leaderEpoch = 0)
   }
 
   /**
@@ -745,10 +745,10 @@ class LogTest {
     val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // should be able to append the small message
-    log.append(first)
+    log.appendAsLeader(first, leaderEpoch = 0)
 
     try {
-      log.append(second)
+      log.appendAsLeader(second, leaderEpoch = 0)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
       case _: RecordTooLargeException => // this is good
@@ -770,8 +770,8 @@ class LogTest {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
-        timestamp = time.milliseconds + i * 10))
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
+        timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
     val lastIndexOffset = log.activeSegment.index.lastOffset
     val numIndexEntries = log.activeSegment.index.entries
@@ -818,9 +818,9 @@ class LogTest {
     val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     val messages = (0 until numMessages).map { i =>
-      MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
+      MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
     }
-    messages.foreach(log.append(_, assignOffsets = false))
+    messages.foreach(log.appendAsFollower(_))
     val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
     assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries)
     assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}",
@@ -841,7 +841,7 @@ class LogTest {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.index.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
@@ -880,7 +880,7 @@ class LogTest {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
@@ -911,7 +911,7 @@ class LogTest {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.index.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
@@ -961,7 +961,7 @@ class LogTest {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
@@ -982,7 +982,7 @@ class LogTest {
     assertEquals("Should change log size", 0, log.size)
 
     for (_ <- 1 to msgPerSeg)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
@@ -991,7 +991,7 @@ class LogTest {
     assertEquals("Should change log size", log.size, 0)
 
     for (_ <- 1 to msgPerSeg)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
     assertEquals("log size should be same as before", size, log.size)
@@ -1016,12 +1016,12 @@ class LogTest {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i <- 1 to msgPerSeg)
-      log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     time.sleep(msgPerSeg)
     for (i <- 1 to msgPerSeg)
-      log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0)
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
     val expectedEntries = msgPerSeg - 1
 
@@ -1035,7 +1035,7 @@ class LogTest {
 
     time.sleep(msgPerSeg)
     for (i <- 1 to msgPerSeg)
-      log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i))
+      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
   }
 
@@ -1068,7 +1068,7 @@ class LogTest {
 
     // check that we can append to the log
     for (_ <- 0 until 10)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.delete()
   }
@@ -1095,7 +1095,7 @@ class LogTest {
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for (_ <- 0 until 100)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
     log.close()
     log = new Log(logDir,
                   config,
@@ -1132,7 +1132,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // files should be renamed
     val segments = log.logSegments.toArray
@@ -1173,7 +1173,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // expire all segments
     log.deleteOldSegments()
@@ -1196,7 +1196,7 @@ class LogTest {
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)
-    log.append(TestUtils.singletonRecords(value = null))
+    log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
     val head = log.read(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
     assertTrue("Message payload should be null.", !head.hasValue)
@@ -1211,9 +1211,9 @@ class LogTest {
       scheduler = time.scheduler,
       time = time)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
-    records.foreach(record => log.append(MemoryRecords.withRecords(CompressionType.NONE, record)))
+    records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
     val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
-    log.append(invalidRecord, assignOffsets = false)
+    log.appendAsFollower(invalidRecord)
   }
 
   @Test
@@ -1224,8 +1224,8 @@ class LogTest {
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time)
-    log.append(MemoryRecords.withRecords(CompressionType.NONE,
-      new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
+      new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
   }
 
   @Test
@@ -1249,7 +1249,7 @@ class LogTest {
                         time = time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
-        log.append(createRecords)
+        log.appendAsLeader(createRecords, leaderEpoch = 0)
       val records = log.logSegments.flatMap(_.log.records.asScala.toList).toList
       log.close()
 
@@ -1290,21 +1290,21 @@ class LogTest {
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time)
-    val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
-    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
-    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
-    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.NONE, new SimpleRecord("v5".getBytes(), "k5".getBytes()))
+    val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
+    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.NONE, 0, new SimpleRecord("v5".getBytes(), "k5".getBytes()))
     //Writes into an empty log with baseOffset 0
-    log.append(set1, false)
+    log.appendAsFollower(set1)
     assertEquals(0L, log.activeSegment.baseOffset)
     //This write will roll the segment, yielding a new segment with base offset = max(2, 1) = 2
-    log.append(set2, false)
+    log.appendAsFollower(set2)
     assertEquals(2L, log.activeSegment.baseOffset)
     //This will also roll the segment, yielding a new segment with base offset = max(3, Integer.MAX_VALUE+3) = Integer.MAX_VALUE+3
-    log.append(set3, false)
+    log.appendAsFollower(set3)
     assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
     //This will go into the existing log
-    log.append(set4, false)
+    log.appendAsFollower(set4)
     assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
     log.close()
     val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index"))
@@ -1340,7 +1340,7 @@ class LogTest {
       scheduler = time.scheduler,
       time = time)
     for (_ <- 0 until 100)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
     log.close()
 
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
@@ -1442,7 +1442,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.leaderEpochCache.assign(0, 40)
     log.leaderEpochCache.assign(1, 90)
@@ -1455,7 +1455,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
@@ -1470,7 +1470,7 @@ class LogTest {
     val log = createLog(createRecords.sizeInBytes)
 
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("should have 3 segments", 3, log.numberOfSegments)
     assertEquals(log.logStartOffset, 0)
 
@@ -1501,7 +1501,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.deleteOldSegments
     assertEquals("should have 2 segments", 2,log.numberOfSegments)
@@ -1514,7 +1514,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.deleteOldSegments
     assertEquals("should have 3 segments", 3,log.numberOfSegments)
@@ -1527,7 +1527,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
@@ -1540,7 +1540,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
@@ -1555,7 +1555,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // mark oldest segment as older than retention.ms
     log.logSegments.head.lastModified = time.milliseconds - 20000
@@ -1574,7 +1574,7 @@ class LogTest {
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
@@ -1587,13 +1587,13 @@ class LogTest {
     //Given this partition is on leader epoch 72
     val epoch = 72
     val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    log.leaderEpochCache.assign(epoch, records.size)
 
     //When appending messages as a leader (i.e. using appendAsLeader)
     for (record <- records)
-      log.append(
+      log.appendAsLeader(
         MemoryRecords.withRecords(CompressionType.NONE, record),
-        leaderEpochCache = mockCache(epoch),
-        assignOffsets = true
+        leaderEpoch = epoch
       )
 
     //Then leader epoch should be set on messages
@@ -1608,8 +1608,6 @@ class LogTest {
     val messageIds = (0 until 50).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
-    val cache = createMock(classOf[LeaderEpochCache])
-
     //Given each message has an offset & epoch, as messages from a leader would
     def recordsForEpoch(i: Int): MemoryRecords = {
       val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i))
@@ -1620,17 +1618,13 @@ class LogTest {
       recs
     }
 
-    //Verify we save the epoch to the cache.
-    expect(cache.assign(EasyMock.eq(42), anyInt)).times(records.size)
-    replay(cache)
-
     val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     //When appending as follower (i.e. using appendAsFollower)
     for (i <- records.indices)
-      log.append(recordsForEpoch(i), assignOffsets = false, leaderEpochCache = cache)
+      log.appendAsFollower(recordsForEpoch(i))
 
-    verify(cache)
+    assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch())
   }
 
   @Test
@@ -1641,7 +1635,7 @@ class LogTest {
 
     // Given three segments of 5 messages each
     for (e <- 0 until 15) {
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
     }
 
     //Given epochs
@@ -1664,7 +1658,7 @@ class LogTest {
 
     // Given three segments of 5 messages each
     for (e <- 0 until 15) {
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
     }
 
     //Given epochs
@@ -1688,7 +1682,7 @@ class LogTest {
 
     //Given 2 segments, 10 messages per segment
     for (epoch <- 1 to 20)
-      log.append(createRecords)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     //Simulate some leader changes at specific offsets
     cache.assign(0, 0)
@@ -1731,16 +1725,16 @@ class LogTest {
     val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val leaderEpochCache = epochCache(log)
     val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
-    log.append(records = firstBatch, assignOffsets = false)
+    log.appendAsFollower(records = firstBatch)
 
     val secondBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 1)
-    log.append(records = secondBatch, assignOffsets = false)
+    log.appendAsFollower(records = secondBatch)
 
     val thirdBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 2)
-    log.append(records = thirdBatch, assignOffsets = false)
+    log.appendAsFollower(records = thirdBatch)
 
     val fourthBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 3, offset = 3)
-    log.append(records = fourthBatch, assignOffsets = false)
+    log.appendAsFollower(records = fourthBatch)
 
     assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
 
@@ -1795,11 +1789,4 @@ class LogTest {
       time = time)
     log
   }
-
-  private def mockCache(epoch: Int) = {
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
-    EasyMock.expect(cache.latestUsedEpoch).andReturn(epoch).anyTimes
-    EasyMock.replay(cache)
-    cache
-  }
 }
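
The LogTest.scala changes above all follow one pattern: the old log.append(records, assignOffsets = true/false) call is split into appendAsLeader, which assigns offsets and stamps the supplied leader epoch on the messages, and appendAsFollower, which takes the offsets and partition leader epoch already present in the batch as-is. A minimal sketch of that call pattern, assuming the Log constructor, the TestUtils helpers and the four-argument MemoryRecords.withRecords overload (base offset, compression, partition leader epoch, records) used in the hunks above, with logDir and time coming from the usual test fixture:

  import kafka.log.{Log, LogConfig}
  import kafka.utils.TestUtils
  import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

  val log = new Log(logDir, LogConfig(), logStartOffset = 0L, recoveryPoint = 0L,
    scheduler = time.scheduler, time = time)

  // Leader path: the log assigns the offset and stamps leaderEpoch = 0 on the batch.
  log.appendAsLeader(
    TestUtils.singletonRecords(value = "v".getBytes, timestamp = time.milliseconds),
    leaderEpoch = 0)

  // Follower path: the batch keeps the base offset (first argument) and partition leader
  // epoch (third argument) exactly as the leader wrote them.
  log.appendAsFollower(
    MemoryRecords.withRecords(1L, CompressionType.NONE, 0, new SimpleRecord("v".getBytes)))

This is also why the mockCache helper at the bottom of the file goes away: appendAsLeader takes the epoch directly, and the follower-side test now asserts against log.leaderEpochCache rather than verifying an EasyMock expectation.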