You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/22 08:56:33 UTC

[kafka] branch trunk updated: KAFKA-14097: Separate configuration for producer ID expiry (KIP-854) (#12501)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b392cf212f KAFKA-14097: Separate configuration for producer ID expiry (KIP-854) (#12501)
b392cf212f is described below

commit b392cf212f7ed4a82b79c3690b488619c027dba9
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Mon Aug 22 01:56:05 2022 -0700

    KAFKA-14097: Separate configuration for producer ID expiry (KIP-854) (#12501)
    
    This patch implements "KIP-854: Separate configuration for producer ID expiry" as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../kafka/server/builders/LogManagerBuilder.java   |   2 +
 core/src/main/scala/kafka/log/LogManager.scala     |   9 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  20 +++-
 ...onTest.scala => ProducerIdExpirationTest.scala} | 124 ++++++++++++++++-----
 .../kafka/api/TransactionsExpirationTest.scala     | 112 +++++++++++++++++--
 .../src/test/scala/other/kafka/StressTestLog.scala |   2 +-
 .../scala/other/kafka/TestLinearWriteSpeed.scala   |   2 +-
 .../log/AbstractLogCleanerIntegrationTest.scala    |   2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala     |   2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   7 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   5 +-
 .../scala/unit/kafka/log/LogConcurrencyTest.scala  |   2 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  10 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   4 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   2 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   4 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala     |   5 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   1 +
 19 files changed, 248 insertions(+), 69 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 6b6bd919fe..0a7d692a58 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -46,6 +46,7 @@ public class LogManagerBuilder {
     private long retentionCheckMs = 1000L;
     private int maxTransactionTimeoutMs = 15 * 60 * 1000;
     private int maxPidExpirationMs = 60000;
+    private int producerIdExpirationCheckIntervalMs = 600000;
     private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
     private Scheduler scheduler = null;
     private BrokerTopicStats brokerTopicStats = null;
@@ -164,6 +165,7 @@ public class LogManagerBuilder {
                               retentionCheckMs,
                               maxTransactionTimeoutMs,
                               maxPidExpirationMs,
+                              producerIdExpirationCheckIntervalMs,
                               interBrokerProtocolVersion,
                               scheduler,
                               brokerTopicStats,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 886f56c63c..9d779323e5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -66,6 +66,7 @@ class LogManager(logDirs: Seq[File],
                  val retentionCheckMs: Long,
                  val maxTransactionTimeoutMs: Int,
                  val maxPidExpirationMs: Int,
+                 val producerIdExpirationCheckIntervalMs: Int,
                  interBrokerProtocolVersion: MetadataVersion,
                  scheduler: Scheduler,
                  brokerTopicStats: BrokerTopicStats,
@@ -276,7 +277,7 @@ class LogManager(logDirs: Seq[File],
       recoveryPoint = logRecoveryPoint,
       maxTransactionTimeoutMs = maxTransactionTimeoutMs,
       maxProducerIdExpirationMs = maxPidExpirationMs,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
       scheduler = scheduler,
       time = time,
       brokerTopicStats = brokerTopicStats,
@@ -950,7 +951,7 @@ class LogManager(logDirs: Seq[File],
           recoveryPoint = 0L,
           maxTransactionTimeoutMs = maxTransactionTimeoutMs,
           maxProducerIdExpirationMs = maxPidExpirationMs,
-          producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+          producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
           scheduler = scheduler,
           time = time,
           brokerTopicStats = brokerTopicStats,
@@ -1347,7 +1348,6 @@ object LogManager {
 
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
-  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
@@ -1375,7 +1375,8 @@ object LogManager {
       flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
       maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
-      maxPidExpirationMs = config.transactionalIdExpirationMs,
+      maxPidExpirationMs = config.producerIdExpirationMs,
+      producerIdExpirationCheckIntervalMs = config.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 860056f9a3..0c222f92e9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -211,6 +211,9 @@ object Defaults {
   val TransactionsAbortTimedOutTransactionsCleanupIntervalMS = TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs
   val TransactionsRemoveExpiredTransactionsCleanupIntervalMS = TransactionStateManager.DefaultRemoveExpiredTransactionalIdsIntervalMs
 
+  val ProducerIdExpirationMs = 86400000
+  val ProducerIdExpirationCheckIntervalMs = 600000
+
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlots = 1000
   val FetchMaxBytes = 55 * 1024 * 1024
@@ -534,6 +537,9 @@ object KafkaConfig {
   val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
   val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"
 
+  val ProducerIdExpirationMsProp = "producer.id.expiration.ms"
+  val ProducerIdExpirationCheckIntervalMsProp = "producer.id.expiration.check.ms"
+
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlots = "max.incremental.fetch.session.cache.slots"
   val FetchMaxBytes = "fetch.max.bytes"
@@ -952,8 +958,7 @@ object KafkaConfig {
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
   /** ********* Transaction management configuration ***********/
   val TransactionalIdExpirationMsDoc = "The time in ms that the transaction coordinator will wait without receiving any transaction status updates " +
-    "for the current transaction before expiring its transactional id. This setting also influences producer id expiration - producer ids are expired " +
-    "once this time has elapsed after the last write with the given producer id. Note that producer ids may expire sooner if the last write from the producer id is deleted due to the topic's retention settings."
+    "for the current transaction before expiring its transactional id. Transactional IDs will not expire while a transaction is still ongoing."
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
     "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
@@ -965,6 +970,11 @@ object KafkaConfig {
   val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
   val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing"
 
+  val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated with them is still ongoing. " +
+    "Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value to be the same as or higher than " +
+    "<code>delivery.timeout.ms</code> can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases."
+  val ProducerIdExpirationCheckIntervalMsDoc = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing"
+
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
   val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."
@@ -1288,6 +1298,10 @@ object KafkaConfig {
       .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
       .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
 
+      .define(ProducerIdExpirationMsProp, INT, Defaults.ProducerIdExpirationMs, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+      // Configuration for testing only as default value should be sufficient for typical usage
+      .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.ProducerIdExpirationCheckIntervalMs, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+
       /** ********* Fetch Configuration **************/
       .define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MaxIncrementalFetchSessionCacheSlots, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc)
       .define(FetchMaxBytes, INT, Defaults.FetchMaxBytes, atLeast(1024), MEDIUM, FetchMaxBytesDoc)
@@ -1849,6 +1863,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
 
+  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
 
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
similarity index 51%
copy from core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
copy to core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index ddf3a97460..376bf1560a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -17,15 +17,19 @@
 
 package kafka.api
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
+import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ProducerState}
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.errors.InvalidPidMappingException
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -33,15 +37,15 @@ import org.junit.jupiter.params.provider.ValueSource
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-// Test class that uses a very small transaction timeout to trigger InvalidPidMapping errors
-class TransactionsExpirationTest extends KafkaServerTestHarness {
+class ProducerIdExpirationTest extends KafkaServerTestHarness {
   val topic1 = "topic1"
-  val topic2 = "topic2"
-  val numPartitions = 4
+  val numPartitions = 1
   val replicationFactor = 3
+  val tp0 = new TopicPartition(topic1, 0)
 
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
   var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
+  var admin: Admin = _
 
   override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
@@ -50,66 +54,124 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-
-    producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
     consumer = TestUtils.createConsumer(bootstrapServers(),
       enableAutoCommit = false,
       readCommitted = true)
+    admin = createAdminClient(brokers, listenerName)
 
     createTopic(topic1, numPartitions, 3)
-    createTopic(topic2, numPartitions, 3)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    producer.close()
-    consumer.close()
+    if (producer != null)
+      producer.close()
+    if (consumer != null)
+      consumer.close()
+    if (admin != null)
+      admin.close()
 
     super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String): Unit = {
+  def testProducerIdExpirationWithNoTransactions(quorum: String): Unit = {
+    producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence = true)
+
+    // Send records to populate producer state cache.
+    producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes))
+    producer.flush()
+
+    // Ensure producer IDs are added.
+    ensureConsistentKRaftMetadata()
+    assertEquals(1, producerState.size)
+
+    // Wait for the producer ID to expire.
+    TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.")
+
+    // Send more records to send producer ID back to brokers.
+    producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes))
+    producer.flush()
+
+    // Producer IDs should repopulate.
+    assertEquals(1, producerState.size)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionAfterTransactionIdExpiresButProducerIdRemains(quorum: String): Unit = {
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
     producer.initTransactions()
 
-    // Start and then abort a transaction to allow the transactional ID to expire
+    // Start and then abort a transaction to allow the producer ID to expire.
     producer.beginTransaction()
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0, "4", "4", willBeCommitted = false))
+    producer.flush()
+
+    // Ensure producer IDs are added.
+    assertEquals(1, producerState.size)
+
     producer.abortTransaction()
 
-    // Wait for the transactional ID to expire
-    Thread.sleep(3000)
+    // Wait for the transactional ID to expire.
+    waitUntilTransactionalStateExpires()
 
-    // Start a new transaction and attempt to send, which will trigger an AddPartitionsToTxnRequest, which will fail due to the expired producer ID
+    // Producer IDs should be retained.
+    assertEquals(1, producerState.size)
+
+    // Start a new transaction and attempt to send, which will trigger an AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
     producer.beginTransaction()
-    val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "1", "1", willBeCommitted = false))
-    Thread.sleep(500)
+    val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", willBeCommitted = false))
+    TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.")
 
-    org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[InvalidPidMappingException])
+    JTestUtils.assertFutureThrows(failedFuture, classOf[InvalidPidMappingException])
     producer.abortTransaction()
 
     producer.beginTransaction()
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = true))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2, "4", "4", willBeCommitted = true))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "1", "1", willBeCommitted = true))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", willBeCommitted = true))
+
+    // Producer IDs should be retained.
+    assertEquals(1, producerState.size)
+
     producer.commitTransaction()
 
