Posted to commits@kafka.apache.org by jg...@apache.org on 2020/07/08 16:25:34 UTC

[kafka] branch 2.2 updated: KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#8960)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 7833cd8  KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#8960)
7833cd8 is described below

commit 7833cd81a63d9a02df3b4e7aa762d94035ed5d4f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jul 7 19:22:45 2020 -0700

    KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#8960)
    
    This is a backport of (#7687) for 2.3.
    
    Existing producer state expiration uses timestamps from data records only and not from transaction markers. This can cause premature producer expiration when the coordinator times out a transaction because we drop the state from existing batches. This in turn can allow the coordinator epoch to revert to a previous value, which can lead to validation failures during log recovery. This patch fixes the problem by also leveraging the timestamp from transaction markers.
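
    As a minimal Scala sketch of the idea (simplified, hypothetical names; the real change is the
    new lastTimestamp field on ProducerStateEntry in the ProducerStateManager.scala hunk below),
    the producer's last-seen timestamp is now refreshed by transaction markers as well as by data
    batches, so the expiration check no longer fires while the producer's only recent activity is
    a marker written by the coordinator:

        import scala.collection.mutable

        // Hypothetical, simplified model of the fix; not the actual Kafka classes.
        case class ProducerState(producerId: Long, var lastTimestamp: Long)

        class ProducerStates(expirationMs: Long) {
          private val states = mutable.Map.empty[Long, ProducerState]

          def onDataBatch(producerId: Long, batchMaxTimestamp: Long): Unit = {
            val state = states.getOrElseUpdate(producerId, ProducerState(producerId, batchMaxTimestamp))
            state.lastTimestamp = batchMaxTimestamp
          }

          // Before the fix, marker appends did not refresh the timestamp, so a producer whose
          // transaction was timed out by the coordinator could be expired prematurely.
          def onTxnMarker(producerId: Long, markerTimestamp: Long): Unit = {
            val state = states.getOrElseUpdate(producerId, ProducerState(producerId, markerTimestamp))
            state.lastTimestamp = markerTimestamp
          }

          // A producer is dropped only once it has been quiet (data and markers) long enough.
          def expireStale(nowMs: Long): Unit =
            states.retain { case (_, state) => nowMs - state.lastTimestamp < expirationMs }
        }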
    
    We also change the validation logic so that the coordinator epoch is verified only for new marker appends. When replicating from the leader and when recovering the log, we only log a warning if we notice that the coordinator epoch has gone backwards. This allows recovery from previous occurrences of this bug.
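
    A minimal sketch of the relaxed check, mirroring checkCoordinatorEpoch in the
    ProducerStateManager.scala hunk below; the signature is simplified, and println and
    IllegalStateException stand in for the real logger and TransactionCoordinatorFencedException:

        sealed trait AppendOrigin
        object AppendOrigin {
          case object Replication extends AppendOrigin
          case object Coordinator extends AppendOrigin
          case object Client extends AppendOrigin
        }

        // currentEpoch is the coordinator epoch already recorded for the producer;
        // markerEpoch comes from the incoming end-transaction marker.
        def checkCoordinatorEpoch(origin: AppendOrigin, currentEpoch: Int, markerEpoch: Int): Unit = {
          if (currentEpoch > markerEpoch) {
            if (origin == AppendOrigin.Replication) {
              // Replication and log recovery tolerate an epoch that went backwards (for example
              // from a prior occurrence of this bug) and only log it.
              println(s"Detected invalid coordinator epoch: $markerEpoch is older than $currentEpoch")
            } else {
              // New marker appends from the transaction coordinator are still fenced.
              throw new IllegalStateException(
                s"Invalid coordinator epoch $markerEpoch (zombie), $currentEpoch (current)")
            }
          }
        }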
    
    Finally, this patch fixes one minor issue when loading producer state from the snapshot file. When the only record for a given producer is a control record, the "last offset" field will be set to -1 in the snapshot. We should check for this case when loading to be sure we recover the state consistently.
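
    A short sketch of the loading guard (simplified; the corresponding change is in the
    snapshot-loading code of the ProducerStateManager.scala hunk below): an entry whose last
    append was a control record is stored with a last offset of -1, so no batch metadata should
    be reconstructed for it, while its producer epoch and timestamp are still recovered from the
    other snapshot fields.

        import scala.collection.mutable

        case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long)

        // seq, offset, offsetDelta and timestamp are the values read from one snapshot entry.
        def loadBatchMetadata(seq: Int, offset: Long, offsetDelta: Int, timestamp: Long): mutable.Queue[BatchMetadata] = {
          val batches = mutable.Queue.empty[BatchMetadata]
          // offset == -1 means the producer's only record was a control record; keep its
          // producer epoch, coordinator epoch and last timestamp, but no batch metadata.
          if (offset >= 0)
            batches += BatchMetadata(seq, offset, offsetDelta, timestamp)
          batches
        }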
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/common/record/DefaultRecordBatch.java    |  16 +-
 .../common/record/DefaultRecordBatchTest.java      |   6 +
 core/src/main/scala/kafka/cluster/Partition.scala  |   4 +-
 .../coordinator/group/GroupMetadataManager.scala   |   7 +-
 .../transaction/TransactionStateManager.scala      |  10 +-
 core/src/main/scala/kafka/log/Log.scala            |  99 ++++++++-----
 core/src/main/scala/kafka/log/LogSegment.scala     |   4 +-
 core/src/main/scala/kafka/log/LogValidator.scala   |  60 +++++---
 .../scala/kafka/log/ProducerStateManager.scala     | 159 ++++++++++----------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   8 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  19 ++-
 .../AbstractCoordinatorConcurrencyTest.scala       |   4 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |   7 +-
 .../group/GroupMetadataManagerTest.scala           |  22 +--
 .../transaction/TransactionStateManagerTest.scala  |  13 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   3 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  64 ++++----
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   3 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 124 +++++++++++-----
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  76 +++++-----
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 162 +++++++++++++++------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   7 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  15 +-
 24 files changed, 548 insertions(+), 349 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 19ddb0e..24b9bf0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -522,16 +522,16 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 
-    public static int incrementSequence(int baseSequence, int increment) {
-        if (baseSequence > Integer.MAX_VALUE - increment)
-            return increment - (Integer.MAX_VALUE - baseSequence) - 1;
-        return baseSequence + increment;
+    public static int incrementSequence(int sequence, int increment) {
+        if (sequence > Integer.MAX_VALUE - increment)
+            return increment - (Integer.MAX_VALUE - sequence) - 1;
+        return sequence + increment;
     }
 
-    public static int decrementSequence(int baseSequence, int decrement) {
-        if (baseSequence < decrement)
-            return Integer.MAX_VALUE - (decrement - baseSequence) + 1;
-        return baseSequence - decrement;
+    public static int decrementSequence(int sequence, int decrement) {
+        if (sequence < decrement)
+            return Integer.MAX_VALUE - (decrement - sequence) + 1;
+        return sequence - decrement;
     }
 
     private abstract class RecordIterator implements CloseableIterator<Record> {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index ab8cbb7..1cc4a81 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -382,6 +382,12 @@ public class DefaultRecordBatchTest {
         assertEquals(4, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE - 5, 10));
     }
 
+    @Test
+    public void testDecrementSequence() {
+        assertEquals(0, DefaultRecordBatch.decrementSequence(5, 5));
+        assertEquals(Integer.MAX_VALUE, DefaultRecordBatch.decrementSequence(0, 1));
+    }
+
     private static DefaultRecordBatch recordsWithInvalidRecordCount(Byte magicValue, long timestamp,
                                               CompressionType codec, int invalidCount) {
         ByteBuffer buf = ByteBuffer.allocate(512);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 698fb48..f4e6eb9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -736,7 +736,7 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
+  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
@@ -750,7 +750,7 @@ class Partition(val topicPartition: TopicPartition,
               s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
           }
 
-          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
+          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
             interBrokerProtocolVersion)
 
           // we may need to increment high watermark since ISR could be down to 1
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7b24498..e56a298 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
+import kafka.log.AppendOrigin
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
@@ -281,7 +282,7 @@ class GroupMetadataManager(brokerId: Int,
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
-      isFromClient = false,
+      origin = AppendOrigin.Coordinator,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback)
@@ -780,8 +781,8 @@ class GroupMetadataManager(brokerId: Int,
               try {
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
-                val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
-                partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
+                val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
+                partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0)
 
                 offsetsRemoved += removedOffsets.size
                 trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index bd6d487..335aaee 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,23 +22,23 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.log.LogConfig
+import kafka.log.{AppendOrigin, LogConfig}
 import kafka.message.UncompressedCodec
 import kafka.server.Defaults
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 
 object TransactionStateManager {
@@ -199,7 +199,7 @@ class TransactionStateManager(brokerId: Int,
           config.requestTimeoutMs,
           TransactionLog.EnforcedRequiredAcks,
           internalTopicsAllowed = true,
-          isFromClient = false,
+          origin = AppendOrigin.Coordinator,
           recordsPerPartition,
           removeFromCacheCallback,
           Some(stateLock.readLock)
@@ -617,7 +617,7 @@ class TransactionStateManager(brokerId: Int,
                 newMetadata.txnTimeoutMs.toLong,
                 TransactionLog.EnforcedRequiredAcks,
                 internalTopicsAllowed = true,
-                isFromClient = false,
+                origin = AppendOrigin.Coordinator,
                 recordsPerPartition,
                 updateCacheCallback,
                 delayedProduceLock = Some(stateLock.readLock))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 53bde3c..1f4aa02 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -728,7 +728,10 @@ class Log(@volatile var dir: File,
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
       if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false)
+        val maybeCompletedTxn = updateProducers(batch,
+          loadedProducers,
+          firstOffsetMetadata = None,
+          origin = AppendOrigin.Replication)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -816,14 +819,16 @@ class Log(@volatile var dir: File,
    * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
    *
    * @param records The records to append
-   * @param isFromClient Whether or not this append is from a producer
+   * @param origin Declares the origin of the append which affects required validations
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and last offset.
    */
-  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
+  def appendAsLeader(records: MemoryRecords,
+                     leaderEpoch: Int,
+                     origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
+    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
   }
 
   /**
@@ -834,7 +839,11 @@ class Log(@volatile var dir: File,
    * @return Information about the appended messages including the first and last offset.
    */
   def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
-    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
+    append(records,
+      origin = AppendOrigin.Replication,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      assignOffsets = false,
+      leaderEpoch = -1)
   }
 
   /**
@@ -844,7 +853,7 @@ class Log(@volatile var dir: File,
    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
    *
    * @param records The log records to append
-   * @param isFromClient Whether or not this append is from a producer
+   * @param origin Declares the origin of the append which affects required validations
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
@@ -853,9 +862,13 @@ class Log(@volatile var dir: File,
    * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
    * @return Information about the appended messages including the first and last offset.
    */
-  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
+  private def append(records: MemoryRecords,
+                     origin: AppendOrigin,
+                     interBrokerProtocolVersion: ApiVersion,
+                     assignOffsets: Boolean,
+                     leaderEpoch: Int): LogAppendInfo = {
     maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
-      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
+      val appendInfo = analyzeAndValidateRecords(records, origin)
 
       // return if we have no valid messages or if this is a duplicate of the last appended entry
       if (appendInfo.shallowCount == 0)
@@ -884,7 +897,7 @@ class Log(@volatile var dir: File,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
-              isFromClient,
+              origin,
               interBrokerProtocolVersion)
           } catch {
             case e: IOException =>
@@ -958,9 +971,19 @@ class Log(@volatile var dir: File,
             s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
         }
 
+        // maybe roll the log if this segment is full
+        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
+
+        val logOffsetMetadata = LogOffsetMetadata(
+          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
+          segmentBaseOffset = segment.baseOffset,
+          relativePositionInSegment = segment.size)
+
         // now that we have valid records, offsets assigned, and timestamps updated, we need to
         // validate the idempotent/transactional state of the producers and collect some metadata
-        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
+        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
+          logOffsetMetadata, validRecords, origin)
+
         maybeDuplicate.foreach { duplicate =>
           appendInfo.firstOffset = Some(duplicate.firstOffset)
           appendInfo.lastOffset = duplicate.lastOffset
@@ -969,14 +992,6 @@ class Log(@volatile var dir: File,
           return appendInfo
         }
 
-        // maybe roll the log if this segment is full
-        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
-
-        val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
-          segmentBaseOffset = segment.baseOffset,
-          relativePositionInSegment = segment.size)
-
         segment.append(largestOffset = appendInfo.lastOffset,
           largestTimestamp = appendInfo.maxTimestamp,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
@@ -1088,23 +1103,36 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
+  private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
+                                              records: MemoryRecords,
+                                              origin: AppendOrigin):
   (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
     val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
-    for (batch <- records.batches.asScala if batch.hasProducerId) {
-      val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
-
-      // if this is a client produce request, there will be up to 5 batches which could have been duplicated.
-      // If we find a duplicate, we return the metadata of the appended batch to the client.
-      if (isFromClient) {
-        maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
-          return (updatedProducers, completedTxns.toList, Some(duplicate))
+    var relativePositionInSegment = appendOffsetMetadata.relativePositionInSegment
+
+    for (batch <- records.batches.asScala) {
+      if (batch.hasProducerId) {
+        val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
+
+        // if this is a client produce request, there will be up to 5 batches which could have been duplicated.
+        // If we find a duplicate, we return the metadata of the appended batch to the client.
+        if (origin == AppendOrigin.Client) {
+          maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
+            return (updatedProducers, completedTxns.toList, Some(duplicate))
+          }
         }
-      }
 
-      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
-      maybeCompletedTxn.foreach(completedTxns += _)
+        // We cache offset metadata for the start of each transaction. This allows us to
+        // compute the last stable offset without relying on additional index lookups.
+        val firstOffsetMetadata = if (batch.isTransactional)
+          Some(LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
+        else
+          None
+
+        val maybeCompletedTxn = updateProducers(batch, updatedProducers, firstOffsetMetadata, origin)
+        maybeCompletedTxn.foreach(completedTxns += _)
+      }
     }
     (updatedProducers, completedTxns.toList, None)
   }
@@ -1127,7 +1155,7 @@ class Log(@volatile var dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
@@ -1141,7 +1169,7 @@ class Log(@volatile var dir: File,
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.Client && batch.baseOffset != 0)
         throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
 
@@ -1198,10 +1226,11 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
-                              isFromClient: Boolean): Option[CompletedTxn] = {
+                              firstOffsetMetadata: Option[LogOffsetMetadata],
+                              origin: AppendOrigin): Option[CompletedTxn] = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
-    appendInfo.append(batch)
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
+    appendInfo.append(batch, firstOffsetMetadata)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d08c7c5..0b1fb56 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -225,8 +225,8 @@ class LogSegment private[log] (val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
-      val maybeCompletedTxn = appendInfo.append(batch)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication)
+      val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
         val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index f7c7d04..f760ee6 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -29,7 +29,32 @@ import org.apache.kafka.common.utils.Time
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-private[kafka] object LogValidator extends Logging {
+/**
+ * The source of an append to the log. This is used when determining required validations.
+ */
+private[kafka] sealed trait AppendOrigin
+private[kafka] object AppendOrigin {
+
+  /**
+   * The log append came through replication from the leader. This typically implies minimal validation.
+   * Particularly, we do not decompress record batches in order to validate records individually.
+   */
+  case object Replication extends AppendOrigin
+
+  /**
+   * The log append came from either the group coordinator or the transaction coordinator. We validate
+   * producer epochs for normal log entries (specifically offset commits from the group coordinator) and
+   * we validate coordinate end transaction markers from the transaction coordinator.
+   */
+  case object Coordinator extends AppendOrigin
+
+  /**
+   * The log append came from the client, which implies full validation.
+   */
+  case object Client extends AppendOrigin
+}
+
+private[log] object LogValidator extends Logging {
 
   /**
    * Update the offsets for this message set and do further validation on messages including:
@@ -57,25 +82,27 @@ private[kafka] object LogValidator extends Logging {
                                                       timestampType: TimestampType,
                                                       timestampDiffMaxMs: Long,
                                                       partitionLeaderEpoch: Int,
-                                                      isFromClient: Boolean,
+                                                      origin: AppendOrigin,
                                                       interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
         convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType,
-          timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
+          timestampDiffMaxMs, magic, partitionLeaderEpoch, origin)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
-          partitionLeaderEpoch, isFromClient, magic)
+          partitionLeaderEpoch, origin, magic)
     } else {
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
-        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion)
+        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin, interBrokerProtocolVersion)
     }
   }
 
-  private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
-    if (isFromClient) {
+  private def validateBatch(batch: RecordBatch,
+                            origin: AppendOrigin,
+                            toMagic: Byte): Unit = {
+    if (origin == AppendOrigin.Client) {
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
         val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
         if (countFromOffsets <= 0)
@@ -131,7 +158,7 @@ private[kafka] object LogValidator extends Logging {
                                                    timestampDiffMaxMs: Long,
                                                    toMagicValue: Byte,
                                                    partitionLeaderEpoch: Int,
-                                                   isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+                                                   origin: AppendOrigin): ValidationAndOffsetAssignResult = {
     val startNanos = time.nanoseconds
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
       CompressionType.NONE, records.records)
@@ -146,7 +173,7 @@ private[kafka] object LogValidator extends Logging {
       offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient, toMagicValue)
+      validateBatch(batch, origin, toMagicValue)
 
       for (record <- batch.asScala) {
         validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -174,14 +201,14 @@ private[kafka] object LogValidator extends Logging {
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long,
                                          partitionLeaderEpoch: Int,
-                                         isFromClient: Boolean,
+                                         origin: AppendOrigin,
                                          magic: Byte): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient, magic)
+      validateBatch(batch, origin, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
@@ -247,7 +274,7 @@ private[kafka] object LogValidator extends Logging {
                                                  timestampType: TimestampType,
                                                  timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int,
-                                                 isFromClient: Boolean,
+                                                 origin: AppendOrigin,
                                                  interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
       // No in place assignment situation 1 and 2
       var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
@@ -259,7 +286,7 @@ private[kafka] object LogValidator extends Logging {
       var uncompressedSizeInBytes = 0
 
       for (batch <- records.batches.asScala) {
-        validateBatch(batch, isFromClient, toMagic)
+        validateBatch(batch, origin, toMagic)
         uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
         // Do not compress control records unless they are written compressed
@@ -301,7 +328,7 @@ private[kafka] object LogValidator extends Logging {
           (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
         }
         buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
-          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
           uncompressedSizeInBytes)
       } else {
         // we can update the batch only and write the compressed payload as is
@@ -340,8 +367,7 @@ private[kafka] object LogValidator extends Logging {
                                            baseSequence: Int,
                                            isTransactional: Boolean,
                                            partitionLeaderEpoch: Int,
-                                           isFromClient: Boolean,
-                                           uncompresssedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
+                                           uncompressedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
     val startNanos = time.nanoseconds
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
       validatedRecords.asJava)
@@ -362,7 +388,7 @@ private[kafka] object LogValidator extends Logging {
     // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
     // to rebuild the records (including recompression if enabled).
     val conversionCount = builder.numRecords
-    val recordConversionStats = new RecordConversionStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
+    val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes + builder.uncompressedBytesWritten,
       conversionCount, time.nanoseconds - startNanos)
 
     ValidationAndOffsetAssignResult(
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 5632b7b..6123d26 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -26,7 +26,6 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
-import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
@@ -44,28 +43,6 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
 
 
-// ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance
-private[log] sealed trait ValidationType
-private[log] object ValidationType {
-
-  /**
-    * This indicates no validation should be performed on the incoming append. This is the case for all appends on
-    * a replica, as well as appends when the producer state is being built from the log.
-    */
-  case object None extends ValidationType
-
-  /**
-    * We only validate the epoch (and not the sequence numbers) for offset commit requests coming from the transactional
-    * producer. These appends will not have sequence numbers, so we can't validate them.
-    */
-  case object EpochOnly extends ValidationType
-
-  /**
-    * Perform the full validation. This should be used fo regular produce requests coming to the leader.
-    */
-  case object Full extends ValidationType
-}
-
 private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) {
   def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
 
@@ -79,12 +56,18 @@ private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffset
 
 private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
-  def empty(producerId: Long) = new ProducerStateEntry(producerId, mutable.Queue[BatchMetadata](), RecordBatch.NO_PRODUCER_EPOCH, -1, None)
+
+  def empty(producerId: Long) = new ProducerStateEntry(producerId,
+    batchMetadata = mutable.Queue[BatchMetadata](),
+    producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+    coordinatorEpoch = -1,
+    lastTimestamp = RecordBatch.NO_TIMESTAMP,
+    currentTxnFirstOffset = None)
 }
 
 private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
-  def firstSeq =  DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
-  def firstOffset = lastOffset - offsetDelta
+  def firstSeq: Int =  DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
+  def firstOffset: Long = lastOffset - offsetDelta
 
   override def toString: String = {
     "BatchMetadata(" +
@@ -103,28 +86,28 @@ private[log] class ProducerStateEntry(val producerId: Long,
                                       val batchMetadata: mutable.Queue[BatchMetadata],
                                       var producerEpoch: Short,
                                       var coordinatorEpoch: Int,
+                                      var lastTimestamp: Long,
                                       var currentTxnFirstOffset: Option[Long]) {
 
   def firstSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
 
-  def firstOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
+  def firstDataOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
 
   def lastSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
 
   def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
 
-  def lastTimestamp = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
-
   def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
 
   def isEmpty: Boolean = batchMetadata.isEmpty
 
   def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
-    maybeUpdateEpoch(producerEpoch)
+    maybeUpdateProducerEpoch(producerEpoch)
     addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
+    this.lastTimestamp = timestamp
   }
 
-  def maybeUpdateEpoch(producerEpoch: Short): Boolean = {
+  def maybeUpdateProducerEpoch(producerEpoch: Short): Boolean = {
     if (this.producerEpoch != producerEpoch) {
       batchMetadata.clear()
       this.producerEpoch = producerEpoch
@@ -141,11 +124,12 @@ private[log] class ProducerStateEntry(val producerId: Long,
   }
 
   def update(nextEntry: ProducerStateEntry): Unit = {
-    maybeUpdateEpoch(nextEntry.producerEpoch)
+    maybeUpdateProducerEpoch(nextEntry.producerEpoch)
     while (nextEntry.batchMetadata.nonEmpty)
       addBatchMetadata(nextEntry.batchMetadata.dequeue())
     this.coordinatorEpoch = nextEntry.coordinatorEpoch
     this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
+    this.lastTimestamp = nextEntry.lastTimestamp
   }
 
   def removeBatchesOlderThan(offset: Long): Unit = batchMetadata.dropWhile(_.lastOffset < offset)
@@ -171,6 +155,7 @@ private[log] class ProducerStateEntry(val producerId: Long,
       s"producerEpoch=$producerEpoch, " +
       s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
       s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"lastTimestamp=$lastTimestamp, " +
       s"batchMetadata=$batchMetadata"
   }
 }
@@ -186,39 +171,41 @@ private[log] class ProducerStateEntry(val producerId: Long,
  *                      the most recent appends made by the producer. Validation of the first incoming append will
  *                      be made against the latest append in the current entry. New appends will replace older appends
  *                      in the current entry so that the space overhead is constant.
- * @param validationType Indicates the extent of validation to perform on the appends on this instance. Offset commits
- *                       coming from the producer should have ValidationType.EpochOnly. Appends which aren't from a client
- *                       should have ValidationType.None. Appends coming from a client for produce requests should have
- *                       ValidationType.Full.
+ * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
+ *               commits, which originate from the group coordinator, do not have sequence numbers and therefore
+ *               only producer epoch validation is done. Appends which come through replication are not validated
+ *               (we assume the validation has already been done) and appends from clients require full validation.
  */
 private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
                                       val producerId: Long,
                                       val currentEntry: ProducerStateEntry,
-                                      val validationType: ValidationType) {
+                                      val origin: AppendOrigin) extends Logging {
+
   private val transactions = ListBuffer.empty[TxnMetadata]
   private val updatedEntry = ProducerStateEntry.empty(producerId)
 
   updatedEntry.producerEpoch = currentEntry.producerEpoch
   updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
+  updatedEntry.lastTimestamp = currentEntry.lastTimestamp
   updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
 
-  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
-    validationType match {
-      case ValidationType.None =>
-
-      case ValidationType.EpochOnly =>
-        checkProducerEpoch(producerEpoch, offset)
-
-      case ValidationType.Full =>
-        checkProducerEpoch(producerEpoch, offset)
-        checkSequence(producerEpoch, firstSeq, offset)
+  private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
+    checkProducerEpoch(producerEpoch, offset)
+    if (origin == AppendOrigin.Client) {
+      checkSequence(producerEpoch, firstSeq, offset)
     }
   }
 
   private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
     if (producerEpoch < updatedEntry.producerEpoch) {
-      throw new ProducerFencedException(s"Producer's epoch at offset $offset is no longer valid in " +
-        s"partition $topicPartition: $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (current epoch)")
+      val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
+        s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
+
+      if (origin == AppendOrigin.Replication) {
+        warn(message)
+      } else {
+        throw new ProducerFencedException(message)
+      }
     }
   }
 
@@ -263,7 +250,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
   }
 
-  def append(batch: RecordBatch): Option[CompletedTxn] = {
+  def append(batch: RecordBatch, firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
       val recordIterator = batch.iterator
       if (recordIterator.hasNext) {
@@ -276,20 +263,22 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
         None
       }
     } else {
-      append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.baseOffset, batch.lastOffset,
-        batch.isTransactional)
+      val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(LogOffsetMetadata(batch.baseOffset))
+      appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
+        firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
       None
     }
   }
 
-  def append(epoch: Short,
-             firstSeq: Int,
-             lastSeq: Int,
-             lastTimestamp: Long,
-             firstOffset: Long,
-             lastOffset: Long,
-             isTransactional: Boolean): Unit = {
-    maybeValidateAppend(epoch, firstSeq, firstOffset)
+  def appendDataBatch(epoch: Short,
+                      firstSeq: Int,
+                      lastSeq: Int,
+                      lastTimestamp: Long,
+                      firstOffsetMetadata: LogOffsetMetadata,
+                      lastOffset: Long,
+                      isTransactional: Boolean): Unit = {
+    val firstOffset = firstOffsetMetadata.messageOffset
+    maybeValidateDataBatch(epoch, firstSeq, firstOffset)
     updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
 
     updatedEntry.currentTxnFirstOffset match {
@@ -301,24 +290,32 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       case None if isTransactional =>
         // Began a new transaction
         updatedEntry.currentTxnFirstOffset = Some(firstOffset)
-        transactions += new TxnMetadata(producerId, firstOffset)
+        transactions += TxnMetadata(producerId, firstOffsetMetadata)
 
       case _ => // nothing to do
     }
   }
 
+  private def checkCoordinatorEpoch(endTxnMarker: EndTransactionMarker, offset: Long): Unit = {
+    if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) {
+      if (origin == AppendOrigin.Replication) {
+        info(s"Detected invalid coordinator epoch for producerId $producerId at " +
+          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
+          s"is older than previously known coordinator epoch ${updatedEntry.coordinatorEpoch}")
+      } else {
+        throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
+          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
+          s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
+      }
+    }
+  }
+
   def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
                          producerEpoch: Short,
                          offset: Long,
                          timestamp: Long): CompletedTxn = {
     checkProducerEpoch(producerEpoch, offset)
-
-    if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
-      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
-        s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
-        s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
-
-    updatedEntry.maybeUpdateEpoch(producerEpoch)
+    checkCoordinatorEpoch(endTxnMarker, offset)
 
     val firstOffset = updatedEntry.currentTxnFirstOffset match {
       case Some(txnFirstOffset) => txnFirstOffset
@@ -327,8 +324,11 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
         offset
     }
 
+    updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
     updatedEntry.currentTxnFirstOffset = None
     updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
+    updatedEntry.lastTimestamp = timestamp
+
     CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
   }
 
@@ -356,6 +356,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       s"lastSequence=${updatedEntry.lastSeq}, " +
       s"currentTxnFirstOffset=${updatedEntry.currentTxnFirstOffset}, " +
       s"coordinatorEpoch=${updatedEntry.coordinatorEpoch}, " +
+      s"lastTimestamp=${updatedEntry.lastTimestamp}, " +
       s"startedTransactions=$transactions)"
   }
 }
@@ -409,7 +410,7 @@ object ProducerStateManager {
 
       struct.getArray(ProducerEntriesField).map { producerEntryObj =>
         val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
-        val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+        val producerId = producerEntryStruct.getLong(ProducerIdField)
         val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
         val seq = producerEntryStruct.getInt(LastSequenceField)
         val offset = producerEntryStruct.getLong(LastOffsetField)
@@ -417,8 +418,12 @@ object ProducerStateManager {
         val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
         val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-        val newEntry = new ProducerStateEntry(producerId, mutable.Queue[BatchMetadata](BatchMetadata(seq, offset, offsetDelta, timestamp)), producerEpoch,
-          coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
+        val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
+        if (offset >= 0)
+          lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
+
+        val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
+          coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
         newEntry
       }
     } catch {
@@ -631,17 +636,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = {
-    val validationToPerform =
-      if (!isFromClient)
-        ValidationType.None
-      else if (topicPartition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
-        ValidationType.EpochOnly
-      else
-        ValidationType.Full
-
+  def prepareUpdate(producerId: Long, origin: AppendOrigin): ProducerAppendInfo = {
     val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId))
-    new ProducerAppendInfo(topicPartition, producerId, currentEntry, validationToPerform)
+    new ProducerAppendInfo(topicPartition, producerId, currentEntry, origin)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8622bd5..cb20946 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,6 +31,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
+import kafka.log.AppendOrigin
 import kafka.message.ZStdCompressionCodec
 import kafka.network.RequestChannel
 import kafka.security.SecurityUtils
@@ -504,7 +505,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         timeout = produceRequest.timeout.toLong,
         requiredAcks = produceRequest.acks,
         internalTopicsAllowed = internalTopicsAllowed,
-        isFromClient = true,
+        origin = AppendOrigin.Client,
         entriesPerPartition = authorizedRequestInfo,
         responseCallback = sendResponseCallback,
         recordConversionStatsCallback = processingStatsCallback)
@@ -1729,7 +1730,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           timeout = config.requestTimeoutMs.toLong,
           requiredAcks = -1,
           internalTopicsAllowed = true,
-          isFromClient = false,
+          origin = AppendOrigin.Coordinator,
           entriesPerPartition = controlRecords,
           responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
       }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cd19da0..e1004a1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -479,7 +479,7 @@ class ReplicaManager(val config: KafkaConfig,
   def appendRecords(timeout: Long,
                     requiredAcks: Short,
                     internalTopicsAllowed: Boolean,
-                    isFromClient: Boolean,
+                    origin: AppendOrigin,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Lock] = None,
@@ -487,7 +487,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
+        origin, entriesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
       val produceStatus = localProduceResults.map { case (topicPartition, result) =>
@@ -742,7 +742,7 @@ class ReplicaManager(val config: KafkaConfig,
    * Append the messages to the local replica logs
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
-                               isFromClient: Boolean,
+                               origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
     trace(s"Append [$entriesPerPartition] to local log")
@@ -758,7 +758,7 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         try {
           val partition = getPartitionOrException(topicPartition, expectLeader = true)
-          val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
+          val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
           val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 540be03..4ec5f08 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -503,8 +503,8 @@ class PartitionTest {
     val follower2Replica = partition.getReplica(follower2).get
 
     // append records with initial leader epoch
-    partition.appendRecordsToLeader(batch1, isFromClient = true)
-    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client, requiredAcks = 0)
+    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
     assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
 
     // let the follower in ISR move leader's HW to move further but below LEO
@@ -767,7 +767,7 @@ class PartitionTest {
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L)
-    partition.appendRecordsToLeader(records, isFromClient = true)
+    partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0)
 
     def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
       val res = partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
@@ -877,8 +877,9 @@ class PartitionTest {
     val follower2Replica = partition.getReplica(follower2).get
 
     // append records with initial leader epoch
-    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
-    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client,
+      requiredAcks = 0).lastOffset
+    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
     assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
 
     // let the follower in ISR move leader's HW to move further but below LEO
@@ -907,7 +908,7 @@ class PartitionTest {
     val currentLeaderEpochStartOffset = leaderReplica.logEndOffset
 
     // append records with the latest leader epoch
-    partition.appendRecordsToLeader(batch3, isFromClient = true)
+    partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0)
 
     // fetch from follower not in ISR from log start offset should not add this follower to ISR
     partition.updateReplicaLogReadResult(follower1Replica,
@@ -997,7 +998,11 @@ class PartitionTest {
       // Append records to partitions, one partition-per-thread
       val futures = partitions.map { partition =>
         executor.submit(CoreUtils.runnable {
-          (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) }
+          (1 to 10000).foreach { _ =>
+            partition.appendRecordsToLeader(createRecords(baseOffset = 0),
+              origin = AppendOrigin.Client,
+              requiredAcks = 0)
+          }
         })
       }
       futures.foreach(_.get(15, TimeUnit.SECONDS))
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index d5becea..54dee95 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.Lock
 
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
-import kafka.log.Log
+import kafka.log.{AppendOrigin, Log}
 import kafka.server._
 import kafka.utils._
 import kafka.utils.timer.MockTimer
@@ -173,7 +173,7 @@ object AbstractCoordinatorConcurrencyTest {
     override def appendRecords(timeout: Long,
                                requiredAcks: Short,
                                internalTopicsAllowed: Boolean,
-                               isFromClient: Boolean,
+                               origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                                delayedProduceLock: Option[Lock] = None,
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 6ac2020..ef7a5ec 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.Partition
+import kafka.log.AppendOrigin
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
@@ -2012,7 +2013,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -2108,7 +2109,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -2138,7 +2139,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0287888..901d28a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
-import kafka.log.{Log, LogAppendInfo}
+import kafka.log.{AppendOrigin, Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.TestUtils.fail
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
@@ -1200,7 +1200,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1235,7 +1235,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
@@ -1283,7 +1283,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
@@ -1358,7 +1358,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1459,7 +1459,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1483,7 +1483,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1526,7 +1526,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1604,7 +1604,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1858,7 +1858,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -1874,7 +1874,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.capture(capturedRecords),
       EasyMock.capture(capturedCallback),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 830e2c3..26d0921 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -19,10 +19,9 @@ package kafka.coordinator.transaction
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.log.Log
+import kafka.log.{AppendOrigin, Log}
 import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{MockScheduler, Pool}
-import kafka.utils.TestUtils.fail
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -31,13 +30,13 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime
+import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Test}
-import org.easymock.{Capture, EasyMock, IAnswer}
+import org.scalatest.Assertions.fail
 
-import scala.collection.Map
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.{Map, mutable}
 
 class TransactionStateManagerTest {
 
@@ -523,7 +522,7 @@ class TransactionStateManagerTest {
         EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
           EasyMock.eq((-1).toShort),
           EasyMock.eq(true),
-          EasyMock.eq(false),
+          EasyMock.eq(AppendOrigin.Coordinator),
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
           EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -636,7 +635,7 @@ class TransactionStateManagerTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 8ca26a8..8042c2d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -413,7 +413,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     assertEquals(0L, cleanableOffsets._2)
 
     log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
-      new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0, isFromClient = false)
+      new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator)
     log.roll()
     log.onHighWatermarkIncremented(4L)
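
The commitMarker and abortMarker helpers used throughout the LogCleanerTest changes below are not part of this hunk. A plausible sketch, assuming they wrap the same MemoryRecords.withEndTransactionMarker overload called above; the helper names, the defaulted timestamp, and the fixed coordinator epoch of 0 are assumptions.

    import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords}

    // Hypothetical marker builders mirroring the withEndTransactionMarker call above.
    def commitMarker(producerId: Long, producerEpoch: Short,
                     timestamp: Long = System.currentTimeMillis()): MemoryRecords =
      MemoryRecords.withEndTransactionMarker(timestamp, producerId, producerEpoch,
        new EndTransactionMarker(ControlRecordType.COMMIT, 0))

    def abortMarker(producerId: Long, producerEpoch: Short,
                    timestamp: Long = System.currentTimeMillis()): MemoryRecords =
      MemoryRecords.withEndTransactionMarker(timestamp, producerId, producerEpoch,
        new EndTransactionMarker(ControlRecordType.ABORT, 0))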
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 7c42d7b..4a0158b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -264,10 +264,10 @@ class LogCleanerTest extends JUnitSuite {
     appendProducer1(Seq(1, 2))
     appendProducer2(Seq(2, 3))
     appendProducer1(Seq(3, 4))
-    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
-    log.appendAsLeader(commitMarker(pid2, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
+    log.appendAsLeader(commitMarker(pid2, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer1(Seq(2))
-    log.appendAsLeader(commitMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
     val abortedTransactions = log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset)
 
@@ -305,11 +305,11 @@ class LogCleanerTest extends JUnitSuite {
     appendProducer2(Seq(5, 6))
     appendProducer3(Seq(6, 7))
     appendProducer1(Seq(7, 8))
-    log.appendAsLeader(abortMarker(pid2, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(pid2, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer3(Seq(8, 9))
-    log.appendAsLeader(commitMarker(pid3, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(pid3, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer1(Seq(9, 10))
-    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
     // we have only cleaned the records in the first segment
     val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))._1
@@ -340,9 +340,9 @@ class LogCleanerTest extends JUnitSuite {
 
     appendProducer(Seq(1))
     appendProducer(Seq(2, 3))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer(Seq(2))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // cannot remove the marker in this pass because there are still valid records
@@ -351,7 +351,7 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
 
     appendProducer(Seq(1, 3))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
@@ -388,10 +388,10 @@ class LogCleanerTest extends JUnitSuite {
     val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
 
     appendProducer(Seq(1))
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer(Seq(2))
     appendProducer(Seq(2))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
@@ -421,14 +421,16 @@ class LogCleanerTest extends JUnitSuite {
 
     // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}]
     producer2(Seq(2, 3)) // offsets 2, 3
-    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 4
+    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 4
     log.roll()
 
     // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}], [{2}, {3}, {Producer1: Commit}]
     //  {0, 1},              {2, 3},            {4},                   {5}, {6}, {7} ==> Offsets
     log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 5
     log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 6
-    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 7
     log.roll()
 
     // first time through the records are removed
@@ -449,7 +451,8 @@ class LogCleanerTest extends JUnitSuite {
     // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     //  {1},                     {3},                     {4},                 {5}, {6}, {7},                 {8},            {9} ==> Offsets
     producer2(Seq(1)) // offset 8
-    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
+    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 9
     log.roll()
 
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
@@ -476,7 +479,8 @@ class LogCleanerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
 
     // [{Producer1: Commit}, {2}, {3}]
-    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 1
     log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 2
     log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 3
     log.roll()
@@ -510,7 +514,7 @@ class LogCleanerTest extends JUnitSuite {
     appendTransaction(Seq(1))
     log.roll()
 
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // Both the record and the marker should remain after cleaning
@@ -533,7 +537,7 @@ class LogCleanerTest extends JUnitSuite {
     appendTransaction(Seq(1))
     log.roll()
 
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // Both the batch and the marker should remain after cleaning. The batch is retained
@@ -563,9 +567,9 @@ class LogCleanerTest extends JUnitSuite {
 
     appendProducer(Seq(1))
     appendProducer(Seq(2, 3))
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer(Seq(3))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // delete horizon set to 0 to verify marker is not removed early
@@ -592,13 +596,15 @@ class LogCleanerTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
-    val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, isFromClient = false)
+    val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
+      origin = AppendOrigin.Replication)
     appendFirstTransaction(Seq(1))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
-    val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, isFromClient = false)
+    val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
+      origin = AppendOrigin.Replication)
     appendSecondTransaction(Seq(2))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
     log.appendAsLeader(record(1, 1), leaderEpoch = 0)
     log.appendAsLeader(record(2, 1), leaderEpoch = 0)
@@ -631,7 +637,7 @@ class LogCleanerTest extends JUnitSuite {
     val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
 
     appendProducer(Seq(2, 3)) // batch last offset is 1
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     def assertAbortedTransactionIndexed(): Unit = {
@@ -871,7 +877,7 @@ class LogCleanerTest extends JUnitSuite {
 
     appendProducer(Seq(1))
     appendProducer(Seq(2, 3))
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
@@ -1667,8 +1673,8 @@ class LogCleanerTest extends JUnitSuite {
                                           producerId: Long,
                                           producerEpoch: Short,
                                           leaderEpoch: Int = 0,
-                                          isFromClient: Boolean = true): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient)
+                                          origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
   }
 
   private def appendIdempotentAsLeader(log: Log,
@@ -1676,7 +1682,7 @@ class LogCleanerTest extends JUnitSuite {
                                        producerEpoch: Short,
                                        isTransactional: Boolean = false,
                                        leaderEpoch: Int = 0,
-                                       isFromClient: Boolean = true): Seq[Int] => LogAppendInfo = {
+                                       origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>
@@ -1688,7 +1694,7 @@ class LogCleanerTest extends JUnitSuite {
       else
         MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
       sequence += simpleRecords.size
-      log.appendAsLeader(records, leaderEpoch, isFromClient)
+      log.appendAsLeader(records, leaderEpoch, origin)
     }
   }
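
A usage fragment for the two helpers above, assuming a Log named log is in scope inside the test class; it shows the default client origin alongside an explicit replication origin as used by some of the cases earlier in this file.

    // Client-origin transactional appends (the default origin)
    val producer1 = appendTransactionalAsLeader(log, producerId = 1L, producerEpoch = 0.toShort)
    producer1(Seq(1, 2))

    // Replication-origin appends, as used by the cleaning cases above
    val producer2 = appendTransactionalAsLeader(log, producerId = 2L, producerEpoch = 0.toShort,
      origin = AppendOrigin.Replication)
    producer2(Seq(3, 4))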
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 353e553..43d98cd 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -352,7 +352,8 @@ class LogSegmentTest {
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
     stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
+      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
+      0, RecordBatch.NO_TIMESTAMP, Some(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 80fadc4..5320a20 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -84,7 +84,6 @@ class LogTest {
     assertFalse(Log.FutureDirPattern.matcher(name1).matches())
     val name2 = Log.logDeleteDirName(
       new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
-    System.out.println("name2 = " + name2)
     assertEquals(255, name2.length)
     assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
     assertTrue(Log.DeleteDirPattern.matcher(name2).matches())
@@ -285,6 +284,44 @@ class LogTest {
     log.close()
   }
 
+
+  @Test
+  def testRecoverAfterNonMonotonicCoordinatorEpochWrite(): Unit = {
+    // Due to KAFKA-9144, we may encounter a coordinator epoch which goes backwards.
+    // This test case verifies that recovery logic relaxes validation in this case and
+    // just takes the latest write.
+
+    val producerId = 1L
+    val coordinatorEpoch = 5
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    var log = createLog(logDir, logConfig)
+    val epoch = 0.toShort
+
+    val firstAppendTimestamp = mockTime.milliseconds()
+    val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, 0,
+      new SimpleRecord("foo".getBytes),
+      new SimpleRecord("bar".getBytes),
+      new SimpleRecord("baz".getBytes))
+    log.appendAsLeader(records, leaderEpoch = epoch)
+    appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT,
+      timestamp = firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch)
+    assertEquals(firstAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
+
+    mockTime.sleep(log.maxProducerIdExpirationMs)
+    assertEquals(None, log.producerStateManager.lastEntry(producerId))
+
+    val secondAppendTimestamp = mockTime.milliseconds()
+    appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT,
+      timestamp = secondAppendTimestamp, coordinatorEpoch = coordinatorEpoch - 1)
+
+    log.close()
+
+    // Force recovery by setting the recoveryPoint to the log start
+    log = createLog(logDir, logConfig, recoveryPoint = 0L)
+    assertEquals(secondAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
+    log.close()
+  }
+
   @Test
   def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = {
     testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version)
@@ -661,10 +698,10 @@ class LogTest {
 
     // create a batch with a couple gaps to simulate compaction
     val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "a".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "c".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "d".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)))
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
@@ -679,8 +716,8 @@ class LogTest {
 
     // append some more data and then truncate to force rebuilding of the PID map
     val moreRecords = TestUtils.records(baseOffset = baseOffset + 4, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "e".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "f".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "f".getBytes)))
     moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
     log.appendAsFollower(moreRecords)
 
@@ -704,8 +741,8 @@ class LogTest {
 
     // create an empty batch
     val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "a".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "a".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes)))
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
@@ -720,8 +757,8 @@ class LogTest {
 
     // append some more data and then truncate to force rebuilding of the PID map
     val moreRecords = TestUtils.records(baseOffset = baseOffset + 2, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "e".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "f".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "f".getBytes)))
     moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
     log.appendAsFollower(moreRecords)
 
@@ -745,10 +782,10 @@ class LogTest {
 
     // create a batch with a couple gaps to simulate compaction
     val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "a".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "c".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "d".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)))
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
@@ -989,7 +1026,7 @@ class LogTest {
 
     val lastEntry = log.producerStateManager.lastEntry(producerId)
     assertTrue(lastEntry.isDefined)
-    assertEquals(0L, lastEntry.get.firstOffset)
+    assertEquals(0L, lastEntry.get.firstDataOffset)
     assertEquals(0L, lastEntry.get.lastDataOffset)
   }
 
@@ -1008,9 +1045,8 @@ class LogTest {
       new SimpleRecord("bar".getBytes),
       new SimpleRecord("baz".getBytes))
     log.appendAsLeader(records, leaderEpoch = 0)
-    val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
-      isFromClient = false, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
+    val abortAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT)
+    log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -1018,7 +1054,7 @@ class LogTest {
     log.close()
 
     val reopenedLog = createLog(logDir, logConfig)
-    reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
+    reopenedLog.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
 
@@ -1027,9 +1063,10 @@ class LogTest {
                             epoch: Short,
                             offset: Long = 0L,
                             coordinatorEpoch: Int = 0,
-                            partitionLeaderEpoch: Int = 0): MemoryRecords = {
+                            partitionLeaderEpoch: Int = 0,
+                            timestamp: Long = mockTime.milliseconds()): MemoryRecords = {
     val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
-    MemoryRecords.withEndTransactionMarker(offset, mockTime.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker)
+    MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, epoch, marker)
   }
 
   @Test
@@ -3199,7 +3236,7 @@ class LogTest {
 
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, offset,
-      System.currentTimeMillis, leaderEpoch)
+      mockTime.milliseconds, leaderEpoch)
     records.foreach(builder.append)
     builder.build()
   }
@@ -3244,8 +3281,7 @@ class LogTest {
     assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // now transaction is committed
-    val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid, epoch),
-      isFromClient = false, leaderEpoch = 0)
+    val commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT)
 
     // first unstable offset is not updated until the high watermark is advanced
     assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
@@ -3539,7 +3575,20 @@ class LogTest {
   }
 
   @Test
-  def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
+  def testEndTxnWithFencedProducerEpoch(): Unit = {
+    val producerId = 1L
+    val epoch = 5.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+    appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+
+    assertThrows[ProducerFencedException] {
+      appendEndTxnMarkerAsLeader(log, producerId, (epoch - 1).toShort, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+  }
+
+  @Test
+  def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
@@ -3666,16 +3715,14 @@ class LogTest {
     assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // now first producer's transaction is aborted
-    val abortAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch),
-      isFromClient = false, leaderEpoch = 0)
+    val abortAppendInfo = appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT)
     log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
 
     // LSO should now point to one less than the first offset of the second transaction
     assertEquals(Some(secondAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // commit the second transaction
-    val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch),
-      isFromClient = false, leaderEpoch = 0)
+    val commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.COMMIT)
     log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
@@ -3711,9 +3758,8 @@ class LogTest {
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
 
     // now abort the transaction
-    val appendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
-      isFromClient = false, leaderEpoch = 0)
-    log.onHighWatermarkIncremented(appendInfo.lastOffset + 1)
+    val abortAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT)
+    log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
     assertEquals(None, log.firstUnstableOffset.map(_.messageOffset))
 
     // now check that a fetch includes the aborted transaction
@@ -3740,7 +3786,7 @@ class LogTest {
     var sequence = 0
     numRecords: Int => {
       val simpleRecords = (sequence until sequence + numRecords).map { seq =>
-        new SimpleRecord(s"$seq".getBytes)
+        new SimpleRecord(mockTime.milliseconds(), s"$seq".getBytes)
       }
       val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
         producerEpoch, sequence, simpleRecords: _*)
@@ -3754,9 +3800,11 @@ class LogTest {
                                          producerEpoch: Short,
                                          controlType: ControlRecordType,
                                          coordinatorEpoch: Int = 0,
-                                         leaderEpoch: Int = 0): Unit = {
-    val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch)
-    log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch)
+                                         leaderEpoch: Int = 0,
+                                         timestamp: Long = mockTime.milliseconds()): LogAppendInfo = {
+    val records = endTxnRecords(controlType, producerId, producerEpoch,
+      coordinatorEpoch = coordinatorEpoch, timestamp = timestamp)
+    log.appendAsLeader(records, origin = AppendOrigin.Coordinator, leaderEpoch = leaderEpoch)
   }
 
   private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = {
@@ -3774,7 +3822,7 @@ class LogTest {
     var sequence = 0
     (offset: Long, numRecords: Int) => {
       val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
-        offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch)
+        offset, mockTime.milliseconds(), producerId, producerEpoch, sequence, true, leaderEpoch)
       for (seq <- sequence until sequence + numRecords) {
         val record = new SimpleRecord(s"$seq".getBytes)
         builder.append(record)
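
The new recovery test above asserts that the producer entry's lastTimestamp reflects the marker's timestamp and that the entry expires after mockTime advances by log.maxProducerIdExpirationMs. A self-contained toy model of that timestamp-based expiration check follows; it is illustrative only, and none of these names come from the broker code.

    // Toy model: producer entries are retired once their last write is older than the window.
    final case class ToyProducerEntry(producerId: Long, lastTimestamp: Long)

    def expireProducers(entries: Map[Long, ToyProducerEntry],
                        currentTimeMs: Long,
                        maxProducerIdExpirationMs: Long): Map[Long, ToyProducerEntry] =
      entries.filter { case (_, entry) =>
        currentTimeMs - entry.lastTimestamp < maxProducerIdExpirationMs
      }

    // Both data batches and end-transaction markers advance lastTimestamp in this model.
    def recordWrite(entries: Map[Long, ToyProducerEntry],
                    producerId: Long,
                    batchTimestamp: Long): Map[Long, ToyProducerEntry] =
      entries.updated(producerId, ToyProducerEntry(producerId, batchTimestamp))
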
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 37553b9..0404bb5 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -56,7 +56,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
     assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
@@ -94,7 +94,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
@@ -136,7 +136,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
@@ -194,7 +194,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -236,7 +236,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatingResults.validatedRecords
 
@@ -303,7 +303,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatingResults.validatedRecords
 
@@ -354,7 +354,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
@@ -396,7 +396,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
@@ -451,7 +451,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val validatedRecords = validatedResults.validatedRecords
 
@@ -502,7 +502,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -523,7 +523,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -544,7 +544,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -565,7 +565,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -585,7 +585,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -605,7 +605,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -626,7 +626,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
@@ -648,7 +648,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
@@ -671,7 +671,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
@@ -694,7 +694,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
@@ -715,7 +715,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -738,7 +738,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -761,7 +761,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -784,7 +784,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -807,7 +807,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -827,7 +827,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = false,
+      origin = AppendOrigin.Coordinator,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
     val batches = TestUtils.toList(result.validatedRecords.batches)
     assertEquals(1, batches.size)
@@ -852,7 +852,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -873,7 +873,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -893,7 +893,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -913,7 +913,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -934,7 +934,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -955,7 +955,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -978,7 +978,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -1001,7 +1001,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -1022,7 +1022,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -1043,7 +1043,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
 
@@ -1062,7 +1062,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
 
@@ -1087,7 +1087,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = KAFKA_2_0_IV1)
   }
 
@@ -1122,7 +1122,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion)
   }
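
In the ProducerStateManagerTest changes that follow, the earlier two-step pattern of append(...) plus maybeCacheTxnFirstOffsetMetadata(...) becomes a single appendDataBatch(...) call that takes the batch's first offset as a LogOffsetMetadata. A usage sketch assembled from those call sites: the argument order is taken directly from the diff, while identifiers such as partition, stateManager, producerId, producerEpoch, startOffset and time are assumed to be in scope as in the test class.

    import kafka.log.{AppendOrigin, ProducerAppendInfo, ProducerStateEntry}
    import kafka.server.LogOffsetMetadata

    // Stage a transactional batch (first and last sequence 0, single record at startOffset)
    // and publish the accumulated state to the manager.
    val appendInfo = new ProducerAppendInfo(partition, producerId,
      ProducerStateEntry.empty(producerId), AppendOrigin.Client)
    appendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
      LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = 990000L,
        relativePositionInSegment = 234224),
      startOffset, isTransactional = true)
    stateManager.update(appendInfo)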
 
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 1d29566..1b69207 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -117,7 +117,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
+    append(stateManager, producerId, epoch, sequence, offset, origin = AppendOrigin.Replication)
 
     append(stateManager, producerId, epoch, 0, offset + 500)
 
@@ -135,9 +135,10 @@ class ProducerStateManagerTest extends JUnitSuite {
   def testProducerSequenceWithWrapAroundBatchRecord(): Unit = {
     val epoch = 15.toShort
 
-    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = false)
+    val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication)
     // Sequence number wrap around
-    appendInfo.append(epoch, Int.MaxValue-10, 9, time.milliseconds(), 2000L, 2020L, isTransactional = false)
+    appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
+      LogOffsetMetadata(2000L), 2020L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
@@ -145,7 +146,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val lastEntry = stateManager.lastEntry(producerId).get
     assertEquals(Int.MaxValue-10, lastEntry.firstSeq)
     assertEquals(9, lastEntry.lastSeq)
-    assertEquals(2000L, lastEntry.firstOffset)
+    assertEquals(2000L, lastEntry.firstDataOffset)
     assertEquals(2020L, lastEntry.lastDataOffset)
   }
 
@@ -154,7 +155,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
+    append(stateManager, producerId, epoch, sequence, offset, origin = AppendOrigin.Replication)
     append(stateManager, producerId, epoch, 1, offset + 500)
   }
 
@@ -163,7 +164,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
+    append(stateManager, producerId, epoch, sequence, offset, origin = AppendOrigin.Replication)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
@@ -173,7 +174,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(sequence, lastEntry.firstSeq)
     assertEquals(sequence, lastEntry.lastSeq)
     assertEquals(offset, lastEntry.lastDataOffset)
-    assertEquals(offset, lastEntry.firstOffset)
+    assertEquals(offset, lastEntry.firstDataOffset)
   }
 
   @Test
@@ -208,12 +209,12 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.Client)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
-    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+    producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
+      logOffsetMetadata, offset, isTransactional = true)
     stateManager.update(producerAppendInfo)
 
     assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset)
@@ -230,11 +231,12 @@ class ProducerStateManagerTest extends JUnitSuite {
         partition,
         producerId,
         ProducerStateEntry.empty(producerId),
-        ValidationType.Full)
-      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
+        AppendOrigin.Client
+      )
       val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
         relativePositionInSegment = 50 * relativeOffset)
-      producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+      producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
+        logOffsetMetadata, startOffset, isTransactional = true)
       stateManager.update(producerAppendInfo)
     }
 
@@ -277,8 +279,10 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId),
+      origin = AppendOrigin.Client)
+    producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
+      LogOffsetMetadata(offset), offset, isTransactional = true)
 
     // use some other offset to simulate a follower append where the log offset metadata won't typically
     // match any of the transaction first offsets
@@ -294,14 +298,16 @@ class ProducerStateManagerTest extends JUnitSuite {
   def testPrepareUpdateDoesNotMutate(): Unit = {
     val producerEpoch = 0.toShort
 
-    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    appendInfo.append(producerEpoch, 0, 5, time.milliseconds(), 15L, 20L, isTransactional = false)
+    val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(),
+      LogOffsetMetadata(15L), 20L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
-    val nextAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    nextAppendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 26L, 30L, isTransactional = false)
+    val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
+      LogOffsetMetadata(26L), 30L, isTransactional = false)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
     var lastEntry = stateManager.lastEntry(producerId).get
@@ -323,23 +329,25 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 9L
     append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 16L, 20L, isTransactional = true)
+    val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(),
+      LogOffsetMetadata(16L), 20L, isTransactional = true)
     var lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(5, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(20L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
-    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 26L, 30L, isTransactional = true)
+    appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
+      LogOffsetMetadata(26L), 30L, isTransactional = true)
     lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
@@ -356,7 +364,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     // verify that appending the transaction marker doesn't affect the metadata of the cached record batches.
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
@@ -429,17 +437,81 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
-  def testRecoverFromSnapshot(): Unit = {
+  def testRecoverFromSnapshotUnfinishedTransaction(): Unit = {
     val epoch = 0.toShort
-    append(stateManager, producerId, epoch, 0, 0L)
-    append(stateManager, producerId, epoch, 1, 1L)
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    append(stateManager, producerId, epoch, 1, 1L, isTransactional = true)
 
     stateManager.takeSnapshot()
     val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
     recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
 
+    // The snapshot only persists the last appended batch metadata
+    val loadedEntry = recoveredMapping.lastEntry(producerId)
+    assertEquals(1, loadedEntry.get.firstDataOffset)
+    assertEquals(1, loadedEntry.get.firstSeq)
+    assertEquals(1, loadedEntry.get.lastDataOffset)
+    assertEquals(1, loadedEntry.get.lastSeq)
+    assertEquals(Some(0), loadedEntry.get.currentTxnFirstOffset)
+
     // entry added after recovery
-    append(recoveredMapping, producerId, epoch, 2, 2L)
+    append(recoveredMapping, producerId, epoch, 2, 2L, isTransactional = true)
+  }
+
+  @Test
+  def testRecoverFromSnapshotFinishedTransaction(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    append(stateManager, producerId, epoch, 1, 1L, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT, offset = 2L)
+
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
+
+    // The snapshot only persists the last appended batch metadata
+    val loadedEntry = recoveredMapping.lastEntry(producerId)
+    assertEquals(1, loadedEntry.get.firstDataOffset)
+    assertEquals(1, loadedEntry.get.firstSeq)
+    assertEquals(1, loadedEntry.get.lastDataOffset)
+    assertEquals(1, loadedEntry.get.lastSeq)
+    assertEquals(None, loadedEntry.get.currentTxnFirstOffset)
+  }
+
+  @Test
+  def testRecoverFromSnapshotEmptyTransaction(): Unit = {
+    val epoch = 0.toShort
+    val appendTimestamp = time.milliseconds()
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT,
+      offset = 1L, timestamp = appendTimestamp)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT,
+      offset = 2L, timestamp = appendTimestamp)
+    stateManager.takeSnapshot()
+
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(logStartOffset = 0L, logEndOffset = 3L, time.milliseconds)
+
+    val lastEntry = recoveredMapping.lastEntry(producerId)
+    assertTrue(lastEntry.isDefined)
+    assertEquals(appendTimestamp, lastEntry.get.lastTimestamp)
+    assertEquals(None, lastEntry.get.currentTxnFirstOffset)
+  }
+
+  @Test
+  def testProducerStateAfterFencingAbortMarker(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, (epoch + 1).toShort, ControlRecordType.ABORT, offset = 1L)
+
+    val lastEntry = stateManager.lastEntry(producerId).get
+    assertEquals(None, lastEntry.currentTxnFirstOffset)
+    assertEquals(-1, lastEntry.lastDataOffset)
+    assertEquals(-1, lastEntry.firstDataOffset)
+
+    // The producer should not be expired because we want to preserve fencing epochs
+    stateManager.removeExpiredProducers(time.milliseconds())
+    assertTrue(stateManager.lastEntry(producerId).isDefined)
   }
 
   @Test(expected = classOf[UnknownProducerIdException])
@@ -471,7 +543,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Nonetheless
     // the append on a replica should be accepted with the local producer state updated to the appended value.
     assertFalse(recoveredMapping.activeProducers.contains(producerId))
-    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false)
+    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, origin = AppendOrigin.Replication)
     assertTrue(recoveredMapping.activeProducers.contains(producerId))
     val producerStateEntry = recoveredMapping.activeProducers.get(producerId).head
     assertEquals(epoch, producerStateEntry.producerEpoch)
@@ -487,7 +559,7 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     // First we ensure that an OutOfOrderSequenceException is raised when the append comes from a client.
     try {
-      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true)
+      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, origin = AppendOrigin.Client)
       fail("Expected an OutOfOrderSequenceException to be raised.")
     } catch {
       case _ : OutOfOrderSequenceException =>
@@ -497,7 +569,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
 
     assertEquals(0L, stateManager.activeProducers(producerId).lastSeq)
-    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false)
+    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, origin = AppendOrigin.Replication)
     assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq)
   }
 
@@ -738,9 +810,10 @@ class ProducerStateManagerTest extends JUnitSuite {
     val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
 
     val epoch = 0.toShort
-    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99, isTransactional = true)
-    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 100, isTransactional = true)
-
+    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99,
+      isTransactional = true, origin = AppendOrigin.Coordinator)
+    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 100,
+      isTransactional = true, origin = AppendOrigin.Coordinator)
   }
 
   @Test(expected = classOf[ProducerFencedException])
@@ -831,7 +904,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     EasyMock.replay(batch)
 
     // Appending the empty control batch should not throw and a new transaction shouldn't be started
-    append(stateManager, producerId, producerEpoch, baseOffset, batch, isFromClient = true)
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, origin = AppendOrigin.Client)
     assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
@@ -872,7 +945,7 @@ class ProducerStateManagerTest extends JUnitSuite {
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Coordinator)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
@@ -889,9 +962,10 @@ class ProducerStateManagerTest extends JUnitSuite {
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
-                     isFromClient : Boolean = true): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
-    producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, offset, isTransactional)
+                     origin : AppendOrigin = AppendOrigin.Client): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
+    producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, timestamp,
+      LogOffsetMetadata(offset), offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
@@ -901,14 +975,14 @@ class ProducerStateManagerTest extends JUnitSuite {
                      producerEpoch: Short,
                      offset: Long,
                      batch: RecordBatch,
-                     isFromClient : Boolean): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
-    producerAppendInfo.append(batch)
+                     origin: AppendOrigin): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
+    producerAppendInfo.append(batch, firstOffsetMetadataOpt = None)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
 
-  private def currentSnapshotOffsets =
+  private def currentSnapshotOffsets: Set[Long] =
     logDir.listFiles.map(Log.offsetFromFile).toSet
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f3eae1e..249c171 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -27,6 +27,7 @@ import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.AppendOrigin
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.SendResponse
 import kafka.security.auth.Authorizer
@@ -268,7 +269,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
-      EasyMock.eq(false),
+      EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
       EasyMock.anyObject(),
@@ -307,7 +308,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
-      EasyMock.eq(false),
+      EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
       EasyMock.anyObject(),
@@ -338,7 +339,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
-      EasyMock.eq(false),
+      EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6f73690..adde9ff 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -22,9 +22,10 @@ import java.util.{Optional, Properties}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
+import kafka.api.Request
+import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.cluster.BrokerEndPoint
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
@@ -129,7 +130,7 @@ class ReplicaManagerTest {
         timeout = 0,
         requiredAcks = 3,
         internalTopicsAllowed = false,
-        isFromClient = true,
+        origin = AppendOrigin.Client,
         entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(CompressionType.NONE,
           new SimpleRecord("first message".getBytes))),
         responseCallback = callback)
@@ -368,7 +369,8 @@ class ReplicaManagerTest {
       // now commit the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
       val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch, isFromClient = false)
+      appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch,
+        origin = AppendOrigin.Coordinator)
         .onFire { response => assertEquals(Errors.NONE, response.error) }
 
       // the LSO has advanced, but the appended commit marker has not been replicated, so
@@ -436,7 +438,8 @@ class ReplicaManagerTest {
       // now abort the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0)
       val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, isFromClient = false)
+      appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch,
+        origin = AppendOrigin.Coordinator)
         .onFire { response => assertEquals(Errors.NONE, response.error) }
 
       // fetch as follower to advance the high watermark
@@ -793,7 +796,7 @@ class ReplicaManagerTest {
   private def appendRecords(replicaManager: ReplicaManager,
                             partition: TopicPartition,
                             records: MemoryRecords,
-                            isFromClient: Boolean = true,
+                            origin: AppendOrigin = AppendOrigin.Client,
                             requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
@@ -806,7 +809,7 @@ class ReplicaManagerTest {
       timeout = 1000,
       requiredAcks = requiredAcks,
       internalTopicsAllowed = false,
-      isFromClient = isFromClient,
+      origin = origin,
       entriesPerPartition = Map(partition -> records),
       responseCallback = appendCallback)