You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/10/20 21:26:26 UTC

[kafka] branch trunk updated: KAFKA-15626: Replace verification guard object with an specific type (#14568)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8c89693300 KAFKA-15626: Replace verification guard object with an specific type (#14568)
e8c89693300 is described below

commit e8c89693300660240cd85fb79344d01316fe1267
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Oct 20 14:26:20 2023 -0700

    KAFKA-15626: Replace verification guard object with an specific type (#14568)
    
    I've added a new class with an incrementing atomic long to represent the verification guard. Upon creation of verification guard, we will increment this value and assign it to the guard.
    
    The expected behavior is the same as the object guard, but with better debuggability with the string value and type safety (I found a type safety issue in the current code when implementing this)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Artem Livshits <al...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  9 ++--
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 45 ++++++++--------
 .../main/scala/kafka/server/ReplicaManager.scala   | 15 +++---
 .../unit/kafka/cluster/PartitionLockTest.scala     |  4 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 18 +++----
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 37 +++++++------
 .../unit/kafka/log/VerificationGuardTest.scala     | 53 ++++++++++++++++++
 .../unit/kafka/server/ReplicaManagerTest.scala     | 18 +++----
 .../storage/internals/log/VerificationGuard.java   | 62 ++++++++++++++++++++++
 .../internals/log/VerificationStateEntry.java      | 10 ++--
 10 files changed, 196 insertions(+), 75 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2fa3e6a4b8f..104fca8ea90 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection.{Map, Seq}
@@ -581,8 +581,9 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
-  def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = {
+  // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the
+  // sentinel VerificationGuard.
+  def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = {
     leaderLogIfLocal match {
       case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
       case None => throw new NotLeaderOrFollowerException();
@@ -1301,7 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 9246143520f..34468012306 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
 
 import java.io.{File, IOException}
 import java.nio.file.Files
@@ -599,31 +599,32 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   /**
-   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
-   * Creation starts the verification process. Otherwise return null.
+   * Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return the sentinel VerificationGuard.
    */
-  def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
+  def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized {
     if (hasOngoingTransaction(producerId))
-      null
+      VerificationGuard.SENTINEL
     else
       maybeCreateVerificationGuard(producerId, sequence, epoch)
   }
 
   /**
-   * Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
+   * Maybe create the VerificationStateEntry for the given producer ID -- always return the VerificationGuard
    */
   def maybeCreateVerificationGuard(producerId: Long,
                                    sequence: Int,
-                                   epoch: Short): Object = lock synchronized {
+                                   epoch: Short): VerificationGuard = lock synchronized {
     producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
   }
 
   /**
-   * If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
+   * If an VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return the
+   * sentinel VerificationGuard.
    */
-  def verificationGuard(producerId: Long): Object = lock synchronized {
+  def verificationGuard(producerId: Long): VerificationGuard = lock synchronized {
     val entry = producerStateManager.verificationStateEntry(producerId)
-    if (entry != null) entry.verificationGuard else null
+    if (entry != null) entry.verificationGuard else VerificationGuard.SENTINEL
   }
 
   /**
@@ -715,7 +716,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                      origin: AppendOrigin = AppendOrigin.CLIENT,
                      interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
                      requestLocal: RequestLocal = RequestLocal.NoCaching,
-                     verificationGuard: Object = null): LogAppendInfo = {
+                     verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
     val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
     append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
   }
@@ -734,7 +735,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
-      verificationGuard = null,
+      verificationGuard = VerificationGuard.SENTINEL,
       // disable to check the validation of record size since the record is already accepted by leader.
       ignoreRecordSize = true)
   }
@@ -763,7 +764,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                      validateAndAssignOffsets: Boolean,
                      leaderEpoch: Int,
                      requestLocal: Option[RequestLocal],
-                     verificationGuard: Object,
+                     verificationGuard: VerificationGuard,
                      ignoreRecordSize: Boolean): LogAppendInfo = {
     // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
     // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
@@ -1024,7 +1025,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
                                               records: MemoryRecords,
                                               origin: AppendOrigin,
-                                              requestVerificationGuard: Object):
+                                              requestVerificationGuard: VerificationGuard):
   (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
     val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
@@ -1049,17 +1050,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
           // There are two phases -- the first append to the log and subsequent appends.
           //
-          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
-          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // 1. First append: Verification starts with creating a VerificationGuard, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique VerificationGuard for the transaction
           // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
           // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
-          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique VerificationGuard, this sequence would not
           // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
           //
           // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
           // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
-          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
-          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
           if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
             throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
         }
@@ -1080,9 +1081,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     (updatedProducers, completedTxns.toList, None)
   }
 
-  private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
+  private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
     producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
-      (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
+      !verificationGuard(batch.producerId).verify(requestVerificationGuard)
   }
 
   /**
@@ -1991,7 +1992,7 @@ object UnifiedLog extends Logging {
     val producerId = batch.producerId
     val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
     val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
-    // Whether we wrote a control marker or a data batch, we can remove verification guard since either the transaction is complete or we have a first offset.
+    // Whether we wrote a control marker or a data batch, we can remove VerificationGuard since either the transaction is complete or we have a first offset.
     if (batch.isTransactional)
       producerStateManager.clearVerificationStateEntry(producerId)
     completedTxn
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index eb140eb31fd..4eb9a6f9c75 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -735,7 +735,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
 
-      val verificationGuards: mutable.Map[TopicPartition, Object] = mutable.Map[TopicPartition, Object]()
+      val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
       val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) =
         if (transactionalId == null || !config.transactionPartitionVerificationEnable)
           (entriesPerPartition, Map.empty[TopicPartition, MemoryRecords], Map.empty[TopicPartition, Errors])
@@ -864,7 +864,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, Object],
+  private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard],
                                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                               verifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
                                               unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
@@ -877,10 +877,10 @@ class ReplicaManager(val config: KafkaConfig,
         transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
 
         if (transactionalBatches.nonEmpty) {
-          // We return verification guard if the partition needs to be verified. If no state is present, no need to verify.
+          // We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify.
           val firstBatch = records.firstBatch
           val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
-          if (verificationGuard != null) {
+          if (verificationGuard != VerificationGuard.SENTINEL) {
             verificationGuards.put(topicPartition, verificationGuard)
             unverifiedEntries.put(topicPartition, records)
           } else
@@ -1156,7 +1156,7 @@ class ReplicaManager(val config: KafkaConfig,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short,
                                requestLocal: RequestLocal,
-                               verificationGuards: Map[TopicPartition, Object]): Map[TopicPartition, LogAppendResult] = {
+                               verificationGuards: Map[TopicPartition, VerificationGuard]): Map[TopicPartition, LogAppendResult] = {
     val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
@@ -1183,7 +1183,8 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         try {
           val partition = getPartitionOrException(topicPartition)
-          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, verificationGuards.getOrElse(topicPartition, null))
+          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
+            verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
           val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 113c72abc50..445371c108b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers
@@ -450,7 +450,7 @@ class PartitionLockTest extends Logging {
     keepPartitionMetadataFile = true) {
 
     override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
-                                interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal, verificationGuard: Object): LogAppendInfo = {
+                                interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal, verificationGuard: VerificationGuard): LogAppendInfo = {
       val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal, verificationGuard)
       appendSemaphore.acquire()
       appendInfo
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index d211194777c..a4aba277e3a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -3546,25 +3546,25 @@ class PartitionTest extends AbstractPartitionTest {
       baseSequence = 3,
       producerId = producerId)
 
-    // When verification guard is not there, we should not be able to append.
+    // When VerificationGuard is not there, we should not be able to append.
     assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
 
-    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
+    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard.
     val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
 
-    // With the wrong verification guard, append should fail.
+    // With the wrong VerificationGuard, append should fail.
     assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(),
-      origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))
+      origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, new VerificationGuard()))
 
-    // We should return the same verification object when we still need to verify. Append should proceed.
+    // We should return the same VerificationGuard when we still need to verify. Append should proceed.
     val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0)
     assertEquals(verificationGuard, verificationGuard2)
     partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)
 
-    // We should no longer need a verification object. Future appends without verification guard will also succeed.
+    // We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed.
     val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
-    assertNull(verificationGuard3)
+    assertEquals(VerificationGuard.SENTINEL, verificationGuard3)
     partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 8c96312f6ca..492ba87bc87 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -3738,7 +3738,8 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
+    assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
 
     val idempotentRecords = MemoryRecords.withIdempotentRecords(
       CompressionType.NONE,
@@ -3763,22 +3764,23 @@ class UnifiedLogTest {
     )
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
 
     log.appendAsLeader(idempotentRecords, origin = appendOrigin, leaderEpoch = 0)
     assertFalse(log.hasOngoingTransaction(producerId))
 
-    // Since we wrote idempotent records, we keep verification guard.
+    // Since we wrote idempotent records, we keep VerificationGuard.
     assertEquals(verificationGuard, log.verificationGuard(producerId))
 
     // Now write the transactional records
+    assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
     log.appendAsLeader(transactionalRecords, origin = appendOrigin, leaderEpoch = 0, verificationGuard = verificationGuard)
     assertTrue(log.hasOngoingTransaction(producerId))
-    // Verification guard should be cleared now.
-    assertNull(log.verificationGuard(producerId))
+    // VerificationGuard should be cleared now.
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
 
     // A subsequent maybeStartTransactionVerification will be empty since we are already verified.
-    assertNull(log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
+    assertEquals(VerificationGuard.SENTINEL, log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
 
     val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
       producerId,
@@ -3788,15 +3790,16 @@ class UnifiedLogTest {
 
     log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
 
     if (appendOrigin == AppendOrigin.CLIENT)
       sequence = sequence + 1
 
     // A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
     val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
-    assertNotNull(newVerificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
     assertNotEquals(verificationGuard, newVerificationGuard)
+    assertFalse(verificationGuard.verify(newVerificationGuard))
   }
 
   @Test
@@ -3809,7 +3812,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
 
     val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
       producerId,
@@ -3819,7 +3822,7 @@ class UnifiedLogTest {
 
     log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
   }
 
   @Test
@@ -3832,7 +3835,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
 
     producerStateManagerConfig.setTransactionVerificationEnabled(false)
 
@@ -3847,7 +3850,7 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0)
 
     assertTrue(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
   }
 
   @Test
@@ -3872,14 +3875,14 @@ class UnifiedLogTest {
     )
     assertThrows(classOf[InvalidTxnStateException], () => log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
 
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
     assertTrue(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
   }
 
   @Test
@@ -3892,7 +3895,7 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
 
     val transactionalRecords = MemoryRecords.withTransactionalRecords(
       CompressionType.NONE,
diff --git a/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala
new file mode 100644
index 00000000000..b18b7430b9a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala
@@ -0,0 +1,53 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.log
+
+import org.apache.kafka.storage.internals.log.VerificationGuard
+import org.apache.kafka.storage.internals.log.VerificationGuard.SENTINEL
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+class VerificationGuardTest {
+
+  @Test
+  def testEqualsAndHashCode(): Unit = {
+    val verificationGuard1 = new VerificationGuard
+    val verificationGuard2 = new VerificationGuard
+
+    assertNotEquals(verificationGuard1, verificationGuard2)
+    assertNotEquals(SENTINEL, verificationGuard1)
+    assertEquals(SENTINEL, SENTINEL)
+
+    assertNotEquals(verificationGuard1.hashCode, verificationGuard2.hashCode)
+    assertNotEquals(SENTINEL.hashCode, verificationGuard1.hashCode)
+    assertEquals(SENTINEL.hashCode, SENTINEL.hashCode)
+  }
+
+  @Test
+  def testVerify(): Unit = {
+    val verificationGuard1 = new VerificationGuard
+    val verificationGuard2 = new VerificationGuard
+
+    assertFalse(verificationGuard1.verify(verificationGuard2))
+    assertFalse(verificationGuard1.verify(SENTINEL))
+    assertFalse(SENTINEL.verify(verificationGuard1))
+    assertFalse(SENTINEL.verify(SENTINEL))
+    assertTrue(verificationGuard1.verify(verificationGuard1))
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3054cc4c364..095c14260b8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -2163,7 +2163,7 @@ class ReplicaManagerTest {
         new SimpleRecord("message".getBytes))
       appendRecords(replicaManager, tp0, idempotentRecords)
       verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
-      assertNull(getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
 
       // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
@@ -2179,8 +2179,8 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(Seq(tp0)),
         any[AddPartitionsToTxnManager.AppendCallback]()
       )
-      assertNotNull(getVerificationGuard(replicaManager, tp0, producerId))
-      assertNull(getVerificationGuard(replicaManager, tp1, producerId))
+      assertNotEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp1, producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -2238,7 +2238,7 @@ class ReplicaManagerTest {
 
       val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
       callback2(Map.empty[TopicPartition, Errors].toMap)
-      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
       assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2297,7 +2297,7 @@ class ReplicaManagerTest {
       )
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
 
-      // Verification should succeed, but we expect to fail with OutOfOrderSequence and for the verification guard to remain.
+      // Verification should succeed, but we expect to fail with OutOfOrderSequence and for the VerificationGuard to remain.
       val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
       callback2(Map.empty[TopicPartition, Errors].toMap)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@@ -2424,7 +2424,7 @@ class ReplicaManagerTest {
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
         new SimpleRecord(s"message $sequence".getBytes))
       appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId)
-      assertNull(getVerificationGuard(replicaManager, tp, producerId))
+      assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId))
 
       // We should not add these partitions to the manager to verify.
       verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
@@ -2442,7 +2442,7 @@ class ReplicaManagerTest {
 
       appendRecords(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId)
       verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
-      assertEquals(null, getVerificationGuard(replicaManager, tp, producerId))
+      assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId))
       assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2496,7 +2496,7 @@ class ReplicaManagerTest {
       // This time we do not verify
       appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId)
       verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any())
-      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
       assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
new file mode 100644
index 00000000000..165faa52973
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public final class VerificationGuard {
+
+    // The sentinel VerificationGuard will be used as a default when no verification guard is provided.
+    // It can not be used to verify a transaction is ongoing and its value is always 0.
+    public static final VerificationGuard SENTINEL = new VerificationGuard(0);
+    private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
+    private final long value;
+
+    public VerificationGuard() {
+        value = INCREMENTING_ID.incrementAndGet();
+    }
+
+    private VerificationGuard(long value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "VerificationGuard(value=" + value + ")";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if ((null == obj) || (obj.getClass() != this.getClass()))
+            return false;
+        VerificationGuard guard = (VerificationGuard) obj;
+        return value == guard.value();
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(value);
+    }
+
+    private long value() {
+        return value;
+    }
+
+    public boolean verify(VerificationGuard verifyingGuard) {
+        return verifyingGuard != SENTINEL && verifyingGuard.equals(this);
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java
index 0a52095889a..7fa0a658f14 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java
@@ -18,10 +18,10 @@ package org.apache.kafka.storage.internals.log;
 
 /**
  * This class represents the verification state of a specific producer id.
- * It contains a verification guard object that is used to uniquely identify the transaction we want to verify.
+ * It contains a VerificationGuard that is used to uniquely identify the transaction we want to verify.
  * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
  * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
- * We remove the verification guard object whenever we write data to the transaction or write an end marker for the transaction.
+ * We remove the VerificationGuard whenever we write data to the transaction or write an end marker for the transaction.
  *
  * We also store the lowest seen sequence to block a higher sequence from being written in the case of the lower sequence needing retries.
  *
@@ -30,13 +30,13 @@ package org.apache.kafka.storage.internals.log;
 public class VerificationStateEntry {
 
     final private long timestamp;
-    final private Object verificationGuard;
+    final private VerificationGuard verificationGuard;
     private int lowestSequence;
     private short epoch;
 
     public VerificationStateEntry(long timestamp, int sequence, short epoch) {
         this.timestamp = timestamp;
-        this.verificationGuard = new Object();
+        this.verificationGuard = new VerificationGuard();
         this.lowestSequence = sequence;
         this.epoch = epoch;
     }
@@ -45,7 +45,7 @@ public class VerificationStateEntry {
         return timestamp;
     }
 
-    public Object verificationGuard() {
+    public VerificationGuard verificationGuard() {
         return verificationGuard;
     }