Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/05 05:27:48 UTC

kafka git commit: KAFKA-6003; Accept appends on replicas unconditionally when local producer state doesn't exist

Repository: kafka
Updated Branches:
  refs/heads/trunk b5b266eee -> 6ea4fffdd


KAFKA-6003; Accept appends on replicas unconditionally when local producer state doesn't exist

Without this patch, if the replica's log was somehow truncated before
the leader's, the replica fetcher thread could continuously throw an
OutOfOrderSequenceException, because the incoming sequence would be
non-zero while no local producer state exists.

This patch changes the behavior so that the replica state is updated to
the leader's state if there was no local state for the producer at the
time of the append.

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #4004 from apurvam/KAFKA-6003-handle-unknown-producer-on-replica
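
For intuition, here is a minimal, self-contained Scala sketch of the behavior
change described above. It is an illustration, not the broker code: the
localLastSeq option stands in for the replica's producer state, with None
modeling the "no local state" case.

    object ReplicaAppendSketch {
      class OutOfOrderSequenceException(msg: String) extends RuntimeException(msg)

      // Before the patch, replica appends were validated like client appends:
      // with no local state, any non-zero incoming sequence was rejected, so
      // the replica fetcher retried the same batch and failed forever.
      def validateAsClient(localLastSeq: Option[Int], incomingFirstSeq: Int): Int =
        localLastSeq match {
          case Some(last) if incomingFirstSeq == last + 1 => incomingFirstSeq
          case None if incomingFirstSeq == 0 => incomingFirstSeq
          case _ => throw new OutOfOrderSequenceException(
            s"incoming seq $incomingFirstSeq does not follow local state $localLastSeq")
        }

      // After the patch, appends that do not come from a client skip sequence
      // validation and simply adopt the leader's state.
      def append(localLastSeq: Option[Int], incomingFirstSeq: Int, isFromClient: Boolean): Int =
        if (isFromClient) validateAsClient(localLastSeq, incomingFirstSeq)
        else incomingFirstSeq

      def main(args: Array[String]): Unit = {
        // Replica truncated past its snapshots: no local state, sequence 42 incoming.
        println(append(None, 42, isFromClient = false)) // accepted after the patch
      }
    }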


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ea4fffd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ea4fffd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ea4fffd

Branch: refs/heads/trunk
Commit: 6ea4fffdd287a0c6a02c1b6dc1006b1a7b614405
Parents: b5b266e
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed Oct 4 22:27:03 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Oct 4 22:27:03 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   8 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 .../scala/kafka/log/ProducerStateManager.scala  | 101 ++++++++++++-------
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   5 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  15 ++-
 .../kafka/log/ProducerStateManagerTest.scala    |  64 ++++++++++--
 6 files changed, 134 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index dc47194..d397ca6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -514,7 +514,7 @@ class Log(@volatile var dir: File,
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
       if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog = true)
+        val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -791,7 +791,7 @@ class Log(@volatile var dir: File,
           return (updatedProducers, completedTxns.toList, Some(duplicate))
         }
 
-      val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false)
+      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
       maybeCompletedTxn.foreach(completedTxns += _)
     }
     (updatedProducers, completedTxns.toList, None)
@@ -878,9 +878,9 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
-                              loadingFromLog: Boolean): Option[CompletedTxn] = {
+                              isFromClient: Boolean): Option[CompletedTxn] = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog))
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
     appendInfo.append(batch)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 06c4e2d..845f08f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -145,7 +145,7 @@ class LogSegment(val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 1cf9a14..81726c1 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -36,6 +36,15 @@ import scala.collection.{immutable, mutable}
 
 class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 
+
+// ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance
+private[log] sealed trait ValidationType
+private[log] object ValidationType {
+  case object None extends ValidationType
+  case object EpochOnly extends ValidationType
+  case object Full extends ValidationType
+}
+
 private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) {
   def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
 
@@ -138,49 +147,58 @@ private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata: muta
  *                      the most recent appends made by the producer. Validation of the first incoming append will
 *                      be made against the latest append in the current entry. New appends will replace older appends
  *                      in the current entry so that the space overhead is constant.
- * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The only current use
- *                                of this is the consumer offsets topic which uses producer ids from incoming
- *                                TxnOffsetCommit, but has no sequence number to validate and does not depend
- *                                on the deduplication which sequence numbers provide.
- * @param loadingFromLog This parameter indicates whether the new append is being loaded directly from the log.
- *                       This is used to repopulate producer state when the broker is initialized. The only
- *                       difference in behavior is that we do not validate the sequence number of the first append
- *                       since we may have lost previous sequence numbers when segments were removed due to log
- *                       retention enforcement.
+ * @param validationType Indicates the extent of validation to perform on appends made through this instance. Offset
+ *                       commits coming from the transactional producer should use ValidationType.EpochOnly. Appends
+ *                       which aren't from a client should use ValidationType.None since they will not be validated
+ *                       at all. All other appends should use ValidationType.Full.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       currentEntry: ProducerIdEntry,
-                                      validateSequenceNumbers: Boolean,
-                                      loadingFromLog: Boolean) {
+                                      validationType: ValidationType) {
 
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
+  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
+    validationType match {
+      case ValidationType.None =>
+        // Nothing to validate for appends that do not come from a client.
+
+      case ValidationType.EpochOnly =>
+        checkEpoch(producerEpoch)
+
+      case ValidationType.Full =>
+        checkEpoch(producerEpoch)
+        checkSequence(producerEpoch, firstSeq, lastSeq)
+    }
+  }
+
+  private def checkEpoch(producerEpoch: Short): Unit = {
     if (isFenced(producerEpoch)) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
         s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch} (server epoch)")
-    } else if (validateSequenceNumbers) {
-      if (producerEpoch != currentEntry.producerEpoch) {
-        if (firstSeq != 0) {
-          if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-            throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
-              s"(request epoch), $firstSeq (seq. number)")
-          } else {
-            throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
-              s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
-          }
+    }
+  }
+
+  private def checkSequence(producerEpoch: Short, firstSeq: Int, lastSeq: Int): Unit = {
+    if (producerEpoch != currentEntry.producerEpoch) {
+      if (firstSeq != 0) {
+        if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
+            s"(request epoch), $firstSeq (seq. number)")
+        } else {
+          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
+            s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
         }
-      } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
-        // the epoch was bumped by a control record, so we expect the sequence number to be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
-          s"(incoming seq. number), but expected 0")
-      } else if (isDuplicate(firstSeq, lastSeq)) {
-        throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
-          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
-      } else if (!inSequence(firstSeq, lastSeq)) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
-          s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
       }
+    } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
+      // the epoch was bumped by a control record, so we expect the sequence number to be reset
+      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
+        s"(incoming seq. number), but expected 0")
+    } else if (isDuplicate(firstSeq, lastSeq)) {
+      throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
+        s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
+    } else if (!inSequence(firstSeq, lastSeq)) {
+      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
+        s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
     }
   }
 
@@ -216,10 +234,7 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              lastTimestamp: Long,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
-    if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
-      // skip validation if this is the first entry when loading from the log. Log retention
-      // will generally have removed the beginning entries from each producer id
-      validateAppend(epoch, firstSeq, lastSeq)
+    maybeValidateAppend(epoch, firstSeq, lastSeq)
 
     currentEntry.addBatchMetadata(epoch, lastSeq, lastOffset, lastSeq - firstSeq, lastTimestamp)
 
@@ -541,9 +556,17 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
-    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)), validateSequenceNumbers,
-      loadingFromLog)
+  def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = {
+    val validationToPerform =
+      if (!isFromClient)
+        ValidationType.None
+      else if (topicPartition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        ValidationType.EpochOnly
+      else
+        ValidationType.Full
+
+    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)), validationToPerform)
+  }
 
   /**
    * Update the mapping with the given append information

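To summarize the dispatch introduced above, here is a standalone Scala sketch of
how prepareUpdate now selects a validation level. The isGroupMetadataTopic flag
is an illustrative stand-in for the topicPartition.topic ==
Topic.GROUP_METADATA_TOPIC_NAME comparison in the real method.

    object ValidationDispatchSketch {
      sealed trait ValidationType
      object ValidationType {
        case object None extends ValidationType      // replication and log recovery
        case object EpochOnly extends ValidationType // group metadata topic (TxnOffsetCommit)
        case object Full extends ValidationType      // ordinary client produce requests
      }

      // Mirrors the selection in prepareUpdate above; isGroupMetadataTopic is an
      // illustrative stand-in for the topic-name comparison in the real method.
      def validationToPerform(isFromClient: Boolean, isGroupMetadataTopic: Boolean): ValidationType =
        if (!isFromClient) ValidationType.None
        else if (isGroupMetadataTopic) ValidationType.EpochOnly
        else ValidationType.Full
    }

With ValidationType.None, maybeValidateAppend becomes a no-op, which is what
allows a replica to accept a batch for a producer id it has never seen.
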
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 0f866e7..cef2bca 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -274,7 +274,7 @@ class LogSegmentTest {
     val segment = createSegment(100)
     val producerEpoch = 0.toShort
     val partitionLeaderEpoch = 15
-    val sequence = 0
+    val sequence = 100
 
     val pid1 = 5L
     val pid2 = 10L
@@ -317,7 +317,8 @@ class LogSegmentTest {
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
-    stateManager.loadProducerEntry(new ProducerIdEntry(pid2, mutable.Queue[BatchMetadata](BatchMetadata(10, 90L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
+    stateManager.loadProducerEntry(new ProducerIdEntry(pid2,
+      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2ae62c5..6d40967 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -869,7 +869,7 @@ class LogTest {
     }
   }
 
-  @Test(expected = classOf[DuplicateSequenceException])
+  @Test
   def testDuplicateAppendToFollower() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -877,15 +877,20 @@ class LogTest {
     val pid = 1L
     val baseSequence = 0
     val partitionLeaderEpoch = 0
+    // The point of this test is to ensure that validation isn't performed on the follower.
    // This is a bit contrived: to trigger the duplicate case for a follower append, we have to append
    // a batch with matching sequence numbers, but valid increasing offsets
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, CompressionType.NONE, pid, epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, CompressionType.NONE, pid, epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    // Ensure that even the duplicate sequences are accepted on the follower.
+    assertEquals(4L, log.logEndOffset)
   }
 
-  @Test(expected = classOf[DuplicateSequenceException])
+  @Test
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -930,9 +935,11 @@ class LogTest {
 
     val records = MemoryRecords.readableRecords(buffer)
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+
+    // Ensure that batches with duplicates are accepted on the follower.
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(records)
-    // Should throw a duplicate sequence exception here.
-    fail("should have thrown a DuplicateSequenceNumberException.")
+    assertEquals(5L, log.logEndOffset)
   }
 
   @Test(expected = classOf[ProducerFencedException])

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9eb9ae7..8650624 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -86,7 +86,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
 
     append(stateManager, producerId, epoch, 0, offset + 500)
 
@@ -105,7 +105,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
     append(stateManager, producerId, epoch, 1, offset + 500)
   }
 
@@ -114,7 +114,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
@@ -159,8 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true,
-      loadingFromLog = false)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
@@ -176,8 +175,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true,
-      loadingFromLog = false)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
     // use some other offset to simulate a follower append where the log offset metadata won't typically
@@ -197,7 +195,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 9L
     append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
     appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
     var lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
@@ -321,6 +319,50 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testAcceptAppendWithoutProducerStateOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    append(stateManager, producerId, epoch, 1, 1L, 1)
+
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 1L, 70000)
+
+    val sequence = 2
+    // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Nonetheless
+    // the append on a replica should be accepted with the local producer state updated to the appended value.
+    assertFalse(recoveredMapping.activeProducers.contains(producerId))
+    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false)
+    assertTrue(recoveredMapping.activeProducers.contains(producerId))
+    val producerIdEntry = recoveredMapping.activeProducers.get(producerId).head
+    assertEquals(epoch, producerIdEntry.producerEpoch)
+    assertEquals(sequence, producerIdEntry.firstSeq)
+    assertEquals(sequence, producerIdEntry.lastSeq)
+  }
+
+  @Test
+  def testAcceptAppendWithSequenceGapsOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    val outOfOrderSequence = 3
+
+    // First we ensure that an OutOfOrderSequenceException is raised when the append comes from a client.
+    try {
+      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true)
+      fail("Expected an OutOfOrderSequenceException to be raised.")
+    } catch {
+      case _: OutOfOrderSequenceException =>
+        // Good!
+      case _: Exception =>
+        fail("Expected an OutOfOrderSequenceException to be raised.")
+    }
+
+    assertEquals(0L, stateManager.activeProducers(producerId).lastSeq)
+    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false)
+    assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq)
+  }
+
+  @Test
   def testDeleteSnapshotsBefore(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L)
@@ -675,7 +717,7 @@ class ProducerStateManagerTest extends JUnitSuite {
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
@@ -691,8 +733,8 @@ class ProducerStateManagerTest extends JUnitSuite {
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
-                     isLoadingFromLog: Boolean = false): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog)
+                     isFromClient: Boolean = true): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
     producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)