You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/10/18 17:28:19 UTC

[kafka] 01/01: Add sentinel and usage

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch KAFKA-15626
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7bbf82b17e30f6b824a6fdd14bb19c9c5fb30a1a
Author: Justine <jo...@confluent.io>
AuthorDate: Wed Oct 18 10:27:54 2023 -0700

    Add sentinel and usage
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  5 +--
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 17 +++++-----
 .../main/scala/kafka/server/ReplicaManager.scala   |  5 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  6 ++--
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 33 +++++++++++---------
 .../unit/kafka/log/VerificationGuardTest.scala     | 36 ++++++++++++++++++++++
 .../unit/kafka/server/ReplicaManagerTest.scala     | 16 +++++-----
 .../storage/internals/log/VerificationGuard.java   | 14 ++++++++-
 8 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d89dddd5c0e..fb66c4649aa 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -581,7 +581,8 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return null.
+  // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the
+  // sentinel verification guard.
   def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = {
     leaderLogIfLocal match {
       case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
@@ -1301,7 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal, verificationGuard: VerificationGuard = null): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 2895ae71fac..683260d74a9 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -600,11 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing.
-   * Creation starts the verification process. Otherwise return null.
+   * Creation starts the verification process. Otherwise, return the sentinel VerificationGuard.
    */
   def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized {
     if (hasOngoingTransaction(producerId))
-      null
+      VerificationGuard.SENTINEL_VERIFICATION_GUARD
     else
       maybeCreateVerificationGuard(producerId, sequence, epoch)
   }
@@ -619,11 +619,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   /**
-   * If an VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return null.
+   * If a VerificationStateEntry is present for the given producer ID, return its VerificationGuard; otherwise, return the
+   * sentinel VerificationGuard.
    */
   def verificationGuard(producerId: Long): VerificationGuard = lock synchronized {
     val entry = producerStateManager.verificationStateEntry(producerId)
-    if (entry != null) entry.verificationGuard else null
+    if (entry != null) entry.verificationGuard else VerificationGuard.SENTINEL_VERIFICATION_GUARD
   }
 
   /**
@@ -715,7 +716,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                      origin: AppendOrigin = AppendOrigin.CLIENT,
                      interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
                      requestLocal: RequestLocal = RequestLocal.NoCaching,
-                     verificationGuard: VerificationGuard = null): LogAppendInfo = {
+                     verificationGuard: VerificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD): LogAppendInfo = {
     val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
     append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
   }
@@ -734,7 +735,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
-      verificationGuard = null,
+      verificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD,
       // disable to check the validation of record size since the record is already accepted by leader.
       ignoreRecordSize = true)
   }
@@ -1059,7 +1060,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
           // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
           // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
-          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
           if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
             throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
         }
@@ -1082,7 +1083,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
     producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
-      (requestVerificationGuard == null || !requestVerificationGuard.equals(verificationGuard(batch.producerId)))
+      !verificationGuard(batch.producerId).verifiedBy(requestVerificationGuard)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f02745c2ecc..73064220f9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -880,7 +880,7 @@ class ReplicaManager(val config: KafkaConfig,
           // We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify.
           val firstBatch = records.firstBatch
           val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
-          if (verificationGuard != null) {
+          if (verificationGuard != VerificationGuard.SENTINEL_VERIFICATION_GUARD) {
             verificationGuards.put(topicPartition, verificationGuard)
             unverifiedEntries.put(topicPartition, records)
           } else
@@ -1183,7 +1183,8 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         try {
           val partition = getPartitionOrException(topicPartition)
-          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, verificationGuards.getOrElse(topicPartition, null))
+          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
+            verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL_VERIFICATION_GUARD))
           val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 17b40f30b81..b571298222c 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -3549,9 +3549,9 @@ class PartitionTest extends AbstractPartitionTest {
     // When VerificationGuard is not there, we should not be able to append.
     assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
 
-    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null VerificationGuard.
+    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard.
     val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
 
     // With the wrong VerificationGuard, append should fail.
     assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(),
@@ -3564,7 +3564,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed.
     val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
-    assertNull(verificationGuard3)
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard3)
     partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b01c5af0217..f6640f5e0cc 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -3738,7 +3738,8 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
+    assertFalse(log.verificationGuard(producerId).verifiedBy(VerificationGuard.SENTINEL_VERIFICATION_GUARD))
 
     val idempotentRecords = MemoryRecords.withIdempotentRecords(
       CompressionType.NONE,
@@ -3763,7 +3764,7 @@ class UnifiedLogTest {
     )
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
 
     log.appendAsLeader(idempotentRecords, origin = appendOrigin, leaderEpoch = 0)
     assertFalse(log.hasOngoingTransaction(producerId))
@@ -3772,13 +3773,14 @@ class UnifiedLogTest {
     assertEquals(verificationGuard, log.verificationGuard(producerId))
 
     // Now write the transactional records
+    assertTrue(log.verificationGuard(producerId).verifiedBy(verificationGuard))
     log.appendAsLeader(transactionalRecords, origin = appendOrigin, leaderEpoch = 0, verificationGuard = verificationGuard)
     assertTrue(log.hasOngoingTransaction(producerId))
     // VerificationGuard should be cleared now.
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
 
     // A subsequent maybeStartTransactionVerification will be empty since we are already verified.
-    assertNull(log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
 
     val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
       producerId,
@@ -3788,15 +3790,16 @@ class UnifiedLogTest {
 
     log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
 
     if (appendOrigin == AppendOrigin.CLIENT)
       sequence = sequence + 1
 
     // A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
     val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
-    assertNotNull(newVerificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, newVerificationGuard)
     assertNotEquals(verificationGuard, newVerificationGuard)
+    assertFalse(verificationGuard.verifiedBy(newVerificationGuard))
   }
 
   @Test
@@ -3809,7 +3812,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
 
     val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
       producerId,
@@ -3819,7 +3822,7 @@ class UnifiedLogTest {
 
     log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
   }
 
   @Test
@@ -3832,7 +3835,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
 
     producerStateManagerConfig.setTransactionVerificationEnabled(false)
 
@@ -3847,7 +3850,7 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0)
 
     assertTrue(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
   }
 
   @Test
@@ -3872,14 +3875,14 @@ class UnifiedLogTest {
     )
     assertThrows(classOf[InvalidTxnStateException], () => log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
 
     val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
-    assertNotNull(verificationGuard)
+    assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
 
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
     assertTrue(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
   }
 
   @Test
@@ -3892,7 +3895,7 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
     assertFalse(log.hasOngoingTransaction(producerId))
-    assertNull(log.verificationGuard(producerId))
+    assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
 
     val transactionalRecords = MemoryRecords.withTransactionalRecords(
       CompressionType.NONE,
diff --git a/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala
new file mode 100644
index 00000000000..07623780c93
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala
@@ -0,0 +1,36 @@
+package unit.kafka.log
+
+import org.apache.kafka.storage.internals.log.VerificationGuard
+import org.apache.kafka.storage.internals.log.VerificationGuard.SENTINEL_VERIFICATION_GUARD
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+class VerificationGuardTest {
+
+  @Test
+  def testEqualsAndHashCode(): Unit = {
+    val verificationGuard1 = new VerificationGuard
+    val verificationGuard2 = new VerificationGuard
+
+    assertNotEquals(verificationGuard1, verificationGuard2)
+    assertNotEquals(SENTINEL_VERIFICATION_GUARD, verificationGuard1)
+    assertEquals(SENTINEL_VERIFICATION_GUARD, SENTINEL_VERIFICATION_GUARD)
+
+    assertNotEquals(verificationGuard1.hashCode, verificationGuard2.hashCode)
+    assertNotEquals(SENTINEL_VERIFICATION_GUARD.hashCode, verificationGuard1.hashCode)
+    assertEquals(SENTINEL_VERIFICATION_GUARD.hashCode, SENTINEL_VERIFICATION_GUARD.hashCode)
+  }
+
+  @Test
+  def testVerifiedBy(): Unit = {
+    val verificationGuard1 = new VerificationGuard
+    val verificationGuard2 = new VerificationGuard
+
+    assertFalse(verificationGuard1.verifiedBy(verificationGuard2))
+    assertFalse(verificationGuard1.verifiedBy(SENTINEL_VERIFICATION_GUARD))
+    assertFalse(SENTINEL_VERIFICATION_GUARD.verifiedBy(verificationGuard1))
+    assertFalse(SENTINEL_VERIFICATION_GUARD.verifiedBy(SENTINEL_VERIFICATION_GUARD))
+    assertTrue(verificationGuard1.verifiedBy(verificationGuard1))
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 50e43d799bd..91874d8ab2c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -2163,7 +2163,7 @@ class ReplicaManagerTest {
         new SimpleRecord("message".getBytes))
       appendRecords(replicaManager, tp0, idempotentRecords)
       verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
-      assertNull(getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
 
       // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
@@ -2179,8 +2179,8 @@ class ReplicaManagerTest {
         ArgumentMatchers.eq(Seq(tp0)),
         any[AddPartitionsToTxnManager.AppendCallback]()
       )
-      assertNotNull(getVerificationGuard(replicaManager, tp0, producerId))
-      assertNull(getVerificationGuard(replicaManager, tp1, producerId))
+      assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp1, producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -2238,7 +2238,7 @@ class ReplicaManagerTest {
 
       val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
       callback2(Map.empty[TopicPartition, Errors].toMap)
-      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
       assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2424,7 +2424,7 @@ class ReplicaManagerTest {
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
         new SimpleRecord(s"message $sequence".getBytes))
       appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId)
-      assertNull(getVerificationGuard(replicaManager, tp, producerId))
+      assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp, producerId))
 
       // We should not add these partitions to the manager to verify.
       verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
@@ -2442,7 +2442,7 @@ class ReplicaManagerTest {
 
       appendRecords(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId)
       verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
-      assertEquals(null, getVerificationGuard(replicaManager, tp, producerId))
+      assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp, producerId))
       assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2496,7 +2496,7 @@ class ReplicaManagerTest {
       // This time we do not verify
       appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId)
       verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any())
-      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
       assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
index e079707c7ab..0235769ee1a 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
@@ -19,6 +19,10 @@ package org.apache.kafka.storage.internals.log;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class VerificationGuard {
+
+    // The sentinel verification guard will be used as a default when no verification guard is provided.
+    // It cannot be used to verify that a transaction is ongoing, and its verificationGuardValue is always 0.
+    public static final VerificationGuard SENTINEL_VERIFICATION_GUARD = new VerificationGuard(0);
     private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
     private final long verificationGuardValue;
 
@@ -26,6 +30,10 @@ public class VerificationGuard {
         verificationGuardValue = INCREMENTING_ID.incrementAndGet();
     }
 
+    private VerificationGuard(long value) {
+        verificationGuardValue = value;
+    }
+
     @Override
     public String toString() {
         return "VerificationGuard: " + verificationGuardValue;
@@ -45,7 +53,11 @@ public class VerificationGuard {
         return (int) (value ^ (value >>> 32));
     }
 
-    public long verificationGuardValue() {
+    private long verificationGuardValue() {
         return verificationGuardValue;
     }
+
+    public boolean verifiedBy(VerificationGuard verifyingGuard) {
+        return verifyingGuard != SENTINEL_VERIFICATION_GUARD && verifyingGuard.equals(this);
+    }
 }