Posted to commits@kafka.apache.org by jo...@apache.org on 2023/05/30 20:46:29 UTC

[kafka] branch trunk updated: MINOR: Add transaction verification config to producerStateManager config (#13770)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9edf2ec5cce MINOR: Add transaction verification config to producerStateManager config (#13770)
9edf2ec5cce is described below

commit 9edf2ec5cce7bd92f3a7fb0617774374b681cd2c
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Tue May 30 13:46:17 2023 -0700

    MINOR: Add transaction verification config to producerStateManager config (#13770)
    
    I have moved this config into producer state manager so it can be checked easily under the log lock when we are about to append.
    
    Only a few test files currently use the validation, and those have been verified to work by running the tests.
    
    Reviewers:  David Jacot <dj...@confluent.io>
---
 .../main/java/kafka/server/builders/LogManagerBuilder.java   |  6 +++---
 core/src/main/scala/kafka/log/LogManager.scala               |  2 +-
 core/src/main/scala/kafka/raft/KafkaMetadataLog.scala        |  2 +-
 core/src/test/scala/other/kafka/StressTestLog.scala          |  2 +-
 core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala   |  2 +-
 .../test/scala/unit/kafka/cluster/PartitionLockTest.scala    |  2 +-
 core/src/test/scala/unit/kafka/cluster/PartitionTest.scala   |  2 +-
 .../unit/kafka/log/AbstractLogCleanerIntegrationTest.scala   |  2 +-
 .../test/scala/unit/kafka/log/BrokerCompressionTest.scala    |  2 +-
 .../test/scala/unit/kafka/log/LogCleanerManagerTest.scala    |  2 +-
 core/src/test/scala/unit/kafka/log/LogCleanerTest.scala      |  2 +-
 core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala  |  2 +-
 core/src/test/scala/unit/kafka/log/LogLoaderTest.scala       | 12 ++++++------
 core/src/test/scala/unit/kafka/log/LogManagerTest.scala      |  4 ++--
 core/src/test/scala/unit/kafka/log/LogSegmentTest.scala      |  2 +-
 core/src/test/scala/unit/kafka/log/LogTestUtils.scala        |  2 +-
 .../test/scala/unit/kafka/log/ProducerStateManagerTest.scala |  2 +-
 core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala      |  6 +++---
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala    |  2 +-
 .../test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala    |  2 +-
 core/src/test/scala/unit/kafka/utils/SchedulerTest.scala     |  2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala         |  2 +-
 .../kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java     |  2 +-
 .../kafka/jmh/partition/PartitionMakeFollowerBenchmark.java  |  2 +-
 .../jmh/partition/UpdateFollowerFetchStateBenchmark.java     |  2 +-
 .../org/apache/kafka/jmh/server/PartitionCreationBench.java  |  2 +-
 .../storage/internals/log/ProducerStateManagerConfig.java    |  9 ++++++++-
 27 files changed, 44 insertions(+), 37 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 2bbd0e9bb44..43a6c73b91e 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -46,7 +46,7 @@ public class LogManagerBuilder {
     private long flushStartOffsetCheckpointMs = 10000L;
     private long retentionCheckMs = 1000L;
     private int maxTransactionTimeoutMs = 15 * 60 * 1000;
-    private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000);
+    private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000, false);
     private int producerIdExpirationCheckIntervalMs = 600000;
     private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
     private Scheduler scheduler = null;
@@ -110,8 +110,8 @@ public class LogManagerBuilder {
         return this;
     }
 
-    public LogManagerBuilder setMaxProducerIdExpirationMs(int maxProducerIdExpirationMs) {
-        this.producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs);
+    public LogManagerBuilder setProducerStateManagerConfig(int maxProducerIdExpirationMs, boolean transactionVerificationEnabled) {
+        this.producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs, transactionVerificationEnabled);
         return this;
     }
 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 381872acd2f..2d1ba35a6d2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1391,7 +1391,7 @@ object LogManager {
       flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
       maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
-      producerStateManagerConfig = new ProducerStateManagerConfig(config.producerIdExpirationMs),
+      producerStateManagerConfig = new ProducerStateManagerConfig(config.producerIdExpirationMs, config.transactionPartitionVerificationEnable),
       producerIdExpirationCheckIntervalMs = config.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index ed97f07073b..bb7ed1f37ad 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -603,7 +603,7 @@ object KafkaMetadataLog extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       time = time,
       maxTransactionTimeoutMs = Int.MaxValue,
-      producerStateManagerConfig = new ProducerStateManagerConfig(Int.MaxValue),
+      producerStateManagerConfig = new ProducerStateManagerConfig(Int.MaxValue, false),
       producerIdExpirationCheckIntervalMs = Int.MaxValue,
       logDirFailureChannel = new LogDirFailureChannel(5),
       lastShutdownClean = false,
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 2176e9f3b21..631a3afcebf 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -50,7 +50,7 @@ object StressTestLog {
       scheduler = time.scheduler,
       time = time,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
       producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(10),
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 315dc75809c..085dbbcf790 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -220,7 +220,7 @@ object TestLinearWriteSpeed {
       brokerTopicStats = new BrokerTopicStats,
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
       producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 283d98385c4..11d2fcb0d92 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -297,7 +297,7 @@ class PartitionLockTest extends Logging {
         val segments = new LogSegments(log.topicPartition)
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
         val maxTransactionTimeout = 5 * 60 * 1000
-        val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+        val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
         val producerStateManager = new ProducerStateManager(
           log.topicPartition,
           log.dir,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b7ff476988f..f450c271fa9 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -431,7 +431,7 @@ class PartitionTest extends AbstractPartitionTest {
         val segments = new LogSegments(log.topicPartition)
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
         val maxTransactionTimeoutMs = 5 * 60 * 1000
-        val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+        val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, true)
         val producerStateManager = new ProducerStateManager(
           log.topicPartition,
           log.dir,
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index c411f6d37b6..86f62bc9746 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -112,7 +112,7 @@ abstract class AbstractLogCleanerIntegrationTest {
         time = time,
         brokerTopicStats = new BrokerTopicStats,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
-        producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+        producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
         producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
         logDirFailureChannel = new LogDirFailureChannel(10),
         topicId = None,
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 15d24224c71..e8962b9a893 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest {
       time = time,
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
       producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 0c9a13597c9..cc52b32bcd3 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -50,7 +50,7 @@ class LogCleanerManagerTest extends Logging {
   val logConfig: LogConfig = new LogConfig(logProps)
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val offset = 999
-  val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+  val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
 
   val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 6a1f0f1b911..79054b6d926 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -58,7 +58,7 @@ class LogCleanerTest {
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
   val tombstoneRetentionMs = 86400000
   val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
-  val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+  val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
 
   @AfterEach
   def teardown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 487b31d438a..16700bcc7aa 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -151,7 +151,7 @@ class LogConcurrencyTest {
       brokerTopicStats = brokerTopicStats,
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
       producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 52dfa760cd8..78be88cd2d2 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -52,7 +52,7 @@ class LogLoaderTest {
   var config: KafkaConfig = _
   val brokerTopicStats = new BrokerTopicStats
   val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
-  val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+  val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
   val producerIdExpirationCheckIntervalMs: Int = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
@@ -255,7 +255,7 @@ class LogLoaderTest {
                         producerIdExpirationCheckIntervalMs: Int = producerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true): UnifiedLog = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
-      maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs), producerIdExpirationCheckIntervalMs, lastShutdownClean)
+      maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), producerIdExpirationCheckIntervalMs, lastShutdownClean)
   }
 
   private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
@@ -408,7 +408,7 @@ class LogLoaderTest {
   @Test
   def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = {
     val maxTransactionTimeoutMs = 60000
-    val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
 
     val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
     when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
@@ -517,7 +517,7 @@ class LogLoaderTest {
   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
     val maxTransactionTimeoutMs = 60000
-    val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
 
     val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
     when(stateManager.isEmpty).thenReturn(true)
@@ -571,7 +571,7 @@ class LogLoaderTest {
   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
     val maxTransactionTimeoutMs = 60000
-    val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
 
     val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
     when(stateManager.isEmpty).thenReturn(true)
@@ -623,7 +623,7 @@ class LogLoaderTest {
   @Test
   def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
     val maxTransactionTimeoutMs = 60000
-    val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
 
     val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
     when(stateManager.latestSnapshotOffset).thenReturn(OptionalLong.empty())
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 465bca9af1b..68f6cbb8c5a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -662,7 +662,7 @@ class LogManagerTest {
     val segmentBytes = 1024
 
     val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
-      5 * 60 * 1000, new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs), kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs)
+      5 * 60 * 1000, new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs)
 
     assertTrue(expectedSegmentsPerLog > 0)
     // calculate numMessages to append to logs. It'll create "expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -797,7 +797,7 @@ class LogManagerTest {
         logStartOffset = 0,
         recoveryPoint = 0,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
-        producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000),
+        producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false),
         producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
         scheduler = mockTime.scheduler,
         time = mockTime,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 37ce4c01bf2..c2b108016c8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -588,7 +588,7 @@ class LogSegmentTest {
       topicPartition,
       logDir,
       5 * 60 * 1000,
-      new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+      new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
       new MockTime()
     )
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 1c7cce3ac99..77ce05b9648 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -87,7 +87,7 @@ object LogTestUtils {
                 logStartOffset: Long = 0L,
                 recoveryPoint: Long = 0L,
                 maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
-                producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+                producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
                 producerIdExpirationCheckIntervalMs: Int = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 0585866ded4..b98d0b3acfc 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -44,7 +44,7 @@ class ProducerStateManagerTest {
   private val partition = new TopicPartition("test", 0)
   private val producerId = 1L
   private val maxTransactionTimeoutMs = 5 * 60 * 1000
-  private val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+  private val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, true)
   private val lateTransactionTimeoutMs = maxTransactionTimeoutMs + ProducerStateManager.LATE_TRANSACTION_BUFFER_MS
   private val time = new MockTime
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 7de7288a1c1..9b48944a4f7 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -59,7 +59,7 @@ class UnifiedLogTest {
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val mockTime = new MockTime()
-  val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+  val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
   def metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
 
   @BeforeEach
@@ -484,7 +484,7 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
 
     // create a log
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = new ProducerStateManagerConfig(24 * 60))
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = new ProducerStateManagerConfig(24 * 60, false))
     assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
     // Test the segment rolling behavior when messages do not have a timestamp.
     mockTime.sleep(log.config.segmentMs + 1)
@@ -1174,7 +1174,7 @@ class UnifiedLogTest {
 
   @Test
   def testPeriodicProducerIdExpiration(): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(200)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(200, false)
     val producerIdExpirationCheckIntervalMs = 100
 
     val pid = 23L
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 36e6abfe588..fe65b754572 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2394,7 +2394,7 @@ class ReplicaManagerTest {
     val segments = new LogSegments(tp)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "")
     val producerStateManager = new ProducerStateManager(tp, logDir,
-      maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs), time)
+      maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time)
     val offsets = new LogLoader(
       logDir,
       tp,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 4114330b058..cafad1658ad 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -75,7 +75,7 @@ class DumpLogSegmentsTest {
       time = time,
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+      producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
       producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 73bd36b4ecf..a8cd7411ea5 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -138,7 +138,7 @@ class SchedulerTest {
     val segments = new LogSegments(topicPartition)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, logDir,
-      maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs), mockTime)
+      maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime)
     val offsets = new LogLoader(
       logDir,
       topicPartition,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fcbd4907216..a3e13f49c56 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1373,7 +1373,7 @@ object TestUtils extends Logging {
                    flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
                    maxTransactionTimeoutMs = 5 * 60 * 1000,
-                   producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+                   producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
                    producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
                    scheduler = time.scheduler,
                    time = time,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index de0d73beb6e..59afc76bff4 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -147,7 +147,7 @@ public class ReplicaFetcherThreadBenchmark {
             setFlushRecoveryOffsetCheckpointMs(10000L).
             setFlushStartOffsetCheckpointMs(10000L).
             setRetentionCheckMs(1000L).
-            setMaxProducerIdExpirationMs(60000).
+            setProducerStateManagerConfig(60000, false).
             setInterBrokerProtocolVersion(MetadataVersion.latest()).
             setScheduler(scheduler).
             setBrokerTopicStats(brokerTopicStats).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index f92ee1cb1ab..0e051ac5d89 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -107,7 +107,7 @@ public class PartitionMakeFollowerBenchmark {
             setFlushRecoveryOffsetCheckpointMs(10000L).
             setFlushStartOffsetCheckpointMs(10000L).
             setRetentionCheckMs(1000L).
-            setMaxProducerIdExpirationMs(60000).
+            setProducerStateManagerConfig(60000, false).
             setInterBrokerProtocolVersion(MetadataVersion.latest()).
             setScheduler(scheduler).
             setBrokerTopicStats(brokerTopicStats).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 68f82a22c0a..a8fec969c87 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -97,7 +97,7 @@ public class UpdateFollowerFetchStateBenchmark {
             setFlushRecoveryOffsetCheckpointMs(10000L).
             setFlushStartOffsetCheckpointMs(10000L).
             setRetentionCheckMs(1000L).
-            setMaxProducerIdExpirationMs(60000).
+            setProducerStateManagerConfig(60000, false).
             setInterBrokerProtocolVersion(MetadataVersion.latest()).
             setScheduler(scheduler).
             setBrokerTopicStats(brokerTopicStats).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 8c0500ba6e2..befa467e224 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -133,7 +133,7 @@ public class PartitionCreationBench {
             setFlushRecoveryOffsetCheckpointMs(10000L).
             setFlushStartOffsetCheckpointMs(10000L).
             setRetentionCheckMs(1000L).
-            setMaxProducerIdExpirationMs(60000).
+            setProducerStateManagerConfig(60000, false).
             setInterBrokerProtocolVersion(MetadataVersion.latest()).
             setScheduler(scheduler).
             setBrokerTopicStats(brokerTopicStats).
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
index 2f7946f95f5..53e4c010d9d 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
@@ -22,10 +22,13 @@ import java.util.Set;
 public class ProducerStateManagerConfig {
     public static final String PRODUCER_ID_EXPIRATION_MS = "producer.id.expiration.ms";
     public static final Set<String> RECONFIGURABLE_CONFIGS = Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
+
     private volatile int producerIdExpirationMs;
+    private boolean transactionVerificationEnabled;
 
-    public ProducerStateManagerConfig(int producerIdExpirationMs) {
+    public ProducerStateManagerConfig(int producerIdExpirationMs, boolean transactionVerificationEnabled) {
         this.producerIdExpirationMs = producerIdExpirationMs;
+        this.transactionVerificationEnabled = transactionVerificationEnabled;
     }
 
     public void setProducerIdExpirationMs(int producerIdExpirationMs) {
@@ -35,4 +38,8 @@ public class ProducerStateManagerConfig {
     public int producerIdExpirationMs() {
         return producerIdExpirationMs;
     }
+
+    public boolean transactionVerificationEnabled() {
+        return transactionVerificationEnabled;
+    }
 }
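
A minimal sketch of how the updated ProducerStateManagerConfig can be constructed and queried after this change. It uses only the constructor and accessors visible in the diff above; the wrapper class and the verification call site are illustrative assumptions, not code from this commit.

    import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;

    // Illustrative only: not a class in the Kafka code base.
    public class ProducerStateManagerConfigExample {
        public static void main(String[] args) {
            // producer.id.expiration.ms of one day, transaction verification enabled.
            ProducerStateManagerConfig config =
                new ProducerStateManagerConfig(24 * 60 * 60 * 1000, true);

            // Code that already holds this config (e.g. the append path under the
            // log lock) can now read the verification flag directly from it.
            if (config.transactionVerificationEnabled()) {
                // ... perform transaction verification before appending
                //     (this call site is hypothetical).
            }

            // producer.id.expiration.ms remains dynamically reconfigurable, as before.
            config.setProducerIdExpirationMs(60_000);
            System.out.println(config.producerIdExpirationMs());          // 60000
            System.out.println(config.transactionVerificationEnabled());  // true
        }
    }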