You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/05/30 20:46:29 UTC
[kafka] branch trunk updated: MINOR: Add transaction verification config to producerStateManager config (#13770)
This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9edf2ec5cce MINOR: Add transaction verification config to producerStateManager config (#13770)
9edf2ec5cce is described below
commit 9edf2ec5cce7bd92f3a7fb0617774374b681cd2c
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Tue May 30 13:46:17 2023 -0700
MINOR: Add transaction verification config to producerStateManager config (#13770)
I have moved this config into producer state manager so it can be checked easily under the log lock when we are about to append.
Only a few test files currently use the validation and those have been verified to work via running the tests.
Reviews: David Jacot <dj...@confluent.io>
---
.../main/java/kafka/server/builders/LogManagerBuilder.java | 6 +++---
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala | 2 +-
core/src/test/scala/other/kafka/StressTestLog.scala | 2 +-
core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala | 2 +-
.../test/scala/unit/kafka/cluster/PartitionLockTest.scala | 2 +-
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 2 +-
.../unit/kafka/log/AbstractLogCleanerIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogCleanerManagerTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala | 12 ++++++------
core/src/test/scala/unit/kafka/log/LogManagerTest.scala | 4 ++--
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTestUtils.scala | 2 +-
.../test/scala/unit/kafka/log/ProducerStateManagerTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala | 6 +++---
.../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala | 2 +-
core/src/test/scala/unit/kafka/utils/SchedulerTest.scala | 2 +-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../kafka/jmh/partition/PartitionMakeFollowerBenchmark.java | 2 +-
.../jmh/partition/UpdateFollowerFetchStateBenchmark.java | 2 +-
.../org/apache/kafka/jmh/server/PartitionCreationBench.java | 2 +-
.../storage/internals/log/ProducerStateManagerConfig.java | 9 ++++++++-
27 files changed, 44 insertions(+), 37 deletions(-)
diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 2bbd0e9bb44..43a6c73b91e 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -46,7 +46,7 @@ public class LogManagerBuilder {
private long flushStartOffsetCheckpointMs = 10000L;
private long retentionCheckMs = 1000L;
private int maxTransactionTimeoutMs = 15 * 60 * 1000;
- private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000);
+ private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000, false);
private int producerIdExpirationCheckIntervalMs = 600000;
private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
private Scheduler scheduler = null;
@@ -110,8 +110,8 @@ public class LogManagerBuilder {
return this;
}
- public LogManagerBuilder setMaxProducerIdExpirationMs(int maxProducerIdExpirationMs) {
- this.producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs);
+ public LogManagerBuilder setProducerStateManagerConfig(int maxProducerIdExpirationMs, boolean transactionVerificationEnabled) {
+ this.producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs, transactionVerificationEnabled);
return this;
}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 381872acd2f..2d1ba35a6d2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1391,7 +1391,7 @@ object LogManager {
flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
- producerStateManagerConfig = new ProducerStateManagerConfig(config.producerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(config.producerIdExpirationMs, config.transactionPartitionVerificationEnable),
producerIdExpirationCheckIntervalMs = config.producerIdExpirationCheckIntervalMs,
scheduler = kafkaScheduler,
brokerTopicStats = brokerTopicStats,
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index ed97f07073b..bb7ed1f37ad 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -603,7 +603,7 @@ object KafkaMetadataLog extends Logging {
brokerTopicStats = new BrokerTopicStats,
time = time,
maxTransactionTimeoutMs = Int.MaxValue,
- producerStateManagerConfig = new ProducerStateManagerConfig(Int.MaxValue),
+ producerStateManagerConfig = new ProducerStateManagerConfig(Int.MaxValue, false),
producerIdExpirationCheckIntervalMs = Int.MaxValue,
logDirFailureChannel = new LogDirFailureChannel(5),
lastShutdownClean = false,
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 2176e9f3b21..631a3afcebf 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -50,7 +50,7 @@ object StressTestLog {
scheduler = time.scheduler,
time = time,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(10),
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 315dc75809c..085dbbcf790 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -220,7 +220,7 @@ object TestLinearWriteSpeed {
brokerTopicStats = new BrokerTopicStats,
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 283d98385c4..11d2fcb0d92 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -297,7 +297,7 @@ class PartitionLockTest extends Logging {
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeout = 5 * 60 * 1000
- val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b7ff476988f..f450c271fa9 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -431,7 +431,7 @@ class PartitionTest extends AbstractPartitionTest {
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, true)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index c411f6d37b6..86f62bc9746 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -112,7 +112,7 @@ abstract class AbstractLogCleanerIntegrationTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 15d24224c71..e8962b9a893 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 0c9a13597c9..cc52b32bcd3 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -50,7 +50,7 @@ class LogCleanerManagerTest extends Logging {
val logConfig: LogConfig = new LogConfig(logProps)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
val offset = 999
- val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 6a1f0f1b911..79054b6d926 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -58,7 +58,7 @@ class LogCleanerTest {
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
- val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
@AfterEach
def teardown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 487b31d438a..16700bcc7aa 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -151,7 +151,7 @@ class LogConcurrencyTest {
brokerTopicStats = brokerTopicStats,
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 52dfa760cd8..78be88cd2d2 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -52,7 +52,7 @@ class LogLoaderTest {
var config: KafkaConfig = _
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
- val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
val producerIdExpirationCheckIntervalMs: Int = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
@@ -255,7 +255,7 @@ class LogLoaderTest {
producerIdExpirationCheckIntervalMs: Int = producerIdExpirationCheckIntervalMs,
lastShutdownClean: Boolean = true): UnifiedLog = {
LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
- maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs), producerIdExpirationCheckIntervalMs, lastShutdownClean)
+ maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), producerIdExpirationCheckIntervalMs, lastShutdownClean)
}
private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
@@ -408,7 +408,7 @@ class LogLoaderTest {
@Test
def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = {
val maxTransactionTimeoutMs = 60000
- val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
@@ -517,7 +517,7 @@ class LogLoaderTest {
@Test
def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
val maxTransactionTimeoutMs = 60000
- val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.isEmpty).thenReturn(true)
@@ -571,7 +571,7 @@ class LogLoaderTest {
@Test
def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
val maxTransactionTimeoutMs = 60000
- val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.isEmpty).thenReturn(true)
@@ -623,7 +623,7 @@ class LogLoaderTest {
@Test
def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
val maxTransactionTimeoutMs = 60000
- val producerStateManagerConfig = new ProducerStateManagerConfig(300000)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.latestSnapshotOffset).thenReturn(OptionalLong.empty())
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 465bca9af1b..68f6cbb8c5a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -662,7 +662,7 @@ class LogManagerTest {
val segmentBytes = 1024
val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
- 5 * 60 * 1000, new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs), kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs)
+ 5 * 60 * 1000, new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs)
assertTrue(expectedSegmentsPerLog > 0)
// calculate numMessages to append to logs. It'll create "expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -797,7 +797,7 @@ class LogManagerTest {
logStartOffset = 0,
recoveryPoint = 0,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000),
+ producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
scheduler = mockTime.scheduler,
time = mockTime,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 37ce4c01bf2..c2b108016c8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -588,7 +588,7 @@ class LogSegmentTest {
topicPartition,
logDir,
5 * 60 * 1000,
- new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
new MockTime()
)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 1c7cce3ac99..77ce05b9648 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -87,7 +87,7 @@ object LogTestUtils {
logStartOffset: Long = 0L,
recoveryPoint: Long = 0L,
maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
- producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs: Int = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 0585866ded4..b98d0b3acfc 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -44,7 +44,7 @@ class ProducerStateManagerTest {
private val partition = new TopicPartition("test", 0)
private val producerId = 1L
private val maxTransactionTimeoutMs = 5 * 60 * 1000
- private val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ private val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, true)
private val lateTransactionTimeoutMs = maxTransactionTimeoutMs + ProducerStateManager.LATE_TRANSACTION_BUFFER_MS
private val time = new MockTime
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 7de7288a1c1..9b48944a4f7 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -59,7 +59,7 @@ class UnifiedLogTest {
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val mockTime = new MockTime()
- val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
def metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@BeforeEach
@@ -484,7 +484,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
// create a log
- val log = createLog(logDir, logConfig, producerStateManagerConfig = new ProducerStateManagerConfig(24 * 60))
+ val log = createLog(logDir, logConfig, producerStateManagerConfig = new ProducerStateManagerConfig(24 * 60, false))
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
// Test the segment rolling behavior when messages do not have a timestamp.
mockTime.sleep(log.config.segmentMs + 1)
@@ -1174,7 +1174,7 @@ class UnifiedLogTest {
@Test
def testPeriodicProducerIdExpiration(): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(200)
+ val producerStateManagerConfig = new ProducerStateManagerConfig(200, false)
val producerIdExpirationCheckIntervalMs = 100
val pid = 23L
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 36e6abfe588..fe65b754572 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2394,7 +2394,7 @@ class ReplicaManagerTest {
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "")
val producerStateManager = new ProducerStateManager(tp, logDir,
- maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs), time)
+ maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time)
val offsets = new LogLoader(
logDir,
tp,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 4114330b058..cafad1658ad 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -75,7 +75,7 @@ class DumpLogSegmentsTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 73bd36b4ecf..a8cd7411ea5 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -138,7 +138,7 @@ class SchedulerTest {
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
- maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs), mockTime)
+ maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime)
val offsets = new LogLoader(
logDir,
topicPartition,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fcbd4907216..a3e13f49c56 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1373,7 +1373,7 @@ object TestUtils extends Logging {
flushStartOffsetCheckpointMs = 10000L,
retentionCheckMs = 1000L,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs),
+ producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
scheduler = time.scheduler,
time = time,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index de0d73beb6e..59afc76bff4 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -147,7 +147,7 @@ public class ReplicaFetcherThreadBenchmark {
setFlushRecoveryOffsetCheckpointMs(10000L).
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
- setMaxProducerIdExpirationMs(60000).
+ setProducerStateManagerConfig(60000, false).
setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index f92ee1cb1ab..0e051ac5d89 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -107,7 +107,7 @@ public class PartitionMakeFollowerBenchmark {
setFlushRecoveryOffsetCheckpointMs(10000L).
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
- setMaxProducerIdExpirationMs(60000).
+ setProducerStateManagerConfig(60000, false).
setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 68f82a22c0a..a8fec969c87 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -97,7 +97,7 @@ public class UpdateFollowerFetchStateBenchmark {
setFlushRecoveryOffsetCheckpointMs(10000L).
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
- setMaxProducerIdExpirationMs(60000).
+ setProducerStateManagerConfig(60000, false).
setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 8c0500ba6e2..befa467e224 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -133,7 +133,7 @@ public class PartitionCreationBench {
setFlushRecoveryOffsetCheckpointMs(10000L).
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
- setMaxProducerIdExpirationMs(60000).
+ setProducerStateManagerConfig(60000, false).
setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
index 2f7946f95f5..53e4c010d9d 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
@@ -22,10 +22,13 @@ import java.util.Set;
public class ProducerStateManagerConfig {
public static final String PRODUCER_ID_EXPIRATION_MS = "producer.id.expiration.ms";
public static final Set<String> RECONFIGURABLE_CONFIGS = Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
+
private volatile int producerIdExpirationMs;
+ private boolean transactionVerificationEnabled;
- public ProducerStateManagerConfig(int producerIdExpirationMs) {
+ public ProducerStateManagerConfig(int producerIdExpirationMs, boolean transactionVerificationEnabled) {
this.producerIdExpirationMs = producerIdExpirationMs;
+ this.transactionVerificationEnabled = transactionVerificationEnabled;
}
public void setProducerIdExpirationMs(int producerIdExpirationMs) {
@@ -35,4 +38,8 @@ public class ProducerStateManagerConfig {
public int producerIdExpirationMs() {
return producerIdExpirationMs;
}
+
+ public boolean transactionVerificationEnabled() {
+ return transactionVerificationEnabled;
+ }
}