Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/20 23:48:05 UTC

[GitHub] [kafka] hachikuji opened a new pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

hachikuji opened a new pull request #11098:
URL: https://github.com/apache/kafka/pull/11098


   When expiring transactionalIds, we group the tombstones together into batches. Currently there is no limit on the size of these batches, which can lead to `MESSAGE_TOO_LARGE` errors when many transactionalIds need to be expired at the same time. This patch fixes the problem by ensuring that the batch size respects the configured `max.message.bytes` limit.
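
   For illustration, the core of the fix amounts to bounding each tombstone batch by the topic's `max.message.bytes` and deferring whatever does not fit to the next expiration run. A minimal sketch of that idea, mirroring the calls used in the diff below (identifiers such as `expiredIds`, `currentTimeMs`, and `maxBatchSize` are illustrative):

```scala
import java.nio.ByteBuffer
import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}

// Builder capped at the topic's max.message.bytes: the initial allocation is
// small (16 KB at most), but the builder may grow up to the write limit.
val builder = MemoryRecords.builder(
  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
  TransactionLog.EnforcedCompressionType,
  TimestampType.CREATE_TIME,
  0L,           // base offset
  maxBatchSize  // write limit enforced by hasRoomFor
)

expiredIds.takeWhile { transactionalId =>
  val keyBytes = TransactionLog.keyToBytes(transactionalId)
  val fits = builder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
  if (fits) {
    // A tombstone is a record with the transactionalId as key and a null value.
    builder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
  }
  fits // stop at the first id that does not fit; it is picked up on the next run
}
```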
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674988520



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
               }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
             }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
           }
 
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
           }
 
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)
+      }
+    }
+  }
+
+  private def shouldExpire(
+    txnMetadata: TransactionMetadata,
+    currentTimeMs: Long
+  ): Boolean = {
+    val isExpirableState = txnMetadata.state match {
+      case Empty | CompleteCommit | CompleteAbort => true
+      case _ => false
+    }
+
+    isExpirableState && txnMetadata.txnLastUpdateTimestamp <= currentTimeMs - config.transactionalIdExpirationMs
+  }
+
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {

Review comment:
       I decided to get rid of this check instead. It's pretty unlikely that a user would tune `max.message.bytes` to a value so small that not even a single record can fit, but if they do, we can let the write fail with the same MESSAGE_TOO_LARGE error that happens today.







[GitHub] [kafka] hachikuji commented on pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#issuecomment-885065413


   @dajac Yeah, I was thinking about that too. I'll open a jira and we can address that separately.





[GitHub] [kafka] ijuma commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r676047197



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       16k is also the default batch size for the producer.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677680568



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
             maxBatchSize
           )
 
-          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+          var breakIteration = false

Review comment:
       In that case, I feel it may actually be cleaner to inline `collectExpiredTransactionalIds` into the caller so that we end up with just one while loop / flag. We could still distinguish the case where the log is offline (and we should not proceed) from the case where the batch is full (and we should write once and proceed). WDYT?
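
       For concreteness, a rough sketch of the single-loop shape (helper names such as `collectUntilBatchFull` and `writeTombstones` are purely illustrative):

```scala
// One loop over the partitions, with both early-exit reasons visible in one place.
transactionMetadataCache.forKeyValue { (partitionId, cacheEntry) =>
  val partition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
  replicaManager.getLogConfig(partition) match {
    case None =>
      // Case 2: the partition is offline, so there is nothing we can write for it.
    case Some(logConfig) =>
      // Case 1: append tombstones until the batch is full, write once, and let
      // any remaining expirable transactionalIds be picked up on the next run.
      val (expired, records) = collectUntilBatchFull(cacheEntry, logConfig.maxMessageSize)
      if (expired.nonEmpty)
        writeTombstones(partition, expired, records)
  }
}
```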







[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677847584



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
             maxBatchSize
           )
 
-          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+          var breakIteration = false

Review comment:
       I tried to consolidate this logic into a single loop, which I think is what you are suggesting. Let me know if the latest commit is what you had in mind.







[GitHub] [kafka] dajac commented on pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#issuecomment-884809208


   btw, I was looking at the code which expires groups, and it seems that it does not consider the max batch size either, right?





[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674140433



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {

Review comment:
       Yeah, I guess this is the downside of using something like `MemoryRecordsBuilder`, which is so tailored to the producer. Would it make sense to add a stricter `hasRoomFor`?
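
       For illustration, a stricter variant might look roughly like this (a sketch only: `writeLimit` is passed in explicitly since the builder does not expose it, and the record size estimate is an assumed input):

```scala
import org.apache.kafka.common.record.MemoryRecordsBuilder

// Unlike hasRoomFor, this hypothetical check makes no exception for the first
// record, so a record that can never fit is rejected up front.
def hasRoomForStrict(
  builder: MemoryRecordsBuilder,
  writeLimit: Int,
  estimatedRecordSizeInBytes: Int
): Boolean = {
  !builder.isFull && builder.estimatedSizeInBytes + estimatedRecordSizeInBytes <= writeLimit
}
```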







[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677652114



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
             maxBatchSize
           )
 
-          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+          var breakIteration = false

Review comment:
       We've changed the logic to check the log configuration to get the batch size. If the partition is offline, then `ReplicaManager.getLogConfig` will return `None`. I have a test case which exercises this path.







[GitHub] [kafka] hachikuji merged pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11098:
URL: https://github.com/apache/kafka/pull/11098


   





[GitHub] [kafka] dajac commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r673818846



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       For my own education, why do we cap the initial buffer allocation at `16384` here?

##########
File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
##########
@@ -629,6 +631,62 @@ class TransactionStateManagerTest {
     verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
+  @Test
+  def testTransactionExpirationShouldRespectBatchSize(): Unit = {
+    val partitionIds = 0 until numPartitions
+    val maxBatchSize = 512
+
+    loadTransactionsForPartitions(partitionIds)
+
+    val allTransactionalIds = mutable.Set.empty[String]
+    for (i <- 0 to 1000) {
+      val txnlId = s"id_$i"
+      val producerId = i
+      val txnMetadata = transactionMetadata(txnlId, producerId)
+      txnMetadata.txnLastUpdateTimestamp = time.milliseconds() - txnConfig.transactionalIdExpirationMs
+      transactionManager.putTransactionStateIfNotExists(txnMetadata)
+      allTransactionalIds += txnlId
+    }
+
+    def removeExpiredTransactionalIds(): Map[TopicPartition, MemoryRecords] = {
+      EasyMock.reset(replicaManager)
+      expectLogConfig(partitionIds, maxBatchSize)
+
+      val appendedRecordsCapture = expectTransactionalIdExpiration(Errors.NONE)
+      EasyMock.replay(replicaManager)
+
+      transactionManager.removeExpiredTransactionalIds()
+      EasyMock.verify(replicaManager)
+
+      assertTrue(appendedRecordsCapture.hasCaptured)
+      appendedRecordsCapture.getValue
+    }
+
+    def hasUnexpiredTransactionalIds: Boolean = {
+      val unexpiredTransactions = transactionManager.listTransactionStates(Set.empty, Set.empty)
+        .transactionStates.asScala
+      assertTrue(unexpiredTransactions.forall(txn => txn.transactionState == Empty.name))
+      unexpiredTransactions.nonEmpty
+    }
+
+    var iterations = 0
+    val expiredTransactionalIds = mutable.Set.empty[String]
+    while (hasUnexpiredTransactionalIds) {
+      removeExpiredTransactionalIds().forKeyValue { (_, records) =>
+        assertTrue(records.sizeInBytes() < maxBatchSize)

Review comment:
       nit: We might be able to drop the `()` of `sizeInBytes` and `records` below.

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {
+        warn(s"Failed to write expiration record for transactionalId ${txnMetadata.transactionalId} " +
+          s"because the tombstone record exceeds the max allowed batch size of $maxBatchSize")
+      }
+      false
+    }
+  }
+
+  private[transaction] def removeExpiredTransactionalIds(): Unit = {
+    inReadLock(stateLock) {
+      val expirationRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+      val expiredTransactionalIds = mutable.Map.empty[TopicPartition, Iterable[TransactionalIdCoordinatorEpochAndMetadata]]
+
+      transactionMetadataCache.forKeyValue { (partitionId, partitionCacheEntry) =>
+        val (expiredForPartition, partitionRecords) = collectExpiredTransactionalIds(partitionId, partitionCacheEntry)
+        if (expiredForPartition.nonEmpty) {
+          val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)

Review comment:
       nit: It might be worth instantiating this one earlier and passing it to `collectExpiredTransactionalIds` instead of passing the `partitionId`. This would avoid having to instantiate it twice.
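
       i.e. roughly (a sketch of the adjusted signature only):

```scala
// The caller instantiates the TopicPartition once and passes it down,
// instead of the method rebuilding it from the partitionId.
private def collectExpiredTransactionalIds(
  transactionPartition: TopicPartition,
  partitionCacheEntry: TxnMetadataCacheEntry
): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
  ??? // body unchanged apart from dropping the local `new TopicPartition(...)`
}
```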

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)

Review comment:
       Have you considered logging a warning here? Not being able to look up the config means that the partition is not online. I suppose that previously we would have tried to write to the log, the write would have failed, and that would have warned us at L257.
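
       For illustration, the suggestion amounts to something like this (the log message wording and the `collectExpired` stand-in are illustrative):

```scala
replicaManager.getLogConfig(transactionPartition) match {
  case Some(logConfig) =>
    collectExpired(logConfig.maxMessageSize) // the collection logic from the patch
  case None =>
    warn(s"Skipping expiration of transactionalIds for partition $transactionPartition " +
      "since the log config is not available, which likely means the partition is offline")
    (Seq.empty, MemoryRecords.EMPTY)
}
```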







[GitHub] [kafka] guozhangwang commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677649333



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
             maxBatchSize
           )
 
-          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+          var breakIteration = false

Review comment:
       I think I may be missing something here. Could you explain why we need both the caller and the callee doing this while loop with the break-iteration flag? It seems to me that we have two reasons to break early: 1) we've reached the record limit, or 2) the partition is already offline, and we want to treat them differently, hence the two while loops, right?
   
       But I cannot see exactly where we would hit case 2).







[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r675005086



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       Note that the buffer will still grow up to the `max.message.bytes` limit. I agree, however, that one hour is a long time to wait. Let me look into triggering the next run right away.
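
       For example, something along these lines could follow a run whose batch filled up (illustrative only; the retry task name and the `batchWasFull` signal are assumptions):

```scala
// Re-trigger expiration immediately instead of waiting for the next periodic
// run; the scheduler's delay defaults to 0, so this fires right away.
if (batchWasFull)
  scheduler.schedule("transactionalId-expiration-retry", () => removeExpiredTransactionalIds())
```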







[GitHub] [kafka] dajac commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674676502



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       I do agree that 16k seems quite reasonable for the common case. The downside is that if there are many transactionalIds to be expired, we have to wait another hour to clean up the remaining ones.
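       As a rough sense of scale (the sizes below are assumptions for a back-of-envelope estimate, not measured values), even the 16k initial buffer covers a few hundred tombstones per pass:

       val assumedKeyBytes = 40      // serialized transactionalId key (assumption)
       val assumedFramingBytes = 10  // per-record varint framing (assumption)
       val idsPerInitialBuffer = 16384 / (assumedKeyBytes + assumedFramingBytes)
       // ≈ 327 tombstones before the builder must grow toward max.message.bytes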

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
               }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
             }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
           }
 
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
           }
 
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)
+      }
+    }
+  }
+
+  private def shouldExpire(
+    txnMetadata: TransactionMetadata,
+    currentTimeMs: Long
+  ): Boolean = {
+    val isExpirableState = txnMetadata.state match {
+      case Empty | CompleteCommit | CompleteAbort => true
+      case _ => false
+    }
+
+    isExpirableState && txnMetadata.txnLastUpdateTimestamp <= currentTimeMs - config.transactionalIdExpirationMs
+  }
+
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {

Review comment:
       Yeah, why not.
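       A hedged sketch of what that branch could look like; the exception type and message are illustrative, not the merged code. The point is that if even one tombstone exceeds the limit, max.message.bytes is misconfigured and silently skipping would just retry forever:

       if (recordsBuilder.numRecords == 0) {
         // Illustrative only: fail loudly rather than loop on an unexpirable id.
         throw new IllegalStateException(s"Failed to fit a single tombstone for transactionalId " +
           s"${txnMetadata.transactionalId} within max batch size $maxBatchSize")
       } else {
         false // the batch is simply full; leave the remaining ids for the next pass
       }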







[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674117013



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       Filling a batch is an uncommon scenario, so allocating a full 1MB buffer (the default max.message.bytes) each time the task runs seemed excessive; 16K seemed more reasonable for the common case. Another option I considered was a statically allocated buffer, but that seemed like overkill.
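       A small demonstration of why the 16K starting size is a floor rather than a cap, assuming the builder keeps using Kafka's auto-growing ByteBufferOutputStream:

       import java.nio.ByteBuffer
       import org.apache.kafka.common.utils.ByteBufferOutputStream

       // The stream reallocates a larger backing buffer on demand, so a small
       // initial allocation only sizes the common case, not the worst case.
       val out = new ByteBufferOutputStream(ByteBuffer.allocate(16384))
       out.write(new Array[Byte](32768))    // write past the initial capacity
       assert(out.buffer.capacity >= 32768) // the buffer grew to accommodate it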