-    consumer.subscribe(List(topic1, topic2).asJava)
+    // Check we can still consume the transaction.
+    consumer.subscribe(List(topic1).asJava)
 
-    val records = consumeRecords(consumer, 4)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
+  private def producerState: List[ProducerState] = {
+    val describeResult = admin.describeProducers(Collections.singletonList(tp0))
+    val activeProducers = describeResult.partitionResult(tp0).get().activeProducers
+    activeProducers.asScala.toList
+  }
+
+  private def waitUntilTransactionalStateExpires(): Unit = {
+    TestUtils.waitUntilTrue(() =>  {
+      var removedTransactionState = false
+      val txnDescribeResult = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+      try {
+        txnDescribeResult.get()
+      } catch {
+        case e: Exception => {
+          removedTransactionState = e.getCause.isInstanceOf[TransactionalIdNotFoundException]
+        }
+      }
+      removedTransactionState
+    }, "Transaction state never expired.")
+  }
+
   private def serverProps(): Properties = {
     val serverProps = new Properties()
     serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
     // Set a smaller value for the number of partitions for the __consumer_offsets topic
-    // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+    // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
     serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
     serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
     serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
@@ -119,8 +181,10 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
     serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
-    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "2000")
+    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "500")
     serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, "500")
