You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/10/18 17:28:19 UTC
[kafka] 01/01: Add sentinel and usage
This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch KAFKA-15626
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7bbf82b17e30f6b824a6fdd14bb19c9c5fb30a1a
Author: Justine <jo...@confluent.io>
AuthorDate: Wed Oct 18 10:27:54 2023 -0700
Add sentinel and usage
---
core/src/main/scala/kafka/cluster/Partition.scala | 5 +--
core/src/main/scala/kafka/log/UnifiedLog.scala | 17 +++++-----
.../main/scala/kafka/server/ReplicaManager.scala | 5 +--
.../scala/unit/kafka/cluster/PartitionTest.scala | 6 ++--
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 33 +++++++++++---------
.../unit/kafka/log/VerificationGuardTest.scala | 36 ++++++++++++++++++++++
.../unit/kafka/server/ReplicaManagerTest.scala | 16 +++++-----
.../storage/internals/log/VerificationGuard.java | 14 ++++++++-
8 files changed, 93 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d89dddd5c0e..fb66c4649aa 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -581,7 +581,8 @@ class Partition(val topicPartition: TopicPartition,
}
}
- // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return null.
+ // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the
+ // sentinel verification guard.
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
@@ -1301,7 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
}
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
- requestLocal: RequestLocal, verificationGuard: VerificationGuard = null): LogAppendInfo = {
+ requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 2895ae71fac..683260d74a9 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -600,11 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing.
- * Creation starts the verification process. Otherwise return null.
+ * Creation starts the verification process. Otherwise return the sentinel VerificationGuard.
*/
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized {
if (hasOngoingTransaction(producerId))
- null
+ VerificationGuard.SENTINEL_VERIFICATION_GUARD
else
maybeCreateVerificationGuard(producerId, sequence, epoch)
}
@@ -619,11 +619,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
/**
- * If an VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return null.
+ * If a VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return the
+ * sentinel VerificationGuard.
*/
def verificationGuard(producerId: Long): VerificationGuard = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId)
- if (entry != null) entry.verificationGuard else null
+ if (entry != null) entry.verificationGuard else VerificationGuard.SENTINEL_VERIFICATION_GUARD
}
/**
@@ -715,7 +716,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
origin: AppendOrigin = AppendOrigin.CLIENT,
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
requestLocal: RequestLocal = RequestLocal.NoCaching,
- verificationGuard: VerificationGuard = null): LogAppendInfo = {
+ verificationGuard: VerificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD): LogAppendInfo = {
val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
}
@@ -734,7 +735,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets = false,
leaderEpoch = -1,
requestLocal = None,
- verificationGuard = null,
+ verificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD,
// disable to check the validation of record size since the record is already accepted by leader.
ignoreRecordSize = true)
}
@@ -1059,7 +1060,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
- // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+ // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}
@@ -1082,7 +1083,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
- (requestVerificationGuard == null || !requestVerificationGuard.equals(verificationGuard(batch.producerId)))
+ !verificationGuard(batch.producerId).verifiedBy(requestVerificationGuard)
}
/**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f02745c2ecc..73064220f9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -880,7 +880,7 @@ class ReplicaManager(val config: KafkaConfig,
// We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify.
val firstBatch = records.firstBatch
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
- if (verificationGuard != null) {
+ if (verificationGuard != VerificationGuard.SENTINEL_VERIFICATION_GUARD) {
verificationGuards.put(topicPartition, verificationGuard)
unverifiedEntries.put(topicPartition, records)
} else
@@ -1183,7 +1183,8 @@ class ReplicaManager(val config: KafkaConfig,
} else {
try {
val partition = getPartitionOrException(topicPartition)
- val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, verificationGuards.getOrElse(topicPartition, null))
+ val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
+ verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL_VERIFICATION_GUARD))
val numAppendedMessages = info.numMessages
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 17b40f30b81..b571298222c 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -3549,9 +3549,9 @@ class PartitionTest extends AbstractPartitionTest {
// When VerificationGuard is not there, we should not be able to append.
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
- // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null VerificationGuard.
+ // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard.
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
- assertNotNull(verificationGuard)
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
// With the wrong VerificationGuard, append should fail.
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(),
@@ -3564,7 +3564,7 @@ class PartitionTest extends AbstractPartitionTest {
// We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed.
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
- assertNull(verificationGuard3)
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard3)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b01c5af0217..f6640f5e0cc 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -3738,7 +3738,8 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
+ assertFalse(log.verificationGuard(producerId).verifiedBy(VerificationGuard.SENTINEL_VERIFICATION_GUARD))
val idempotentRecords = MemoryRecords.withIdempotentRecords(
CompressionType.NONE,
@@ -3763,7 +3764,7 @@ class UnifiedLogTest {
)
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
- assertNotNull(verificationGuard)
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
log.appendAsLeader(idempotentRecords, origin = appendOrigin, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
@@ -3772,13 +3773,14 @@ class UnifiedLogTest {
assertEquals(verificationGuard, log.verificationGuard(producerId))
// Now write the transactional records
+ assertTrue(log.verificationGuard(producerId).verifiedBy(verificationGuard))
log.appendAsLeader(transactionalRecords, origin = appendOrigin, leaderEpoch = 0, verificationGuard = verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId))
// VerificationGuard should be cleared now.
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
// A subsequent maybeStartTransactionVerification will be empty since we are already verified.
- assertNull(log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
@@ -3788,15 +3790,16 @@ class UnifiedLogTest {
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
if (appendOrigin == AppendOrigin.CLIENT)
sequence = sequence + 1
// A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
- assertNotNull(newVerificationGuard)
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, newVerificationGuard)
assertNotEquals(verificationGuard, newVerificationGuard)
+ assertFalse(verificationGuard.verifiedBy(newVerificationGuard))
}
@Test
@@ -3809,7 +3812,7 @@ class UnifiedLogTest {
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
- assertNotNull(verificationGuard)
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
@@ -3819,7 +3822,7 @@ class UnifiedLogTest {
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
}
@Test
@@ -3832,7 +3835,7 @@ class UnifiedLogTest {
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
- assertNotNull(verificationGuard)
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
producerStateManagerConfig.setTransactionVerificationEnabled(false)
@@ -3847,7 +3850,7 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0)
assertTrue(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
}
@Test
@@ -3872,14 +3875,14 @@ class UnifiedLogTest {
)
assertThrows(classOf[InvalidTxnStateException], () => log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
assertFalse(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
- assertNotNull(verificationGuard)
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard)
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
}
@Test
@@ -3892,7 +3895,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
- assertNull(log.verificationGuard(producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId))
val transactionalRecords = MemoryRecords.withTransactionalRecords(
CompressionType.NONE,
diff --git a/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala
new file mode 100644
index 00000000000..07623780c93
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala
@@ -0,0 +1,36 @@
+package unit.kafka.log
+
+import org.apache.kafka.storage.internals.log.VerificationGuard
+import org.apache.kafka.storage.internals.log.VerificationGuard.SENTINEL_VERIFICATION_GUARD
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+class VerificationGuardTest {
+
+ @Test
+ def testEqualsAndHashCode(): Unit = {
+ val verificationGuard1 = new VerificationGuard
+ val verificationGuard2 = new VerificationGuard
+
+ assertNotEquals(verificationGuard1, verificationGuard2)
+ assertNotEquals(SENTINEL_VERIFICATION_GUARD, verificationGuard1)
+ assertEquals(SENTINEL_VERIFICATION_GUARD, SENTINEL_VERIFICATION_GUARD)
+
+ assertNotEquals(verificationGuard1.hashCode, verificationGuard2.hashCode)
+ assertNotEquals(SENTINEL_VERIFICATION_GUARD.hashCode, verificationGuard1.hashCode)
+ assertEquals(SENTINEL_VERIFICATION_GUARD.hashCode, SENTINEL_VERIFICATION_GUARD.hashCode)
+ }
+
+ @Test
+ def testVerifiedBy(): Unit = {
+ val verificationGuard1 = new VerificationGuard
+ val verificationGuard2 = new VerificationGuard
+
+ assertFalse(verificationGuard1.verifiedBy(verificationGuard2))
+ assertFalse(verificationGuard1.verifiedBy(SENTINEL_VERIFICATION_GUARD))
+ assertFalse(SENTINEL_VERIFICATION_GUARD.verifiedBy(verificationGuard1))
+ assertFalse(SENTINEL_VERIFICATION_GUARD.verifiedBy(SENTINEL_VERIFICATION_GUARD))
+ assertTrue(verificationGuard1.verifiedBy(verificationGuard1))
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 50e43d799bd..91874d8ab2c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -2163,7 +2163,7 @@ class ReplicaManagerTest {
new SimpleRecord("message".getBytes))
appendRecords(replicaManager, tp0, idempotentRecords)
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
- assertNull(getVerificationGuard(replicaManager, tp0, producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
// If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
@@ -2179,8 +2179,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(Seq(tp0)),
any[AddPartitionsToTxnManager.AppendCallback]()
)
- assertNotNull(getVerificationGuard(replicaManager, tp0, producerId))
- assertNull(getVerificationGuard(replicaManager, tp1, producerId))
+ assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp1, producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
@@ -2238,7 +2238,7 @@ class ReplicaManagerTest {
val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
callback2(Map.empty[TopicPartition, Errors].toMap)
- assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -2424,7 +2424,7 @@ class ReplicaManagerTest {
val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
new SimpleRecord(s"message $sequence".getBytes))
appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId)
- assertNull(getVerificationGuard(replicaManager, tp, producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp, producerId))
// We should not add these partitions to the manager to verify.
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
@@ -2442,7 +2442,7 @@ class ReplicaManagerTest {
appendRecords(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId)
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
- assertEquals(null, getVerificationGuard(replicaManager, tp, producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp, producerId))
assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -2496,7 +2496,7 @@ class ReplicaManagerTest {
// This time we do not verify
appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId)
verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any())
- assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+ assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
index e079707c7ab..0235769ee1a 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java
@@ -19,6 +19,10 @@ package org.apache.kafka.storage.internals.log;
import java.util.concurrent.atomic.AtomicLong;
public class VerificationGuard {
+
+ // The sentinel verification guard will be used as a default when no verification guard is provided.
+ // It cannot be used to verify that a transaction is ongoing, and its verificationGuardValue is always 0.
+ public static final VerificationGuard SENTINEL_VERIFICATION_GUARD = new VerificationGuard(0);
private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
private final long verificationGuardValue;
@@ -26,6 +30,10 @@ public class VerificationGuard {
verificationGuardValue = INCREMENTING_ID.incrementAndGet();
}
+ private VerificationGuard(long value) {
+ verificationGuardValue = value;
+ }
+
@Override
public String toString() {
return "VerificationGuard: " + verificationGuardValue;
@@ -45,7 +53,11 @@ public class VerificationGuard {
return (int) (value ^ (value >>> 32));
}
- public long verificationGuardValue() {
+ private long verificationGuardValue() {
return verificationGuardValue;
}
+
+ public boolean verifiedBy(VerificationGuard verifyingGuard) {
+ return verifyingGuard != SENTINEL_VERIFICATION_GUARD && verifyingGuard.equals(this);
+ }
}