+    serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "2000")
+    serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
     serverProps
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index ddf3a97460..79fc67e4c5 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -17,15 +17,18 @@
 
 package kafka.api
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
+import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ProducerState}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.errors.InvalidPidMappingException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -39,9 +42,11 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
   val topic2 = "topic2"
   val numPartitions = 4
   val replicationFactor = 3
+  val tp0 = new TopicPartition(topic1, 0)
 
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
   var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
+  var admin: Admin = _
 
   override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
@@ -55,6 +60,7 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     consumer = TestUtils.createConsumer(bootstrapServers(),
       enableAutoCommit = false,
       readCommitted = true)
+    admin = createAdminClient(brokers, listenerName)
 
     createTopic(topic1, numPartitions, 3)
     createTopic(topic2, numPartitions, 3)
@@ -62,8 +68,12 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-    producer.close()
-    consumer.close()
+    if (producer != null)
+      producer.close()
+    if (consumer != null)
+      consumer.close()
+    if (admin != null)
+      admin.close()
 
     super.tearDown()
   }
@@ -73,19 +83,20 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
   def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String): Unit = {
     producer.initTransactions()
 
-    // Start and then abort a transaction to allow the transactional ID to expire
+    // Start and then abort a transaction to allow the transactional ID to expire.
     producer.beginTransaction()
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0, "4", "4", willBeCommitted = false))
     producer.abortTransaction()
 
-    // Wait for the transactional ID to expire
-    Thread.sleep(3000)
+    // Check the transactional state exists and then wait for it to expire.
+    waitUntilTransactionalStateExists()
+    waitUntilTransactionalStateExpires()
 
-    // Start a new transaction and attempt to send, which will trigger an AddPartitionsToTxnRequest, which will fail due to the expired producer ID
+    // Start a new transaction and attempt to send, which will trigger an AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
     producer.beginTransaction()
     val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "1", "1", willBeCommitted = false))
-    Thread.sleep(500)
+    TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.")
 
     org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[InvalidPidMappingException])
     producer.abortTransaction()
@@ -97,6 +108,8 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
     producer.commitTransaction()
 
+    waitUntilTransactionalStateExists()
+
     consumer.subscribe(List(topic1, topic2).asJava)
 
     val records = consumeRecords(consumer, 4)
@@ -105,11 +118,84 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     }
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionAfterProducerIdExpires(quorum: String): Unit = {
+    producer.initTransactions()
+
+    // Start and then abort a transaction to allow the producer ID to expire.
+    producer.beginTransaction()
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
+    producer.flush()
+
+    // Ensure producer IDs are added.
+    val pState = producerState
+    assertEquals(1, pState.size)
+    val oldProducerId = pState(0).producerId
+
+    producer.abortTransaction()
+
+    // Wait for the producer ID to expire.
+    TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer IDs for topic1 did not expire.")
+
+    // Create a new producer to check that we retain the producer ID in transactional state.
+    producer.close()
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
+    producer.initTransactions()
+
+    // Start a new transaction and attempt to send. This should work since only the producer ID was removed from its mapping in ProducerStateManager.
+    producer.beginTransaction()
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
+    producer.commitTransaction()
+
+    // Producer IDs should repopulate.
+    val pState2 = producerState
+    assertEquals(1, pState2.size)
+    val newProducerId = pState2(0).producerId
+
+    // Producer IDs should be the same.
+    assertEquals(oldProducerId, newProducerId)
+
+    consumer.subscribe(List(topic1).asJava)
+
+    val records = consumeRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
+    }
+  }
+
+  private def producerState: List[ProducerState] = {
+    val describeResult = admin.describeProducers(Collections.singletonList(tp0))
+    val activeProducers = describeResult.partitionResult(tp0).get().activeProducers
+    activeProducers.asScala.toList
+  }
+
+  private def waitUntilTransactionalStateExpires(): Unit = {
+    TestUtils.waitUntilTrue(() =>  {
+      var removedTransactionState = false
+      val txnDescribeResult = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+      try {
+        txnDescribeResult.get()
+      } catch {
+        case e: Exception => {
+          removedTransactionState = e.getCause.isInstanceOf[TransactionalIdNotFoundException]
+        }
+      }
+      removedTransactionState
+    }, "Transaction state never expired.")
+  }
+
+  private def waitUntilTransactionalStateExists(): Unit = {
+    val describeState = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+    TestUtils.waitUntilTrue(() => describeState.isDone, "Transactional state was never added.")
+  }
+
   private def serverProps(): Properties = {
     val serverProps = new Properties()
     serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
     // Set a smaller value for the number of partitions for the __consumer_offsets topic
-    // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+    // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
     serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
     serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
     serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
@@ -119,8 +205,10 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
     serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
-    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "2000")
+    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "1000")
     serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, "500")
+    serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "500")
+    serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
     serverProps
   }
 }
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index a90690ad2a..95ba597b08 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -50,7 +50,7 @@ object StressTestLog {
       time = time,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index c342e71361..94609a6bb9 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -220,7 +220,7 @@ object TestLinearWriteSpeed {
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 381ec93a0a..ac73566875 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -111,7 +111,7 @@ abstract class AbstractLogCleanerIntegrationTest {
         brokerTopicStats = new BrokerTopicStats,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
         maxProducerIdExpirationMs = 60 * 60 * 1000,
-        producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+        producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
         logDirFailureChannel = new LogDirFailureChannel(10),
         topicId = None,
         keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 85745bfe67..ede47de47c 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index fdc05c74f8..2c05286d04 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -100,6 +100,7 @@ class LogCleanerManagerTest extends Logging {
     val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
     val segments = new LogSegments(tp)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
@@ -121,7 +122,7 @@ class LogCleanerManagerTest extends Logging {
       offsets.nextOffsetMetadata, time.scheduler, time, tp, logDirFailureChannel)
     // the exception should be caught and the partition that caused it marked as uncleanable
     class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
-        LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
+        producerIdExpirationCheckIntervalMs, leaderEpochCache,
         producerStateManager, _topicId = None, keepPartitionMetadataFile = true) {
       // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
       override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] =
@@ -808,7 +809,7 @@ class LogCleanerManagerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true)
@@ -862,7 +863,7 @@ class LogCleanerManagerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 949e0c59df..19a52268f9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -106,6 +106,7 @@ class LogCleanerTest {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
     val logSegments = new LogSegments(topicPartition)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, dir,
@@ -129,7 +130,7 @@ class LogCleanerTest {
     val log = new UnifiedLog(offsets.logStartOffset,
                       localLog,
                       brokerTopicStats = new BrokerTopicStats,
-                      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+                      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
                       leaderEpochCache = leaderEpochCache,
                       producerStateManager = producerStateManager,
                       _topicId = None,
@@ -1973,7 +1974,7 @@ class LogCleanerTest {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index db3222bb7f..c60e661a4b 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -150,7 +150,7 @@ class LogConcurrencyTest {
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c6379ff3f3..168d6e0b05 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -49,6 +49,7 @@ class LogLoaderTest {
   val brokerTopicStats = new BrokerTopicStats
   val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
   val maxProducerIdExpirationMs: Int = 60 * 60 * 1000
+  val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val mockTime = new MockTime()
@@ -89,6 +90,7 @@ class LogLoaderTest {
 
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
 
     // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
     // flag and to inject an error
@@ -109,6 +111,7 @@ class LogLoaderTest {
         retentionCheckMs = 1000L,
         maxTransactionTimeoutMs = maxTransactionTimeoutMs,
         maxPidExpirationMs = maxProducerIdExpirationMs,
+        producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
         interBrokerProtocolVersion = config.interBrokerProtocolVersion,
         scheduler = time.scheduler,
         brokerTopicStats = new BrokerTopicStats(),
@@ -149,7 +152,7 @@ class LogLoaderTest {
             offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
             logDirFailureChannel)
           new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
-            LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
+            producerIdExpirationCheckIntervalMs, leaderEpochCache,
             producerStateManager, None, true)
         }
       }
@@ -246,7 +249,7 @@ class LogLoaderTest {
                         time: Time = mockTime,
                         maxTransactionTimeoutMs: Int = maxTransactionTimeoutMs,
                         maxProducerIdExpirationMs: Int = maxProducerIdExpirationMs,
-                        producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+                        producerIdExpirationCheckIntervalMs: Int = producerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true): UnifiedLog = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxTransactionTimeoutMs, maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, lastShutdownClean)
@@ -330,6 +333,7 @@ class LogLoaderTest {
     def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
       val maxTransactionTimeoutMs = 5 * 60 * 1000
       val maxProducerIdExpirationMs = 60 * 60 * 1000
+      val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
       val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
       val logDirFailureChannel = new LogDirFailureChannel(10)
       // Intercept all segment read calls
@@ -373,7 +377,7 @@ class LogLoaderTest {
         offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
         logDirFailureChannel)
       new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
-        LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager,
+        producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager,
         None, keepPartitionMetadataFile = true)
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1b2dd7809f..001c62aa2d 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -647,7 +647,7 @@ class LogManagerTest {
     val segmentBytes = 1024
 
     val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
-      5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs)
+      5 * 60 * 1000, 60 * 60 * 1000, 10 * 60 * 1000)
 
     assertTrue(expectedSegmentsPerLog > 0)
     // calculate numMessages to append to logs. It'll create "expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -783,7 +783,7 @@ class LogManagerTest {
         recoveryPoint = 0,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
         maxProducerIdExpirationMs = 5 * 60 * 1000,
-        producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+        producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
         scheduler = mockTime.scheduler,
         time = mockTime,
         brokerTopicStats = mockBrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 50af76f556..4ff4c25601 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -81,7 +81,7 @@ object LogTestUtils {
                 recoveryPoint: Long = 0L,
                 maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
                 maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-                producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+                producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
                 keepPartitionMetadataFile: Boolean = true,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 57409a1f03..9db288b529 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3505,7 +3505,7 @@ class UnifiedLogTest {
                         time: Time = mockTime,
                         maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
                         maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-                        producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+                        producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
                         lastShutdownClean: Boolean = true,
                         topicId: Option[Uuid] = None,
                         keepPartitionMetadataFile: Boolean = true): UnifiedLog = {
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 5d5e462b5a..4e76dcd00b 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
 
-import kafka.log.{AppendOrigin, Defaults, LogConfig, LogManager, LogTestUtils, UnifiedLog}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogTestUtils, UnifiedLog}
 import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
 import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
@@ -75,7 +75,7 @@ class DumpLogSegmentsTest {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 91c8f270f3..c39c73f728 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -19,7 +19,7 @@ package kafka.utils
 import java.util.Properties
 import java.util.concurrent.atomic._
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import kafka.log.{LocalLog, UnifiedLog, LogConfig, LogLoader, LogManager, LogSegments, ProducerStateManager}
+import kafka.log.{LocalLog, UnifiedLog, LogConfig, LogLoader, LogSegments, ProducerStateManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.TestUtils.retry
 import org.junit.jupiter.api.Assertions._
@@ -120,6 +120,7 @@ class SchedulerTest {
     val brokerTopicStats = new BrokerTopicStats
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
     val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
@@ -144,7 +145,7 @@ class SchedulerTest {
       offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, logDirFailureChannel)
     val log = new UnifiedLog(logStartOffset = offsets.logStartOffset,
       localLog = localLog,
-      brokerTopicStats, LogManager.ProducerIdExpirationCheckIntervalMs,
+      brokerTopicStats, producerIdExpirationCheckIntervalMs,
       leaderEpochCache, producerStateManager,
       _topicId = None, keepPartitionMetadataFile = true)
     assertTrue(scheduler.taskRunning(log.producerExpireCheck))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e0d5981da..592d02dc4b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1346,6 +1346,7 @@ object TestUtils extends Logging {
                    retentionCheckMs = 1000L,
                    maxTransactionTimeoutMs = 5 * 60 * 1000,
                    maxPidExpirationMs = 60 * 60 * 1000,
+                   producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
                    brokerTopicStats = new BrokerTopicStats,