Posted to commits@kafka.apache.org by ij...@apache.org on 2021/05/30 19:19:02 UTC

[kafka] branch trunk updated: MINOR: Reduce allocations in requests via buffer caching (#9229)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b005b2  MINOR: Reduce allocations in requests via buffer caching (#9229)
6b005b2 is described below

commit 6b005b2b4eece81a5500fb0080ef5354b4240681
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun May 30 12:16:36 2021 -0700

    MINOR: Reduce allocations in requests via buffer caching (#9229)
    
    Use a caching `BufferSupplier` per request handler thread so that
    decompression buffers are cached if supported by the underlying
    `CompressionType`. This achieves a similar outcome to #9220, but
    with less contention.
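
    To illustrate the pattern (a sketch, not code from this commit; it only
    assumes the existing `org.apache.kafka.common.utils.BufferSupplier` API,
    whose `get` and `release` methods appear in the diff below, and whose
    `create()` factory returns the caching implementation):

        import java.nio.ByteBuffer
        import org.apache.kafka.common.utils.BufferSupplier

        object DecompressionBufferExample {
          // Each request handler thread would own one caching supplier like this
          // (thread-confined, so no locking or soft references are required).
          def newThreadLocalSupplier(): BufferSupplier = BufferSupplier.create()

          // Borrow a buffer from the thread's cache, use it, then return it so the
          // next request handled by the same thread can reuse the allocation.
          def withDecompressionBuffer[T](supplier: BufferSupplier, capacity: Int)(use: ByteBuffer => T): T = {
            val buffer = supplier.get(capacity)
            try use(buffer)
            finally supplier.release(buffer)
          }
        }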
    
    We introduce a `RequestLocal` class to make it easier to add
    new request-scoped stateful instances (one example we discussed
    previously was an `ActionQueue` that could be used to avoid
    some of the complex group coordinator locking).
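
    For orientation, a minimal sketch of such a request-scoped container follows.
    The real class is added as core/src/main/scala/kafka/server/RequestLocal.scala
    below, but its body is not shown in this diff; only `RequestLocal.NoCaching`
    and `requestLocal.bufferSupplier` are visible, so the exact shape and the
    factory method name here are assumptions:

        import org.apache.kafka.common.utils.BufferSupplier

        // Sketch only: a per-request (per handler thread) holder for stateful helpers.
        case class RequestLocal(bufferSupplier: BufferSupplier) {
          def close(): Unit = bufferSupplier.close()
        }

        object RequestLocal {
          // For code paths not tied to a request handler thread (e.g. scheduled tasks).
          val NoCaching: RequestLocal = RequestLocal(BufferSupplier.NO_CACHING)

          // Hypothetical factory: one caching instance per request handler thread.
          def withThreadConfinedCaching: RequestLocal = RequestLocal(BufferSupplier.create())
        }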
    
    This is a small win for zstd (no synchronization or soft references) and
    a more significant win for lz4. In particular, it reduces allocations
    significantly when the number of partitions is high. The decompression
    buffer size is typically 64 KB, so a produce request with 1000 partitions
    results in 64 MB of allocations even if each produce batch is small (which is
    likely when there are so many partitions).
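
    To make the arithmetic explicit (illustrative numbers, assuming roughly one
    64 KB decompression buffer per partition batch): before this change, 1000
    partitions × 64 KB comes to about 64 MB of short-lived allocations per produce
    request; after it, the handler thread reuses its cached buffer across those
    batches, so steady-state allocations for decompression buffers are largely
    eliminated.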
    
    I did a quick local producer perf test with 5000 partitions, 1 KB record size,
    1 broker, lz4, and ~0.5 for the producer compression rate metric:
    
    Before this change:
    > 20000000 records sent, 346314.349535 records/sec (330.27 MB/sec),
    > 148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.
    
    After this change:
    > 20000000 records sent, 431956.113259 records/sec (411.95 MB/sec),
    > 117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.
    
    That's a 25% throughput improvement and p999 latency was reduced to
    under half (in this test).
    
    Default arguments will be removed in a subsequent PR.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
 .../apache/kafka/common/compress/ZstdFactory.java  |  17 ++-
 core/src/main/scala/kafka/cluster/Partition.scala  |   6 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  80 ++++++-----
 .../coordinator/group/GroupMetadataManager.scala   |  28 ++--
 .../transaction/TransactionCoordinator.scala       |  33 +++--
 .../TransactionMarkerChannelManager.scala          |   7 +-
 .../transaction/TransactionStateManager.scala      |  12 +-
 core/src/main/scala/kafka/log/Log.scala            |  16 ++-
 core/src/main/scala/kafka/log/LogValidator.scala   |  27 ++--
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |   6 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  94 +++++++------
 .../scala/kafka/server/KafkaRequestHandler.scala   |  18 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  11 +-
 .../src/main/scala/kafka/server/RequestLocal.scala |  37 +++++
 .../server/metadata/BrokerMetadataListener.scala   |   4 +-
 .../scala/kafka/tools/TestRaftRequestHandler.scala |   4 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |   9 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  15 ++-
 .../AbstractCoordinatorConcurrencyTest.scala       |  12 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  39 ++++--
 .../group/GroupMetadataManagerTest.scala           |  44 +++---
 .../TransactionCoordinatorConcurrencyTest.scala    |  12 +-
 .../transaction/TransactionCoordinatorTest.scala   |  16 +++
 .../TransactionMarkerChannelManagerTest.scala      |   4 +
 .../transaction/TransactionStateManagerTest.scala  |  37 ++---
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 124 +++++++++++------
 .../unit/kafka/server/ControllerApisTest.scala     |   3 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 149 +++++++++++++--------
 .../unit/kafka/server/ReplicaManagerTest.scala     |   2 +-
 .../metadata/BrokerMetadataListenerTest.scala      |   2 +-
 .../kafka/jmh/record/BaseRecordBatchBenchmark.java |   8 +-
 .../CompressedRecordBatchValidationBenchmark.java  |   3 +-
 .../jmh/record/RecordBatchIterationBenchmark.java  |  14 +-
 34 files changed, 568 insertions(+), 333 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
index 8f4735e..4664f4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.compress;
 
+import com.github.luben.zstd.BufferPool;
 import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
 import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
@@ -47,10 +48,24 @@ public class ZstdFactory {
 
     public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
         try {
+            // We use our own BufferSupplier instead of com.github.luben.zstd.RecyclingBufferPool since our
+            // implementation doesn't require locking or soft references.
+            BufferPool bufferPool = new BufferPool() {
+                @Override
+                public ByteBuffer get(int capacity) {
+                    return decompressionBufferSupplier.get(capacity);
+                }
+
+                @Override
+                public void release(ByteBuffer buffer) {
+                    decompressionBufferSupplier.release(buffer);
+                }
+            };
+
             // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
             // in cases where the caller reads a small number of bytes (potentially a single byte).
             return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                RecyclingBufferPool.INSTANCE), 16 * 1024);
+                bufferPool), 16 * 1024);
         } catch (Throwable e) {
             throw new KafkaException(e);
         }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7eea0e2..89cadf4 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,6 @@ package kafka.cluster
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
-
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
@@ -1019,7 +1018,8 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
+                            requestLocal: RequestLocal): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
@@ -1033,7 +1033,7 @@ class Partition(val topicPartition: TopicPartition,
           }
 
           val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
-            interBrokerProtocolVersion)
+            interBrokerProtocolVersion, requestLocal)
 
           // we may need to increment high watermark since ISR could be down to 1
           (info, maybeIncrementLeaderHW(leaderLog))
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bb06ca5..f3e170b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.group
 
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-
 import kafka.common.OffsetAndMetadata
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
@@ -92,6 +91,7 @@ class GroupCoordinator(val brokerId: Int,
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
     props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
+
     props
   }
 
@@ -162,7 +162,8 @@ class GroupCoordinator(val brokerId: Int,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
-                      responseCallback: JoinCallback): Unit = {
+                      responseCallback: JoinCallback,
+                      requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
       responseCallback(JoinGroupResult(memberId, error))
       return
@@ -194,7 +195,8 @@ class GroupCoordinator(val brokerId: Int,
                 sessionTimeoutMs,
                 protocolType,
                 protocols,
-                responseCallback
+                responseCallback,
+                requestLocal
               )
             } else {
               doCurrentMemberJoinGroup(
@@ -230,7 +232,8 @@ class GroupCoordinator(val brokerId: Int,
     sessionTimeoutMs: Int,
     protocolType: String,
     protocols: List[(String, Array[Byte])],
-    responseCallback: JoinCallback
+    responseCallback: JoinCallback,
+    requestLocal: RequestLocal
   ): Unit = {
     group.inLock {
       if (group.is(Dead)) {
@@ -255,9 +258,9 @@ class GroupCoordinator(val brokerId: Int,
               sessionTimeoutMs,
               protocolType,
               protocols,
-              responseCallback
+              responseCallback,
+              requestLocal
             )
-
           case None =>
             doDynamicNewMemberJoinGroup(
               group,
@@ -286,14 +289,15 @@ class GroupCoordinator(val brokerId: Int,
     sessionTimeoutMs: Int,
     protocolType: String,
     protocols: List[(String, Array[Byte])],
-    responseCallback: JoinCallback
+    responseCallback: JoinCallback,
+    requestLocal: RequestLocal
   ): Unit = {
     group.currentStaticMemberId(groupInstanceId) match {
       case Some(oldMemberId) =>
         info(s"Static member with groupInstanceId=$groupInstanceId and unknown member id joins " +
           s"group ${group.groupId} in ${group.currentState} state. Replacing previously mapped " +
           s"member $oldMemberId with this groupInstanceId.")
-        updateStaticMemberAndRebalance(group, oldMemberId, newMemberId, groupInstanceId, protocols, responseCallback)
+        updateStaticMemberAndRebalance(group, oldMemberId, newMemberId, groupInstanceId, protocols, responseCallback, requestLocal)
 
       case None =>
         info(s"Static member with groupInstanceId=$groupInstanceId and unknown member id joins " +
@@ -474,7 +478,8 @@ class GroupCoordinator(val brokerId: Int,
                       protocolName: Option[String],
                       groupInstanceId: Option[String],
                       groupAssignment: Map[String, Array[Byte]],
-                      responseCallback: SyncCallback): Unit = {
+                      responseCallback: SyncCallback,
+                      requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
       case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
         // The coordinator is loading, which means we've lost the state of the active rebalance and the
@@ -489,7 +494,7 @@ class GroupCoordinator(val brokerId: Int,
         groupManager.getGroup(groupId) match {
           case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
           case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName,
-            groupInstanceId, groupAssignment, responseCallback)
+            groupInstanceId, groupAssignment, requestLocal, responseCallback)
         }
     }
   }
@@ -535,6 +540,7 @@ class GroupCoordinator(val brokerId: Int,
                           protocolName: Option[String],
                           groupInstanceId: Option[String],
                           groupAssignment: Map[String, Array[Byte]],
+                          requestLocal: RequestLocal,
                           responseCallback: SyncCallback): Unit = {
     group.inLock {
       val validationErrorOpt = validateSyncGroup(
@@ -587,7 +593,7 @@ class GroupCoordinator(val brokerId: Int,
                     }
                   }
                 }
-              })
+              }, requestLocal)
               groupCompletedRebalanceSensor.record()
             }
 
@@ -669,7 +675,8 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
+  def handleDeleteGroups(groupIds: Set[String],
+                         requestLocal: RequestLocal = RequestLocal.NoCaching): Map[String, Errors] = {
     val groupErrors = mutable.Map.empty[String, Errors]
     val groupsEligibleForDeletion = mutable.ArrayBuffer[GroupMetadata]()
 
@@ -701,7 +708,8 @@ class GroupCoordinator(val brokerId: Int,
     }
 
     if (groupsEligibleForDeletion.nonEmpty) {
-      val offsetsRemoved = groupManager.cleanupGroupMetadata(groupsEligibleForDeletion, _.removeAllOffsets())
+      val offsetsRemoved = groupManager.cleanupGroupMetadata(groupsEligibleForDeletion, requestLocal,
+        _.removeAllOffsets())
       groupErrors ++= groupsEligibleForDeletion.map(_.groupId -> Errors.NONE).toMap
       info(s"The following groups were deleted: ${groupsEligibleForDeletion.map(_.groupId).mkString(", ")}. " +
         s"A total of $offsetsRemoved offsets were removed.")
@@ -710,7 +718,8 @@ class GroupCoordinator(val brokerId: Int,
     groupErrors
   }
 
-  def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, Errors]) = {
+  def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition],
+                          requestLocal: RequestLocal): (Errors, Map[TopicPartition, Errors]) = {
     var groupError: Errors = Errors.NONE
     var partitionErrors: Map[TopicPartition, Errors] = Map()
     var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
@@ -748,9 +757,8 @@ class GroupCoordinator(val brokerId: Int,
             }
 
             if (partitionsEligibleForDeletion.nonEmpty) {
-              val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), group => {
-                group.removeOffsets(partitionsEligibleForDeletion)
-              })
+              val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), requestLocal,
+                _.removeOffsets(partitionsEligibleForDeletion))
 
               partitionErrors ++= partitionsEligibleForDeletion.map(_ -> Errors.NONE).toMap
 
@@ -855,14 +863,16 @@ class GroupCoordinator(val brokerId: Int,
                              groupInstanceId: Option[String],
                              generationId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                             responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+                             responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                             requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
           groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
         }
-        doTxnCommitOffsets(group, memberId, groupInstanceId, generationId, producerId, producerEpoch, offsetMetadata, responseCallback)
+        doTxnCommitOffsets(group, memberId, groupInstanceId, generationId, producerId, producerEpoch,
+          offsetMetadata, requestLocal, responseCallback)
     }
   }
 
@@ -871,7 +881,8 @@ class GroupCoordinator(val brokerId: Int,
                           groupInstanceId: Option[String],
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                          requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
       case None =>
@@ -880,14 +891,16 @@ class GroupCoordinator(val brokerId: Int,
             if (generationId < 0) {
               // the group is not relying on Kafka for group management, so allow the commit
               val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
-              doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
+              doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata,
+                responseCallback, requestLocal)
             } else {
               // or this is a request coming from an older generation. either way, reject the commit
               responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
             }
 
           case Some(group) =>
-            doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
+            doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata,
+              responseCallback, requestLocal)
         }
     }
   }
@@ -907,6 +920,7 @@ class GroupCoordinator(val brokerId: Int,
                                  producerId: Long,
                                  producerEpoch: Short,
                                  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                                 requestLocal: RequestLocal,
                                  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
       val validationErrorOpt = validateOffsetCommit(
@@ -920,7 +934,8 @@ class GroupCoordinator(val brokerId: Int,
       if (validationErrorOpt.isDefined) {
         responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
       } else {
-        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
+        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId,
+          producerEpoch, requestLocal)
       }
     }
   }
@@ -963,7 +978,8 @@ class GroupCoordinator(val brokerId: Int,
                               groupInstanceId: Option[String],
                               generationId: Int,
                               offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                              requestLocal: RequestLocal): Unit = {
     group.inLock {
       val validationErrorOpt = validateOffsetCommit(
         group,
@@ -985,7 +1001,7 @@ class GroupCoordinator(val brokerId: Int,
             // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
             val member = group.get(memberId)
             completeAndScheduleNextHeartbeatExpiration(group, member)
-            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, requestLocal = requestLocal)
 
           case CompletingRebalance =>
             // We should not receive a commit request if the group has not completed rebalance;
@@ -1041,10 +1057,9 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]): Unit = {
-    val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, group => {
-      group.removeOffsets(topicPartitions)
-    })
+  def handleDeletedPartitions(topicPartitions: Seq[TopicPartition], requestLocal: RequestLocal): Unit = {
+    val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, requestLocal,
+      _.removeOffsets(topicPartitions))
     info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
   }
 
@@ -1235,7 +1250,8 @@ class GroupCoordinator(val brokerId: Int,
                                              newMemberId: String,
                                              groupInstanceId: String,
                                              protocols: List[(String, Array[Byte])],
-                                             responseCallback: JoinCallback): Unit = {
+                                             responseCallback: JoinCallback,
+                                             requestLocal: RequestLocal): Unit = {
     val currentLeader = group.leaderOrNull
     val member = group.replaceStaticMember(groupInstanceId, oldMemberId, newMemberId)
     // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
@@ -1287,7 +1303,7 @@ class GroupCoordinator(val brokerId: Int,
                 leaderId = currentLeader,
                 error = Errors.NONE))
             }
-          })
+          }, requestLocal)
         } else {
           maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
         }
@@ -1411,7 +1427,7 @@ class GroupCoordinator(val brokerId: Int,
               // This should be safe since there are no active members in an empty generation, so we just warn.
               warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
             }
-          })
+          }, RequestLocal.NoCaching)
         } else {
           info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
             s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members")
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c054234..d3d911d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -24,14 +24,13 @@ import java.util.Optional
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
-
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
 import kafka.common.OffsetAndMetadata
 import kafka.internals.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
 import kafka.log.AppendOrigin
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchLogEnd, ReplicaManager}
+import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Implicits._
 import kafka.utils._
@@ -240,7 +239,8 @@ class GroupMetadataManager(brokerId: Int,
 
   def storeGroup(group: GroupMetadata,
                  groupAssignment: Map[String, Array[Byte]],
-                 responseCallback: Errors => Unit): Unit = {
+                 responseCallback: Errors => Unit,
+                 requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     getMagic(partitionFor(group.groupId)) match {
       case Some(magicValue) =>
         // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
@@ -310,7 +310,7 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        appendForGroup(group, groupMetadataRecords, putCacheCallback)
+        appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback)
 
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
@@ -320,6 +320,7 @@ class GroupMetadataManager(brokerId: Int,
 
   private def appendForGroup(group: GroupMetadata,
                              records: Map[TopicPartition, MemoryRecords],
+                             requestLocal: RequestLocal,
                              callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
     // call replica manager to append the group message
     replicaManager.appendRecords(
@@ -329,7 +330,8 @@ class GroupMetadataManager(brokerId: Int,
       origin = AppendOrigin.Coordinator,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
-      responseCallback = callback)
+      responseCallback = callback,
+      requestLocal = requestLocal)
   }
 
   /**
@@ -340,7 +342,8 @@ class GroupMetadataManager(brokerId: Int,
                    offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
                    producerId: Long = RecordBatch.NO_PRODUCER_ID,
-                   producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit = {
+                   producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+                   requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
@@ -467,7 +470,7 @@ class GroupMetadataManager(brokerId: Int,
             }
           }
 
-          appendForGroup(group, entries, putCacheCallback)
+          appendForGroup(group, entries, requestLocal, putCacheCallback)
 
         case None =>
           val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
@@ -781,9 +784,8 @@ class GroupMetadataManager(brokerId: Int,
   // visible for testing
   private[group] def cleanupGroupMetadata(): Unit = {
     val currentTimestamp = time.milliseconds()
-    val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
-      group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
-    })
+    val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, RequestLocal.NoCaching,
+      _.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs))
     offsetExpiredSensor.record(numOffsetsRemoved)
     if (numOffsetsRemoved > 0)
       info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
@@ -796,7 +798,8 @@ class GroupMetadataManager(brokerId: Int,
     *                 a group lock is held, therefore there is no need for the caller to also obtain a group lock.
     * @return The cumulative number of offsets removed
     */
-  def cleanupGroupMetadata(groups: Iterable[GroupMetadata], selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = {
+  def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal,
+                           selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = {
     var offsetsRemoved = 0
 
     groups.foreach { group =>
@@ -843,7 +846,8 @@ class GroupMetadataManager(brokerId: Int,
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
                 val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
-                partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0)
+                partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0,
+                  requestLocal = requestLocal)
 
                 offsetsRemoved += removedOffsets.size
                 trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 543e9c8..78983c1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -18,8 +18,7 @@ package kafka.coordinator.transaction
 
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
+import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager, RequestLocal}
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
@@ -104,7 +103,8 @@ class TransactionCoordinator(brokerId: Int,
   def handleInitProducerId(transactionalId: String,
                            transactionTimeoutMs: Int,
                            expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
-                           responseCallback: InitProducerIdCallback): Unit = {
+                           responseCallback: InitProducerIdCallback,
+                           requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
 
     if (transactionalId == null) {
       // if the transactional id is null, then always blindly accept the request
@@ -167,7 +167,8 @@ class TransactionCoordinator(brokerId: Int,
               newMetadata.producerEpoch,
               TransactionResult.ABORT,
               isFromClient = false,
-              sendRetriableErrorCallback)
+              sendRetriableErrorCallback,
+              requestLocal)
           } else {
             def sendPidResponseCallback(error: Errors): Unit = {
               if (error == Errors.NONE) {
@@ -181,7 +182,8 @@ class TransactionCoordinator(brokerId: Int,
               }
             }
 
-            txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)
+            txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
+              sendPidResponseCallback, requestLocal = requestLocal)
           }
       }
     }
@@ -320,7 +322,8 @@ class TransactionCoordinator(brokerId: Int,
                                        producerId: Long,
                                        producerEpoch: Short,
                                        partitions: collection.Set[TopicPartition],
-                                       responseCallback: AddPartitionsCallback): Unit = {
+                                       responseCallback: AddPartitionsCallback,
+                                       requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     if (transactionalId == null || transactionalId.isEmpty) {
       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request")
       responseCallback(Errors.INVALID_REQUEST)
@@ -360,7 +363,8 @@ class TransactionCoordinator(brokerId: Int,
           responseCallback(err)
 
         case Right((coordinatorEpoch, newMetadata)) =>
-          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, responseCallback)
+          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
+            responseCallback, requestLocal = requestLocal)
       }
     }
   }
@@ -413,13 +417,15 @@ class TransactionCoordinator(brokerId: Int,
                            producerId: Long,
                            producerEpoch: Short,
                            txnMarkerResult: TransactionResult,
-                           responseCallback: EndTxnCallback): Unit = {
+                           responseCallback: EndTxnCallback,
+                           requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     endTransaction(transactionalId,
       producerId,
       producerEpoch,
       txnMarkerResult,
       isFromClient = true,
-      responseCallback)
+      responseCallback,
+      requestLocal)
   }
 
   private def endTransaction(transactionalId: String,
@@ -427,7 +433,8 @@ class TransactionCoordinator(brokerId: Int,
                              producerEpoch: Short,
                              txnMarkerResult: TransactionResult,
                              isFromClient: Boolean,
-                             responseCallback: EndTxnCallback): Unit = {
+                             responseCallback: EndTxnCallback,
+                             requestLocal: RequestLocal): Unit = {
     var isEpochFence = false
     if (transactionalId == null || transactionalId.isEmpty)
       responseCallback(Errors.INVALID_REQUEST)
@@ -586,7 +593,8 @@ class TransactionCoordinator(brokerId: Int,
             }
           }
 
-          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)
+          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
+            sendTxnMarkersCallback, requestLocal = requestLocal)
       }
     }
   }
@@ -643,7 +651,8 @@ class TransactionCoordinator(brokerId: Int,
               txnTransitMetadata.producerEpoch,
               TransactionResult.ABORT,
               isFromClient = false,
-              onComplete(txnIdAndPidEpoch))
+              onComplete(txnIdAndPidEpoch),
+              RequestLocal.NoCaching)
           }
       }
     }
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 5e22fb7..62c70d9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -19,11 +19,10 @@ package kafka.coordinator.transaction
 
 import java.util
 import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
-
 import kafka.api.KAFKA_2_8_IV0
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{KafkaConfig, MetadataCache}
+import kafka.server.{KafkaConfig, MetadataCache, RequestLocal}
 import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients._
@@ -330,8 +329,8 @@ class TransactionMarkerChannelManager(
           throw new IllegalStateException(errorMsg)
       }
 
-    txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback,
-      _ == Errors.COORDINATOR_NOT_AVAILABLE)
+    txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch,
+      txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching)
   }
 
   def addTxnMarkersToBrokerQueue(transactionalId: String,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 61fad95..25580f2 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -21,10 +21,9 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import kafka.log.{AppendOrigin, LogConfig}
 import kafka.message.UncompressedCodec
-import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
+import kafka.server.{Defaults, FetchLogEnd, ReplicaManager, RequestLocal}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.utils.Implicits._
@@ -209,7 +208,8 @@ class TransactionStateManager(brokerId: Int,
           internalTopicsAllowed = true,
           origin = AppendOrigin.Coordinator,
           recordsPerPartition,
-          removeFromCacheCallback)
+          removeFromCacheCallback,
+          requestLocal = RequestLocal.NoCaching)
       }
 
     }, delay = config.removeExpiredTransactionalIdsIntervalMs, period = config.removeExpiredTransactionalIdsIntervalMs)
@@ -526,7 +526,8 @@ class TransactionStateManager(brokerId: Int,
                              coordinatorEpoch: Int,
                              newMetadata: TxnTransitMetadata,
                              responseCallback: Errors => Unit,
-                             retryOnError: Errors => Boolean = _ => false): Unit = {
+                             retryOnError: Errors => Boolean = _ => false,
+                             requestLocal: RequestLocal): Unit = {
 
     // generate the message for this transaction metadata
     val keyBytes = TransactionLog.keyToBytes(transactionalId)
@@ -679,7 +680,8 @@ class TransactionStateManager(brokerId: Int,
                 internalTopicsAllowed = true,
                 origin = AppendOrigin.Coordinator,
                 recordsPerPartition,
-                updateCacheCallback)
+                updateCacheCallback,
+                requestLocal = requestLocal)
 
               trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
           }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0e74c28..b49bfb4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,6 @@ import java.util.Optional
 import java.util.concurrent.atomic._
 import java.util.concurrent.TimeUnit
 import java.util.regex.Pattern
-
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
 import kafka.common.{LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.AppendOrigin.RaftLeader
@@ -32,7 +31,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
@@ -693,15 +692,17 @@ class Log(@volatile private var _dir: File,
    * @param records The records to append
    * @param origin Declares the origin of the append which affects required validations
    * @param interBrokerProtocolVersion Inter-broker message protocol version
+   * @param requestLocal request local instance
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and last offset.
    */
   def appendAsLeader(records: MemoryRecords,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
-                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
+                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
+                     requestLocal: RequestLocal = RequestLocal.NoCaching): LogAppendInfo = {
     val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
-    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)
+    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), ignoreRecordSize = false)
   }
 
   /**
@@ -717,6 +718,7 @@ class Log(@volatile private var _dir: File,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
+      None,
       // disable to check the validation of record size since the record is already accepted by leader.
       ignoreRecordSize = true)
   }
@@ -732,6 +734,7 @@ class Log(@volatile private var _dir: File,
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @param validateAndAssignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+   * @param requestLocal The request local instance if assignOffsets is true
    * @param ignoreRecordSize true to skip validation of record size.
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
@@ -743,6 +746,7 @@ class Log(@volatile private var _dir: File,
                      interBrokerProtocolVersion: ApiVersion,
                      validateAndAssignOffsets: Boolean,
                      leaderEpoch: Int,
+                     requestLocal: Option[RequestLocal],
                      ignoreRecordSize: Boolean): LogAppendInfo = {
 
     val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
@@ -778,7 +782,9 @@ class Log(@volatile private var _dir: File,
                 leaderEpoch,
                 origin,
                 interBrokerProtocolVersion,
-                brokerTopicStats)
+                brokerTopicStats,
+                requestLocal.getOrElse(throw new IllegalArgumentException(
+                  "requestLocal should be defined if assignOffsets is true")))
             } catch {
               case e: IOException =>
                 throw new KafkaException(s"Error validating messages while appending to log $name", e)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 056be10..925c602 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
 import kafka.common.{LongRef, RecordValidationException}
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ProduceResponse.RecordError
-import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
@@ -95,7 +95,8 @@ private[log] object LogValidator extends Logging {
                                                     partitionLeaderEpoch: Int,
                                                     origin: AppendOrigin,
                                                     interBrokerProtocolVersion: ApiVersion,
-                                                    brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
+                                                    brokerTopicStats: BrokerTopicStats,
+                                                    requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
@@ -106,8 +107,9 @@ private[log] object LogValidator extends Logging {
         assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
           partitionLeaderEpoch, origin, magic, brokerTopicStats)
     } else {
-      validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
-        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin, interBrokerProtocolVersion, brokerTopicStats)
+      validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec,
+        targetCodec, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin,
+        interBrokerProtocolVersion, brokerTopicStats, requestLocal)
     }
   }
 
@@ -232,6 +234,8 @@ private[log] object LogValidator extends Logging {
       (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
     }
 
+    // The current implementation of BufferSupplier is naive and works best when the buffer size
+    // cardinality is low, so don't use it here
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
       offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
@@ -290,7 +294,9 @@ private[log] object LogValidator extends Logging {
       var offsetOfMaxBatchTimestamp = -1L
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      // this is a hot path and we want to avoid any unnecessary allocations.
+      // This is a hot path and we want to avoid any unnecessary allocations.
+      // That said, there is no benefit in using `skipKeyValueIterator` for the uncompressed
+      // case since we don't do key/value copies in this path (we just slice the ByteBuffer)
       var batchIndex = 0
       batch.forEach { record =>
         validateRecord(batch, topicPartition, record, batchIndex, now, timestampType,
@@ -360,7 +366,8 @@ private[log] object LogValidator extends Logging {
                                                  partitionLeaderEpoch: Int,
                                                  origin: AppendOrigin,
                                                  interBrokerProtocolVersion: ApiVersion,
-                                                 brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
+                                                 brokerTopicStats: BrokerTopicStats,
+                                                 requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
 
     if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
       throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
@@ -404,9 +411,9 @@ private[log] object LogValidator extends Logging {
       // if we are on version 2 and beyond, and we know we are going for in place assignment,
       // then we can optimize the iterator to skip key / value / headers since they would not be used at all
       val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
-        batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
+        batch.skipKeyValueIterator(requestLocal.bufferSupplier)
       else
-        batch.streamingIterator(BufferSupplier.NO_CACHING)
+        batch.streamingIterator(requestLocal.bufferSupplier)
 
       try {
         val recordErrors = new ArrayBuffer[ApiRecordError](0)
@@ -499,6 +506,8 @@ private[log] object LogValidator extends Logging {
     val startNanos = time.nanoseconds
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
       validatedRecords.asJava)
+    // The current implementation of BufferSupplier is naive and works best when the buffer size
+    // cardinality is low, so don't use it here
     val buffer = ByteBuffer.allocate(estimatedSize)
     val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
       logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 6ac39b7..7b5f83d 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -19,10 +19,9 @@ package kafka.raft
 import java.io.File
 import java.nio.file.{Files, NoSuchFileException, Path}
 import java.util.{Optional, Properties}
-
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
@@ -76,7 +75,8 @@ final class KafkaMetadataLog private (
     handleAndConvertLogAppendInfo(
       log.appendAsLeader(records.asInstanceOf[MemoryRecords],
         leaderEpoch = epoch,
-        origin = AppendOrigin.RaftLeader
+        origin = AppendOrigin.RaftLeader,
+        requestLocal = RequestLocal.NoCaching
       )
     )
   }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 47bc19d..fa0dc79 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -76,7 +76,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
 
-  override def handle(request: RequestChannel.Request): Unit = {
+  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     try {
       request.header.apiKey match {
         case ApiKeys.FETCH => handleFetch(request)
@@ -97,7 +97,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
         case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
         case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
-        case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
+        case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
@@ -113,12 +113,12 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleEnvelopeRequest(request: RequestChannel.Request): Unit = {
+  def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
       requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
         s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
     } else {
-      EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+      EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d75e4ae..c5e1404 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
-  override def handle(request: RequestChannel.Request): Unit = {
+  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     try {
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
@@ -156,21 +156,21 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       request.header.apiKey match {
-        case ApiKeys.PRODUCE => handleProduceRequest(request)
+        case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
         case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
         case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
-        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
+        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
         case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
-        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
+        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
         case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
-        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
+        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
         case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
-        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
+        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
         case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
@@ -178,13 +178,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
         case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
+        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
-        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
-        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
-        case ApiKeys.END_TXN => handleEndTxnRequest(request)
-        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
-        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
+        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
+        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
+        case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
+        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
+        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
         case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
         case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
         case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
@@ -198,19 +198,19 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
-        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
+        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
         case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
         case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
         case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
-        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
+        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
         case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
         case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
         case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
         case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
-        case ApiKeys.ENVELOPE => handleEnvelope(request)
+        case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
         case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
@@ -314,7 +314,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(), this)
   }
 
-  def handleUpdateMetadataRequest(request: RequestChannel.Request): Unit = {
+  def handleUpdateMetadataRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
@@ -330,13 +330,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
       if (deletedPartitions.nonEmpty)
-        groupCoordinator.handleDeletedPartitions(deletedPartitions)
+        groupCoordinator.handleDeletedPartitions(deletedPartitions, requestLocal)
 
       if (zkSupport.adminManager.hasDelayedTopicOperations) {
         updateMetadataRequest.partitionStates.forEach { partitionState =>
           zkSupport.adminManager.tryCompleteDelayedTopicOperations(partitionState.topicName)
         }
       }
+
       quotas.clientQuotaCallback.foreach { callback =>
         if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
           quotas.fetch.updateQuotaMetricConfigs()
@@ -380,7 +381,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
@@ -509,7 +510,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           Option(offsetCommitRequest.data.groupInstanceId),
           offsetCommitRequest.data.generationId,
           partitionData,
-          sendResponseCallback)
+          sendResponseCallback,
+          requestLocal)
       }
     }
   }
@@ -517,7 +519,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle a produce request
    */
-  def handleProduceRequest(request: RequestChannel.Request): Unit = {
+  def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val produceRequest = request.body[ProduceRequest]
     val requestSize = request.sizeInBytes
 
@@ -639,6 +641,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         internalTopicsAllowed = internalTopicsAllowed,
         origin = AppendOrigin.Client,
         entriesPerPartition = authorizedRequestInfo,
+        requestLocal = requestLocal,
         responseCallback = sendResponseCallback,
         recordConversionStatsCallback = processingStatsCallback)
 
@@ -1448,7 +1451,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
     // the callback for sending a join-group response
@@ -1507,11 +1510,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         joinGroupRequest.data.sessionTimeoutMs,
         joinGroupRequest.data.protocolType,
         protocols,
-        sendResponseCallback)
+        sendResponseCallback,
+        requestLocal)
     }
   }
 
-  def handleSyncGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleSyncGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val syncGroupRequest = request.body[SyncGroupRequest]
 
     def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
@@ -1550,19 +1554,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         Option(syncGroupRequest.data.protocolName),
         Option(syncGroupRequest.data.groupInstanceId),
         assignmentMap.result(),
-        sendResponseCallback
+        sendResponseCallback,
+        requestLocal
       )
     }
   }
 
-  def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
+  def handleDeleteGroupsRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val deleteGroupsRequest = request.body[DeleteGroupsRequest]
     val groups = deleteGroupsRequest.data.groupsNames.asScala.distinct
 
     val (authorizedGroups, unauthorizedGroups) = authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP,
       groups)(identity)
 
-    val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet) ++
+    val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet, requestLocal) ++
       unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED)
 
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
@@ -1990,7 +1995,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
+  def handleInitProducerIdRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val initProducerIdRequest = request.body[InitProducerIdRequest]
     val transactionalId = initProducerIdRequest.data.transactionalId
 
@@ -2035,12 +2040,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     producerIdAndEpoch match {
       case Right(producerIdAndEpoch) => txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs,
-        producerIdAndEpoch, sendResponseCallback)
+        producerIdAndEpoch, sendResponseCallback, requestLocal)
       case Left(error) => requestHelper.sendErrorResponseMaybeThrottle(request, error.exception)
     }
   }
 
-  def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
+  def handleEndTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val endTxnRequest = request.body[EndTxnRequest]
     val transactionalId = endTxnRequest.data.transactionalId
@@ -2071,7 +2076,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         endTxnRequest.data.producerId,
         endTxnRequest.data.producerEpoch,
         endTxnRequest.result(),
-        sendResponseCallback)
+        sendResponseCallback,
+        requestLocal)
     } else
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         new EndTxnResponse(new EndTxnResponseData()
@@ -2080,7 +2086,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       )
   }
 
-  def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+  def handleWriteTxnMarkersRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
@@ -2175,6 +2181,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           internalTopicsAllowed = true,
           origin = AppendOrigin.Coordinator,
           entriesPerPartition = controlRecords,
+          requestLocal = requestLocal,
           responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
       }
     }
@@ -2190,7 +2197,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
   }
 
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
+  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.data.transactionalId
@@ -2240,7 +2247,6 @@ class KafkaApis(val requestChannel: RequestChannel,
             responseBody
           }
 
-
           requestHelper.sendResponseMaybeThrottle(request, createResponse)
         }
 
@@ -2248,12 +2254,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           addPartitionsToTxnRequest.data.producerId,
           addPartitionsToTxnRequest.data.producerEpoch,
           authorizedPartitions,
-          sendResponseCallback)
+          sendResponseCallback,
+          requestLocal)
       }
     }
   }
 
-  def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
+  def handleAddOffsetsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest]
     val transactionalId = addOffsetsToTxnRequest.data.transactionalId
@@ -2298,11 +2305,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         addOffsetsToTxnRequest.data.producerId,
         addOffsetsToTxnRequest.data.producerEpoch,
         Set(offsetTopicPartition),
-        sendResponseCallback)
+        sendResponseCallback,
+        requestLocal)
     }
   }
 
-  def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
+  def handleTxnOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val header = request.header
     val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
@@ -2367,7 +2375,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           Option(txnOffsetCommitRequest.data.groupInstanceId),
           txnOffsetCommitRequest.data.generationId,
           offsetMetadata,
-          sendResponseCallback)
+          sendResponseCallback,
+          requestLocal)
       }
     }
   }
@@ -2877,7 +2886,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleOffsetDeleteRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val offsetDeleteRequest = request.body[OffsetDeleteRequest]
     val groupId = offsetDeleteRequest.data.groupId
 
@@ -2901,7 +2910,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       val (groupError, authorizedTopicPartitionsErrors) = groupCoordinator.handleDeleteOffsets(
-        groupId, topicPartitions)
+        groupId, topicPartitions, requestLocal)
 
       topicPartitionErrors ++= authorizedTopicPartitionsErrors
 
@@ -3129,7 +3138,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleEnvelope(request: RequestChannel.Request): Unit = {
+  def handleEnvelope(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
 
     // If forwarding is not yet enabled or this request has been received on an invalid endpoint,
@@ -3154,7 +3163,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         s"Broker $brokerId is not the active controller"))
       return
     }
-    EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+
+    EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
   }
 
   def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
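
Side note on the handle(_, requestLocal) call above: it is plain Scala partial application, adapting the two-argument handler to the single-argument callback that EnvelopeUtils.handleEnvelopeRequest expects while still threading the handler thread's RequestLocal into the unwrapped request. A minimal, self-contained sketch of the same pattern (the Request, Local and handleEnvelope names below are illustrative, not Kafka API):

    object PartialApplicationSketch extends App {
      final case class Request(desc: String)
      final case class Local(id: Int)

      // Two-argument handler, analogous in shape to handle(request, requestLocal) above.
      def handle(request: Request, local: Local): Unit =
        println(s"handling ${request.desc} with local ${local.id}")

      // A utility that only accepts a one-argument callback, analogous in shape to
      // EnvelopeUtils.handleEnvelopeRequest(request, metrics, handler).
      def handleEnvelope(request: Request, handler: Request => Unit): Unit =
        handler(request)

      val local = Local(1)
      // handle(_, local) is a Request => Unit that closes over this thread's local state.
      handleEnvelope(Request("embedded request"), handle(_, local))
    }
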
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f6d868a..4d38c6e 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -20,9 +20,9 @@ package kafka.server
 import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
+
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
-
 import com.yammer.metrics.core.Meter
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.{KafkaThread, Time}
@@ -31,7 +31,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 trait ApiRequestHandler {
-  def handle(request: RequestChannel.Request): Unit
+  def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
 }
 
 /**
@@ -44,8 +44,9 @@ class KafkaRequestHandler(id: Int,
                           val requestChannel: RequestChannel,
                           apis: ApiRequestHandler,
                           time: Time) extends Runnable with Logging {
-  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
+  this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
   private val shutdownComplete = new CountDownLatch(1)
+  private val requestLocal = RequestLocal.withThreadConfinedCaching
   @volatile private var stopped = false
 
   def run(): Unit = {
@@ -64,17 +65,17 @@ class KafkaRequestHandler(id: Int,
       req match {
         case RequestChannel.ShutdownRequest =>
           debug(s"Kafka request handler $id on broker $brokerId received shut down command")
-          shutdownComplete.countDown()
+          completeShutdown()
           return
 
         case request: RequestChannel.Request =>
           try {
             request.requestDequeueTimeNanos = endTime
             trace(s"Kafka request handler $id on broker $brokerId handling request $request")
-            apis.handle(request)
+            apis.handle(request, requestLocal)
           } catch {
             case e: FatalExitError =>
-              shutdownComplete.countDown()
+              completeShutdown()
               Exit.exit(e.statusCode)
             case e: Throwable => error("Exception when handling request", e)
           } finally {
@@ -84,6 +85,11 @@ class KafkaRequestHandler(id: Int,
         case null => // continue
       }
     }
+    completeShutdown()
+  }
+
+  private def completeShutdown(): Unit = {
+    requestLocal.close()
     shutdownComplete.countDown()
   }
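
The hunk above pins down the intended lifecycle: each request handler thread owns one thread-confined RequestLocal, reuses it for every request it processes, and closes it when the thread shuts down. The same rule applies to any other thread that wants buffer caching, since the cached instance must not be shared across threads. A minimal sketch of that rule outside the request path (the worker thread and its body are illustrative only, not part of the patch):

    import kafka.server.RequestLocal
    import org.apache.kafka.common.utils.KafkaThread

    object ThreadConfinedSketch extends App {
      val worker = KafkaThread.nonDaemon("worker", () => {
        // One caching instance per thread; never handed to another thread.
        val requestLocal = RequestLocal.withThreadConfinedCaching
        try {
          // ... process units of work on this thread, passing requestLocal down ...
        } finally requestLocal.close() // release cached buffers when the thread exits
      })
      worker.start()
      worker.join()
    }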
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d813241..6ca8169 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -21,7 +21,6 @@ import java.util.Optional
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
-
 import com.yammer.metrics.core.Meter
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
@@ -606,11 +605,12 @@ class ReplicaManager(val config: KafkaConfig,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Lock] = None,
-                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
+                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
+                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-        origin, entriesPerPartition, requiredAcks)
+        origin, entriesPerPartition, requiredAcks, requestLocal)
       debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
       val produceStatus = localProduceResults.map { case (topicPartition, result) =>
@@ -926,7 +926,8 @@ class ReplicaManager(val config: KafkaConfig,
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                               requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
+                               requiredAcks: Short,
+                               requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = {
     val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
@@ -952,7 +953,7 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         try {
           val partition = getPartitionOrException(topicPartition)
-          val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
+          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal)
           val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
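
The requestLocal parameter defaults to RequestLocal.NoCaching so that callers off the request path (background work, tests) stay source compatible, while request handler code passes its caching instance explicitly. A sketch of the two call shapes, assuming the caller supplies a live ReplicaManager, the per-partition records, and (for the handler case) its thread-confined RequestLocal; the earlier parameters follow the existing appendRecords signature:

    import kafka.log.AppendOrigin
    import kafka.server.{ReplicaManager, RequestLocal}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.record.MemoryRecords

    object AppendRecordsCallShapes {
      // From a request handler thread: pass the thread-confined caching instance.
      def appendFromHandlerThread(rm: ReplicaManager,
                                  entries: Map[TopicPartition, MemoryRecords],
                                  requestLocal: RequestLocal): Unit =
        rm.appendRecords(
          timeout = 30000L,
          requiredAcks = (-1).toShort,
          internalTopicsAllowed = false,
          origin = AppendOrigin.Client,
          entriesPerPartition = entries,
          responseCallback = _ => (),
          requestLocal = requestLocal)

      // From a background caller with no handler thread: rely on the default,
      // which is RequestLocal.NoCaching.
      def appendFromBackgroundCaller(rm: ReplicaManager,
                                     entries: Map[TopicPartition, MemoryRecords]): Unit =
        rm.appendRecords(
          timeout = 30000L,
          requiredAcks = (-1).toShort,
          internalTopicsAllowed = true,
          origin = AppendOrigin.Coordinator,
          entriesPerPartition = entries,
          responseCallback = _ => ())
    }
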
diff --git a/core/src/main/scala/kafka/server/RequestLocal.scala b/core/src/main/scala/kafka/server/RequestLocal.scala
new file mode 100644
index 0000000..5af495f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/RequestLocal.scala
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.utils.BufferSupplier
+
+object RequestLocal {
+  val NoCaching: RequestLocal = RequestLocal(BufferSupplier.NO_CACHING)
+
+  /** The returned instance should be confined to a single thread. */
+  def withThreadConfinedCaching: RequestLocal = RequestLocal(BufferSupplier.create())
+}
+
+/**
+ * Container for stateful instances where the lifecycle is scoped to one request.
+ *
+ * When each request is handled by one thread, efficient data structures with no locking or atomic operations
+ * can be used (see RequestLocal.withThreadConfinedCaching).
+ */
+case class RequestLocal(bufferSupplier: BufferSupplier) {
+  def close(): Unit = bufferSupplier.close()
+}
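
For illustration, a minimal sketch (not part of the patch) of one way the request-scoped BufferSupplier gets exercised: iterating compressed batches through RecordBatch.streamingIterator lets the supplier hand back a reused decompression buffer instead of allocating a fresh one per batch. The record contents here are arbitrary:

    import kafka.server.RequestLocal
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    object RequestLocalSketch extends App {
      val records = MemoryRecords.withRecords(CompressionType.LZ4,
        new SimpleRecord("k".getBytes, "v".getBytes))

      val requestLocal = RequestLocal.withThreadConfinedCaching
      try {
        records.batches.forEach { batch =>
          // The supplier provides (and later reclaims) the decompression buffer.
          val iterator = batch.streamingIterator(requestLocal.bufferSupplier)
          try {
            while (iterator.hasNext) iterator.next()
          } finally iterator.close()
        }
      } finally requestLocal.close()
    }
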
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 6dfaa18..70e44c8 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{RaftReplicaManager, RequestHandlerHelper}
+import kafka.server.{RaftReplicaManager, RequestHandlerHelper, RequestLocal}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.metadata.MetadataRecordType._
 import org.apache.kafka.common.metadata._
@@ -248,7 +248,7 @@ class BrokerMetadataListener(
       case Some(topicName) =>
         info(s"Processing deletion of topic $topicName with id ${record.topicId}")
         val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
-        groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+        groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq, RequestLocal.NoCaching)
         configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
     }
   }
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index 91d5ecd3..a9b471b 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
-import kafka.server.{ApiRequestHandler, ApiVersionManager}
+import kafka.server.{ApiRequestHandler, ApiVersionManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData}
@@ -37,7 +37,7 @@ class TestRaftRequestHandler(
   apiVersionManager: ApiVersionManager
 ) extends ApiRequestHandler with Logging {
 
-  override def handle(request: RequestChannel.Request): Unit = {
+  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     try {
       trace(s"Handling request:${request.requestDesc(true)} with context ${request.context}")
       request.header.apiKey match {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8d567ec..8dc37d4 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -20,7 +20,6 @@ package kafka.cluster
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent._
-
 import kafka.api.ApiVersion
 import kafka.log._
 import kafka.server._
@@ -336,10 +335,11 @@ class PartitionLockTest extends Logging {
   }
 
   private def append(partition: Partition, numRecords: Int, followerQueues: Seq[ArrayBlockingQueue[MemoryRecords]]): Unit = {
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     (0 until numRecords).foreach { _ =>
       val batch = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
         new SimpleRecord("k2".getBytes, "v2".getBytes)))
-      partition.appendRecordsToLeader(batch, origin = AppendOrigin.Client, requiredAcks = 0)
+      partition.appendRecordsToLeader(batch, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
       followerQueues.foreach(_.put(batch))
     }
   }
@@ -388,8 +388,9 @@ class PartitionLockTest extends Logging {
     _topicId = None,
     keepPartitionMetadataFile = true) {
 
-    override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin, interBrokerProtocolVersion: ApiVersion): LogAppendInfo = {
-      val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion)
+    override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
+                                interBrokerProtocolVersion: ApiVersion, requestLocal: RequestLocal): LogAppendInfo = {
+      val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal)
       appendSemaphore.acquire()
       appendInfo
     }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b614e13..2b6d7ff 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -592,10 +592,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
     assertEquals(Set[Integer](leader, follower2), partition.isrState.isr, "ISR")
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     // after makeLeader() call, partition should know about all the replicas
     // append records with initial leader epoch
-    partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client, requiredAcks = 0)
-    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
+    partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
+    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
     assertEquals(partition.localLogOrException.logStartOffset, partition.localLogOrException.highWatermark,
       "Expected leader's HW not move")
 
@@ -839,7 +840,7 @@ class PartitionTest extends AbstractPartitionTest {
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L)
-    partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0)
+    partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
 
     def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
       val res = partition.fetchOffsetForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
@@ -954,11 +955,13 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
     assertEquals(Set[Integer](leader, follower2), partition.isrState.isr, "ISR")
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
+
     // after makeLeader() call, partition should know about all the replicas
     // append records with initial leader epoch
     val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client,
-      requiredAcks = 0).lastOffset
-    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
+      requiredAcks = 0, requestLocal).lastOffset
+    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
     assertEquals(partition.localLogOrException.logStartOffset, partition.log.get.highWatermark, "Expected leader's HW not move")
 
     // let the follower in ISR move leader's HW to move further but below LEO
@@ -999,7 +1002,7 @@ class PartitionTest extends AbstractPartitionTest {
     val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
 
     // append records with the latest leader epoch
-    partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0)
+    partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
 
     // fetch from follower not in ISR from log start offset should not add this follower to ISR
     updateFollowerFetchState(follower1, LogOffsetMetadata(0))
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 14e0f3f..6dfb396 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.{Collections, Random}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.Lock
-
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.log.{AppendOrigin, Log}
 import kafka.server._
@@ -160,8 +159,10 @@ object AbstractCoordinatorConcurrencyTest {
   class TestReplicaManager extends ReplicaManager(
     null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None, null, null) {
 
+    @volatile var logs: mutable.Map[TopicPartition, (Log, Long)] = _
     var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
     var watchKeys: mutable.Set[TopicPartitionOperationKey] = _
+
     def createDelayedProducePurgatory(timer: MockTimer): Unit = {
       producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false)
       watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
@@ -176,7 +177,8 @@ object AbstractCoordinatorConcurrencyTest {
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                                delayedProduceLock: Option[Lock] = None,
-                               processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
+                               processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
+                               requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
 
       if (entriesPerPartition.isEmpty)
         return
@@ -204,20 +206,24 @@ object AbstractCoordinatorConcurrencyTest {
       watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
     }
+
     override def getMagic(topicPartition: TopicPartition): Option[Byte] = {
       Some(RecordBatch.MAGIC_VALUE_V2)
     }
-    @volatile var logs: mutable.Map[TopicPartition, (Log, Long)] = _
+
     def getOrCreateLogs(): mutable.Map[TopicPartition, (Log, Long)] = {
       if (logs == null)
         logs = mutable.Map[TopicPartition, (Log, Long)]()
       logs
     }
+
     def updateLog(topicPartition: TopicPartition, log: Log, endOffset: Long): Unit = {
       getOrCreateLogs().put(topicPartition, (log, endOffset))
     }
+
     override def getLog(topicPartition: TopicPartition): Option[Log] =
       getOrCreateLogs().get(topicPartition).map(l => l._1)
+
     override def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
       getOrCreateLogs().get(topicPartition).map(l => l._2)
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3982689..23ebbeb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -18,9 +18,8 @@
 package kafka.coordinator.group
 
 import java.util.Optional
-
 import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
@@ -29,9 +28,9 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
+
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
-
 import kafka.cluster.Partition
 import kafka.log.AppendOrigin
 import kafka.zk.KafkaZkClient
@@ -3421,7 +3420,8 @@ class GroupCoordinatorTest {
   @Test
   def testDeleteOffsetOfNonExistingGroup(): Unit = {
     val tp = new TopicPartition("foo", 0)
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
     assertTrue(topics.isEmpty)
@@ -3432,7 +3432,8 @@ class GroupCoordinatorTest {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     dynamicJoinGroup(groupId, memberId, "My Protocol", protocols)
     val tp = new TopicPartition("foo", 0)
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.NON_EMPTY_GROUP, groupError)
     assertTrue(topics.isEmpty)
@@ -3476,7 +3477,8 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.onlinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(1, topics.size)
@@ -3505,7 +3507,8 @@ class GroupCoordinatorTest {
       Map(tp -> offset))
     assertEquals(Errors.NONE, validOffsetCommitResult(tp))
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(1, topics.size)
@@ -3519,7 +3522,8 @@ class GroupCoordinatorTest {
     groupCoordinator.groupManager.addGroup(group)
 
     val tp = new TopicPartition("foo", 0)
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
     assertTrue(topics.isEmpty)
@@ -3562,7 +3566,8 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.onlinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(1, topics.size)
@@ -3609,7 +3614,8 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.onlinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0),
+      RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(2, topics.size)
@@ -3789,12 +3795,14 @@ class GroupCoordinatorTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
-      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
-      override def answer = capturedArgument.getValue.apply(
+      EasyMock.anyObject(),
+      EasyMock.anyObject()
+    )).andAnswer(new IAnswer[Unit] {
+      override def answer: Unit = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(appendRecordError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
-        )
-      )})
+      ))
+    })
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -3821,6 +3829,7 @@ class GroupCoordinatorTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -3963,6 +3972,7 @@ class GroupCoordinatorTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
@@ -3996,6 +4006,7 @@ class GroupCoordinatorTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 231382e..5b2152b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -21,15 +21,15 @@ import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, Optional}
-
 import com.yammer.metrics.core.Gauge
+
 import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.log.{AppendOrigin, Log, LogAppendInfo}
 import kafka.metrics.KafkaYammerMetrics
-import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager}
+import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@@ -105,7 +105,7 @@ class GroupMetadataManagerTest {
     var expiredOffsets: Int = 0
     var infoCount = 0
     val gmm = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, time, metrics) {
-      override def cleanupGroupMetadata(groups: Iterable[GroupMetadata],
+      override def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal,
                                         selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = expiredOffsets
 
       override def info(msg: => String): Unit = infoCount += 1
@@ -1418,8 +1418,8 @@ class GroupMetadataManagerTest {
 
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1453,8 +1453,8 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1501,8 +1501,8 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1576,8 +1576,8 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1677,8 +1677,8 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1701,8 +1701,8 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1744,8 +1744,8 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1822,8 +1822,8 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -1948,8 +1948,8 @@ class GroupMetadataManagerTest {
 
     // expect the offset tombstone
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+      EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.expectLastCall().times(1)
 
     EasyMock.replay(partition)
@@ -2270,6 +2270,7 @@ class GroupMetadataManagerTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     )
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -2286,6 +2287,7 @@ class GroupMetadataManagerTest {
       EasyMock.capture(capturedRecords),
       EasyMock.capture(capturedCallback),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedCallback.getValue.apply(
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index e1786d0..e02c2fe3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -18,12 +18,11 @@ package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicBoolean
-
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
 import kafka.log.Log
-import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache}
+import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache, RequestLocal}
 import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -509,7 +508,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
 
   class InitProducerIdOperation(val producerIdAndEpoch: Option[ProducerIdAndEpoch] = None) extends TxnOperation[InitProducerIdResult] {
     override def run(txn: Transaction): Unit = {
-      transactionCoordinator.handleInitProducerId(txn.transactionalId, 60000, producerIdAndEpoch, resultCallback)
+      transactionCoordinator.handleInitProducerId(txn.transactionalId, 60000, producerIdAndEpoch, resultCallback,
+        RequestLocal.withThreadConfinedCaching)
       replicaManager.tryCompleteActions()
     }
     override def awaitAndVerify(txn: Transaction): Unit = {
@@ -526,7 +526,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
             txnMetadata.producerId,
             txnMetadata.producerEpoch,
             partitions,
-            resultCallback)
+            resultCallback,
+            RequestLocal.withThreadConfinedCaching)
         replicaManager.tryCompleteActions()
       }
     }
@@ -544,7 +545,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
           txnMetadata.producerId,
           txnMetadata.producerEpoch,
           transactionResult(txn),
-          resultCallback)
+          resultCallback,
+          RequestLocal.withThreadConfinedCaching)
       }
     }
     override def awaitAndVerify(txn: Transaction): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index f6b5e54..38e8e71 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -118,6 +118,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE)).anyTimes()
     EasyMock.replay(pidGenerator, transactionManager)
@@ -145,6 +146,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE)).anyTimes()
     EasyMock.replay(pidGenerator, transactionManager)
@@ -169,6 +171,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()
     )).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE))
 
@@ -314,6 +317,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()
     ))
 
@@ -572,6 +576,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(originalMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds())),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE))
 
@@ -641,6 +646,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(txnTransitMetadata),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)
@@ -652,6 +658,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(txnTransitMetadata),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -724,6 +731,7 @@ class TransactionCoordinatorTest {
         txnStartTimestamp = time.milliseconds(),
         txnLastUpdateTimestamp = time.milliseconds())),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE))
 
@@ -790,6 +798,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -827,6 +836,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.capture(capturedTxnTransitMetadata),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -867,6 +877,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.capture(capturedTxnTransitMetadata),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -910,6 +921,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.capture(capturedTxnTransitMetadata),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -963,6 +975,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(expectedTransition),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {}).once()
 
@@ -1046,6 +1059,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(expectedTransition),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)).once()
 
@@ -1179,6 +1193,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.capture(capturedNewMetadata),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()
     )).andAnswer(() => {
       metadata.completeTransitionTo(capturedNewMetadata.getValue)
@@ -1213,6 +1228,7 @@ class TransactionCoordinatorTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(transition),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => {
       if (runCallback)
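
The new core/src/main/scala/kafka/server/RequestLocal.scala that these expectations
now account for is not included in this excerpt. Going only by how the tests use it,
a minimal sketch of the idea could look like the following; apart from BufferSupplier
and the withThreadConfinedCaching name, everything here is illustrative rather than
the committed implementation:

    import org.apache.kafka.common.utils.BufferSupplier

    // Illustrative sketch only: request-scoped state threaded from the handler
    // down to the coordinators and log append path, currently just a
    // BufferSupplier used to reuse decompression buffers.
    case class RequestLocal(bufferSupplier: BufferSupplier) {
      def close(): Unit = bufferSupplier.close()
    }

    object RequestLocal {
      // BufferSupplier.create() returns a caching supplier; it is not thread-safe,
      // so the resulting RequestLocal must stay confined to the creating thread.
      def withThreadConfinedCaching: RequestLocal = RequestLocal(BufferSupplier.create())
    }
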
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 8aa07c6..0a0ec51 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -107,6 +107,7 @@ class TransactionMarkerChannelManagerTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(expectedTransition),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
       .andAnswer(() => {
         txnMetadata2.completeTransitionTo(expectedTransition)
@@ -345,6 +346,7 @@ class TransactionMarkerChannelManagerTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(txnTransitionMetadata2),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
       .andAnswer(() => {
         txnMetadata2.completeTransitionTo(txnTransitionMetadata2)
@@ -392,6 +394,7 @@ class TransactionMarkerChannelManagerTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(txnTransitionMetadata2),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
       .andAnswer(() => {
         txnMetadata2.pendingState = None
@@ -439,6 +442,7 @@ class TransactionMarkerChannelManagerTest {
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(txnTransitionMetadata2),
       EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
       .andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.COORDINATOR_NOT_AVAILABLE))
       .andAnswer(() => {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index df57693..410d6e2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.locks.ReentrantLock
 import javax.management.ObjectName
 import kafka.log.{AppendOrigin, Log}
-import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
+import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager, RequestLocal}
 import kafka.utils.{MockScheduler, Pool, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
@@ -319,7 +319,7 @@ class TransactionStateManagerTest {
       new TopicPartition("topic1", 1)), time.milliseconds())
 
     // append the new metadata into log
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
 
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
@@ -334,25 +334,26 @@ class TransactionStateManagerTest {
     var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    val requestLocal = RequestLocal.withThreadConfinedCaching
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
   }
@@ -366,25 +367,26 @@ class TransactionStateManagerTest {
     var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    val requestLocal = RequestLocal.withThreadConfinedCaching
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.NONE)
     transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
 
     prepareForTxnMessageAppend(Errors.NONE)
     transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
     transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch + 1, new Pool[String, TransactionMetadata]())
     transactionManager.putTransactionStateIfNotExists(txnMetadata1)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
 
     prepareForTxnMessageAppend(Errors.NONE)
     transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
     transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]())
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
   }
 
   @Test
@@ -398,7 +400,7 @@ class TransactionStateManagerTest {
     prepareForTxnMessageAppend(Errors.NONE)
     transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
     transactionManager.addLoadingPartition(partitionId, coordinatorEpoch + 1)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
   }
 
   @Test
@@ -410,13 +412,14 @@ class TransactionStateManagerTest {
     var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    val requestLocal = RequestLocal.withThreadConfinedCaching
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
   }
@@ -430,7 +433,7 @@ class TransactionStateManagerTest {
     val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true, RequestLocal.withThreadConfinedCaching)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertEquals(Some(Ongoing), txnMetadata1.pendingState)
   }
@@ -452,7 +455,7 @@ class TransactionStateManagerTest {
     txnMetadata1.producerEpoch = (txnMetadata1.producerEpoch + 1).toShort
 
     // append the new metadata into log
-    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, newMetadata, assertCallback)
+    transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
   }
 
   @Test
@@ -472,7 +475,7 @@ class TransactionStateManagerTest {
 
     // append the new metadata into log
     assertThrows(classOf[IllegalStateException], () => transactionManager.appendTransactionToLog(transactionalId1,
-      coordinatorEpoch = 10, newMetadata, assertCallback))
+      coordinatorEpoch = 10, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -719,6 +722,7 @@ class TransactionStateManagerTest {
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
           EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+          EasyMock.anyObject(),
           EasyMock.anyObject()
         )).andAnswer(() => capturedArgument.getValue.apply(
           Map(partition -> new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
@@ -824,6 +828,7 @@ class TransactionStateManagerTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(() => capturedArgument.getValue.apply(
       Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
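
The tests above create a single RequestLocal.withThreadConfinedCaching per test
method and pass the same instance to every appendTransactionToLog call, since the
caching BufferSupplier behind it is meant to stay confined to one thread rather
than be shared. A standalone illustration of the buffer reuse a caching supplier
provides (not part of the patch; the reuse shown is the expected behaviour of
BufferSupplier.create()):

    import org.apache.kafka.common.utils.BufferSupplier

    object BufferReuseDemo extends App {
      val supplier = BufferSupplier.create()  // caching supplier, one per thread by convention
      val first = supplier.get(64 * 1024)     // typical decompression buffer size
      supplier.release(first)                 // hand the buffer back to the per-size cache
      val second = supplier.get(64 * 1024)    // expected to return the released buffer
      println(s"reused: ${first eq second}")  // expected to print true for a caching supplier
      supplier.close()
    }
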
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index b0d4e3d..af585bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -18,13 +18,12 @@ package kafka.log
 
 import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
-
 import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
 import kafka.common.{LongRef, RecordValidationException}
 import kafka.log.LogValidator.ValidationAndOffsetAssignResult
 import kafka.message._
 import kafka.metrics.KafkaYammerMetrics
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, RequestLocal}
 import kafka.utils.TestUtils.meterCount
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
@@ -128,8 +127,8 @@ class LogValidatorTest {
       RecordBatch.NO_PRODUCER_EPOCH,
       origin = AppendOrigin.Client,
       KAFKA_2_3_IV1,
-      brokerTopicStats
-    )
+      brokerTopicStats,
+      RequestLocal.withThreadConfinedCaching)
   }
 
   @Test
@@ -160,7 +159,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatedResults.validatedRecords
     assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, "message set size should not change")
     validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch))
@@ -199,7 +199,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size,
@@ -247,7 +248,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size,
@@ -309,7 +311,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
   }
 
   @Test
@@ -353,7 +356,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = partitionLeaderEpoch,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
@@ -425,7 +429,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = partitionLeaderEpoch,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
@@ -481,7 +486,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatedResults.validatedRecords
 
     for (batch <- validatedRecords.batches.asScala) {
@@ -526,7 +532,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatedResults.validatedRecords
 
     for (batch <- validatedRecords.batches.asScala) {
@@ -583,7 +590,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = partitionLeaderEpoch,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val validatedRecords = validatedResults.validatedRecords
 
     var i = 0
@@ -636,7 +644,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -659,7 +668,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -682,7 +692,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -705,7 +716,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -727,7 +739,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -749,7 +762,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -772,7 +786,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
@@ -796,7 +811,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
@@ -821,7 +837,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -846,7 +863,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -869,7 +887,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = false)
@@ -894,7 +913,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = false)
@@ -919,7 +939,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
@@ -944,7 +965,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     checkOffsets(validatedResults.validatedRecords, offset)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
       compressed = true)
@@ -969,7 +991,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -991,7 +1014,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Coordinator,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching)
     val batches = TestUtils.toList(result.validatedRecords.batches)
     assertEquals(1, batches.size)
     val batch = batches.get(0)
@@ -1018,7 +1042,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1041,7 +1066,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1063,7 +1089,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1085,7 +1112,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1108,7 +1136,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1131,7 +1160,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1156,7 +1186,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -1181,7 +1212,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -1204,7 +1236,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1227,7 +1260,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
   }
 
   @Test
@@ -1248,7 +1282,8 @@ class LogValidatorTest {
         partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
         origin = AppendOrigin.Client,
         interBrokerProtocolVersion = ApiVersion.latestVersion,
-        brokerTopicStats = brokerTopicStats)
+        brokerTopicStats = brokerTopicStats,
+        requestLocal = RequestLocal.withThreadConfinedCaching)
     )
     assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}")), 1)
     assertTrue(meterCount(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}") > 0)
@@ -1278,7 +1313,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = KAFKA_2_0_IV1,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -1312,7 +1348,8 @@ class LogValidatorTest {
         partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
         origin = AppendOrigin.Client,
         interBrokerProtocolVersion = ApiVersion.latestVersion,
-        brokerTopicStats = brokerTopicStats)
+        brokerTopicStats = brokerTopicStats,
+        requestLocal = RequestLocal.withThreadConfinedCaching)
     )
 
     assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
@@ -1390,7 +1427,8 @@ class LogValidatorTest {
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
-      brokerTopicStats = brokerTopicStats))
+      brokerTopicStats = brokerTopicStats,
+      requestLocal = RequestLocal.withThreadConfinedCaching))
   }
 
   private def createRecords(magicValue: Byte,
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index b0fa2b3..418ee31 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -274,7 +274,8 @@ class ControllerApisTest {
     val request = buildRequest(brokerRegistrationRequest)
     val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
 
-    createControllerApis(Some(createDenyAllAuthorizer()), mock(classOf[Controller])).handle(request)
+    createControllerApis(Some(createDenyAllAuthorizer()), mock(classOf[Controller])).handle(request,
+      RequestLocal.withThreadConfinedCaching)
     verify(requestChannel).sendResponse(
       ArgumentMatchers.eq(request),
       capturedResponse.capture(),
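
The ControllerApisTest change above and the KafkaApisTest changes below call
handle(request, RequestLocal.withThreadConfinedCaching) explicitly, while the
commit's stated goal is a caching supplier per request handler thread, reused
across all requests that thread processes. A rough sketch of that pattern, not the
actual KafkaRequestHandler code (the String request type, the nextRequest helper
and the assumption that RequestLocal exposes a close() are all placeholders):

    import kafka.server.RequestLocal

    // Hypothetical handler loop: one RequestLocal, and therefore one caching
    // BufferSupplier, per handler thread, reused for every request it handles.
    class HandlerLoopSketch(handle: (String, RequestLocal) => Unit) extends Runnable {
      private val requestLocal = RequestLocal.withThreadConfinedCaching
      @volatile private var running = true

      def shutdown(): Unit = running = false

      override def run(): Unit = {
        try {
          while (running) handle(nextRequest(), requestLocal) // same instance every time
        } finally {
          requestLocal.close() // assumed: releases the cached buffers on shutdown
        }
      }

      private def nextRequest(): String = "request" // stand-in for polling the request queue
    }
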
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bd5de08..46f4c1b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -313,7 +313,7 @@ class KafkaApisTest {
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
       adminManager, controller)
-    createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request)
+    createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
     assertEquals(Some(request), capturedRequest.getValue.envelope)
     val innerResponse = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
@@ -339,7 +339,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(request)
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller)
-    createKafkaApis(enableForwarding = true).handle(request)
+    createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
     val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
     assertEquals(Errors.INVALID_REQUEST, response.error())
@@ -407,7 +407,7 @@ class KafkaApisTest {
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
       adminManager, controller)
-    createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request)
+    createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
     if (!shouldCloseConnection) {
       val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
@@ -482,7 +482,7 @@ class KafkaApisTest {
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
 
-    createKafkaApis(enableForwarding = true).handle(request)
+    createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
     EasyMock.verify(controller, forwardingManager)
   }
@@ -1078,7 +1078,7 @@ class KafkaApisTest {
       val request = buildRequest(offsetCommitRequest)
       val capturedResponse = expectNoThrottling(request)
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
-      createKafkaApis().handleOffsetCommitRequest(request)
+      createKafkaApis().handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
 
       val response = capturedResponse.getValue.asInstanceOf[OffsetCommitResponse]
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
@@ -1110,7 +1110,7 @@ class KafkaApisTest {
 
       val capturedResponse = expectNoThrottling(request)
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
-      createKafkaApis().handleTxnOffsetCommitRequest(request)
+      createKafkaApis().handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
 
       val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
@@ -1147,6 +1147,7 @@ class KafkaApisTest {
       ).build(version.toShort)
       val request = buildRequest(offsetCommitRequest)
 
+      val requestLocal = RequestLocal.withThreadConfinedCaching
       EasyMock.expect(groupCoordinator.handleTxnCommitOffsets(
         EasyMock.eq(groupId),
         EasyMock.eq(producerId),
@@ -1155,7 +1156,8 @@ class KafkaApisTest {
         EasyMock.eq(Option.empty),
         EasyMock.anyInt(),
         EasyMock.anyObject(),
-        EasyMock.capture(responseCallback)
+        EasyMock.capture(responseCallback),
+        EasyMock.eq(requestLocal)
       )).andAnswer(
         () => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
 
@@ -1167,7 +1169,7 @@ class KafkaApisTest {
 
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
 
-      createKafkaApis().handleTxnOffsetCommitRequest(request)
+      createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal)
 
       val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
 
@@ -1219,11 +1221,13 @@ class KafkaApisTest {
       else
         Option(new ProducerIdAndEpoch(producerId, epoch))
 
+      val requestLocal = RequestLocal.withThreadConfinedCaching
       EasyMock.expect(txnCoordinator.handleInitProducerId(
         EasyMock.eq(transactionalId),
         EasyMock.eq(txnTimeoutMs),
         EasyMock.eq(expectedProducerIdAndEpoch),
-        EasyMock.capture(responseCallback)
+        EasyMock.capture(responseCallback),
+        EasyMock.eq(requestLocal)
       )).andAnswer(
         () => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
 
@@ -1235,7 +1239,7 @@ class KafkaApisTest {
 
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      createKafkaApis().handleInitProducerIdRequest(request)
+      createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
 
       val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
 
@@ -1278,12 +1282,14 @@ class KafkaApisTest {
         EasyMock.eq(groupId)
       )).andReturn(partition)
 
+      val requestLocal = RequestLocal.withThreadConfinedCaching
       EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
         EasyMock.eq(transactionalId),
         EasyMock.eq(producerId),
         EasyMock.eq(epoch),
         EasyMock.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
-        EasyMock.capture(responseCallback)
+        EasyMock.capture(responseCallback),
+        EasyMock.eq(requestLocal)
       )).andAnswer(
         () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
 
@@ -1295,7 +1301,7 @@ class KafkaApisTest {
 
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, groupCoordinator)
 
-      createKafkaApis().handleAddOffsetsToTxnRequest(request)
+      createKafkaApis().handleAddOffsetsToTxnRequest(request, requestLocal)
 
       val response = capturedResponse.getValue.asInstanceOf[AddOffsetsToTxnResponse]
 
@@ -1334,13 +1340,14 @@ class KafkaApisTest {
       ).build(version.toShort)
       val request = buildRequest(addPartitionsToTxnRequest)
 
+      val requestLocal = RequestLocal.withThreadConfinedCaching
       EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
         EasyMock.eq(transactionalId),
         EasyMock.eq(producerId),
         EasyMock.eq(epoch),
         EasyMock.eq(Set(topicPartition)),
-
-        EasyMock.capture(responseCallback)
+        EasyMock.capture(responseCallback),
+        EasyMock.eq(requestLocal)
       )).andAnswer(
         () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
 
@@ -1352,7 +1359,7 @@ class KafkaApisTest {
 
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      createKafkaApis().handleAddPartitionToTxnRequest(request)
+      createKafkaApis().handleAddPartitionToTxnRequest(request, requestLocal)
 
       val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
 
@@ -1388,12 +1395,14 @@ class KafkaApisTest {
       ).build(version.toShort)
       val request = buildRequest(endTxnRequest)
 
+      val requestLocal = RequestLocal.withThreadConfinedCaching
       EasyMock.expect(txnCoordinator.handleEndTransaction(
         EasyMock.eq(transactionalId),
         EasyMock.eq(producerId),
         EasyMock.eq(epoch),
         EasyMock.eq(TransactionResult.COMMIT),
-        EasyMock.capture(responseCallback)
+        EasyMock.capture(responseCallback),
+        EasyMock.eq(requestLocal)
       )).andAnswer(
         () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
 
@@ -1404,7 +1413,7 @@ class KafkaApisTest {
       ))
 
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
-      createKafkaApis().handleEndTxnRequest(request)
+      createKafkaApis().handleEndTxnRequest(request, requestLocal)
 
       val response = capturedResponse.getValue.asInstanceOf[EndTxnResponse]
 
@@ -1449,6 +1458,7 @@ class KafkaApisTest {
         EasyMock.anyObject(),
         EasyMock.capture(responseCallback),
         EasyMock.anyObject(),
+        EasyMock.anyObject(),
         EasyMock.anyObject())
       ).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
 
@@ -1458,7 +1468,7 @@ class KafkaApisTest {
 
       EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      createKafkaApis().handleProduceRequest(request)
+      createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
 
       val response = capturedResponse.getValue.asInstanceOf[ProduceResponse]
 
@@ -1486,7 +1496,7 @@ class KafkaApisTest {
 
       val capturedResponse = expectNoThrottling(request)
       EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
-      createKafkaApis().handleAddPartitionToTxnRequest(request)
+      createKafkaApis().handleAddPartitionToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
 
       val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
@@ -1498,27 +1508,32 @@ class KafkaApisTest {
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
-    assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null))
+    assertThrows(classOf[UnsupportedVersionException],
+      () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
-    assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null))
+    assertThrows(classOf[UnsupportedVersionException],
+      () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
-    assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null))
+    assertThrows(classOf[UnsupportedVersionException],
+      () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
-    assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleEndTxnRequest(null))
+    assertThrows(classOf[UnsupportedVersionException],
+      () => createKafkaApis(KAFKA_0_10_2_IV0).handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
-    assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null))
+    assertThrows(classOf[UnsupportedVersionException],
+      () => createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -1537,7 +1552,7 @@ class KafkaApisTest {
     ))
     EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
 
-    createKafkaApis().handleWriteTxnMarkersRequest(request)
+    createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
 
     val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1559,7 +1574,7 @@ class KafkaApisTest {
     ))
     EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
 
-    createKafkaApis().handleWriteTxnMarkersRequest(request)
+    createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
 
     val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1580,6 +1595,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.getMagic(tp2))
       .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
@@ -1587,7 +1603,8 @@ class KafkaApisTest {
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
       EasyMock.anyObject(),
-      EasyMock.anyObject())
+      EasyMock.anyObject(),
+      EasyMock.eq(requestLocal))
     ).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
 
     EasyMock.expect(requestChannel.sendResponse(
@@ -1597,7 +1614,7 @@ class KafkaApisTest {
     ))
     EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
 
-    createKafkaApis().handleWriteTxnMarkersRequest(request)
+    createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
 
     val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1719,6 +1736,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.getMagic(tp2))
       .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
@@ -1726,7 +1744,8 @@ class KafkaApisTest {
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
       EasyMock.anyObject(),
-      EasyMock.anyObject())
+      EasyMock.anyObject(),
+      EasyMock.eq(requestLocal))
     ).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
 
     EasyMock.expect(requestChannel.sendResponse(
@@ -1736,7 +1755,7 @@ class KafkaApisTest {
     ))
     EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
 
-    createKafkaApis().handleWriteTxnMarkersRequest(request)
+    createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
 
     val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1750,6 +1769,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.getMagic(topicPartition))
       .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
@@ -1757,11 +1777,12 @@ class KafkaApisTest {
       EasyMock.anyObject(),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
-      EasyMock.anyObject()))
+      EasyMock.anyObject(),
+      EasyMock.eq(requestLocal)))
 
     EasyMock.replay(replicaManager)
 
-    createKafkaApis().handleWriteTxnMarkersRequest(request)
+    createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
     EasyMock.verify(replicaManager)
   }
 
@@ -1857,6 +1878,7 @@ class KafkaApisTest {
     ).build()
     val request = buildRequest(offsetDeleteRequest)
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     val capturedResponse = expectNoThrottling(request)
     EasyMock.expect(groupCoordinator.handleDeleteOffsets(
       EasyMock.eq(group),
@@ -1865,7 +1887,8 @@ class KafkaApisTest {
         new TopicPartition("topic-1", 1),
         new TopicPartition("topic-2", 0),
         new TopicPartition("topic-2", 1)
-      ))
+      )),
+      EasyMock.eq(requestLocal)
     )).andReturn((Errors.NONE, Map(
       new TopicPartition("topic-1", 0) -> Errors.NONE,
       new TopicPartition("topic-1", 1) -> Errors.NONE,
@@ -1875,7 +1898,7 @@ class KafkaApisTest {
 
     EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
-    createKafkaApis().handleOffsetDeleteRequest(request)
+    createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
     val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
 
@@ -1912,11 +1935,12 @@ class KafkaApisTest {
       val request = buildRequest(offsetDeleteRequest)
       val capturedResponse = expectNoThrottling(request)
 
-      EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
-        .andReturn((Errors.NONE, Map.empty))
+      val requestLocal = RequestLocal.withThreadConfinedCaching
+      EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
+        EasyMock.eq(requestLocal))).andReturn((Errors.NONE, Map.empty))
       EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
-      createKafkaApis().handleOffsetDeleteRequest(request)
+      createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
       val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
 
@@ -1941,11 +1965,12 @@ class KafkaApisTest {
     val request = buildRequest(offsetDeleteRequest)
 
     val capturedResponse = expectNoThrottling(request)
-    EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
-      .andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
+    val requestLocal = RequestLocal.withThreadConfinedCaching
+    EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
+      EasyMock.eq(requestLocal))).andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
     EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
-    createKafkaApis().handleOffsetDeleteRequest(request)
+    createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
     val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
 
@@ -2174,6 +2199,7 @@ class KafkaApisTest {
       EasyMock.eq(sessionTimeoutMs),
       EasyMock.eq(protocolType),
       EasyMock.capture(capturedProtocols),
+      anyObject(),
       anyObject()
     ))
 
@@ -2193,7 +2219,8 @@ class KafkaApisTest {
                 .setName(name).setMetadata(protocol)
               }.iterator.asJava))
         ).build()
-      ))
+      ),
+      RequestLocal.withThreadConfinedCaching)
 
     EasyMock.verify(groupCoordinator)
 
@@ -2234,7 +2261,8 @@ class KafkaApisTest {
       EasyMock.eq(sessionTimeoutMs),
       EasyMock.eq(protocolType),
       EasyMock.eq(List.empty),
-      EasyMock.capture(capturedCallback)
+      EasyMock.capture(capturedCallback),
+      EasyMock.anyObject()
     ))
 
     val joinGroupRequest = new JoinGroupRequest.Builder(
@@ -2250,7 +2278,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
+    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     EasyMock.verify(groupCoordinator)
 
@@ -2304,7 +2332,8 @@ class KafkaApisTest {
       EasyMock.eq(sessionTimeoutMs),
       EasyMock.eq(protocolType),
       EasyMock.eq(List.empty),
-      EasyMock.capture(capturedCallback)
+      EasyMock.capture(capturedCallback),
+      EasyMock.anyObject()
     ))
 
     val joinGroupRequest = new JoinGroupRequest.Builder(
@@ -2320,7 +2349,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
+    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     EasyMock.verify(groupCoordinator)
 
@@ -2364,6 +2393,7 @@ class KafkaApisTest {
 
     val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     EasyMock.expect(groupCoordinator.handleSyncGroup(
       EasyMock.eq(groupId),
       EasyMock.eq(0),
@@ -2372,7 +2402,8 @@ class KafkaApisTest {
       EasyMock.eq(if (version >= 5) Some(protocolName) else None),
       EasyMock.eq(None),
       EasyMock.eq(Map.empty),
-      EasyMock.capture(capturedCallback)
+      EasyMock.capture(capturedCallback),
+      EasyMock.eq(requestLocal)
     ))
 
     val syncGroupRequest = new SyncGroupRequest.Builder(
@@ -2388,7 +2419,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
-    createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
+    createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
 
     EasyMock.verify(groupCoordinator)
 
@@ -2425,6 +2456,7 @@ class KafkaApisTest {
 
     val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
 
+    val requestLocal = RequestLocal.withThreadConfinedCaching
     if (version < 5) {
       EasyMock.expect(groupCoordinator.handleSyncGroup(
         EasyMock.eq(groupId),
@@ -2434,7 +2466,8 @@ class KafkaApisTest {
         EasyMock.eq(None),
         EasyMock.eq(None),
         EasyMock.eq(Map.empty),
-        EasyMock.capture(capturedCallback)
+        EasyMock.capture(capturedCallback),
+        EasyMock.eq(requestLocal)
       ))
     }
 
@@ -2449,7 +2482,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
-    createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
+    createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
 
     EasyMock.verify(groupCoordinator)
 
@@ -2488,7 +2521,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
-    createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest)
+    createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
     assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
@@ -2509,7 +2542,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
-    createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest)
+    createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
     assertEquals(Errors.UNSUPPORTED_VERSION, response.error)
@@ -2561,7 +2594,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
-    createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest)
+    createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     val expectedTopicErrors = Collections.singletonList(
       new OffsetCommitResponseData.OffsetCommitResponseTopic()
@@ -2688,7 +2721,7 @@ class KafkaApisTest {
 
     replay(replicaManager, fetchManager, clientQuotaManager, requestChannel, replicaQuotaManager, partition)
 
-    createKafkaApis().handle(fetchFromFollower)
+    createKafkaApis().handle(fetchFromFollower, RequestLocal.withThreadConfinedCaching)
 
     if (isReassigning)
       assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.reassignmentBytesOutPerSec.get.count())
@@ -2712,7 +2745,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
-    createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest)
+    createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
     assertEquals(Errors.INVALID_REQUEST, response.error)
@@ -2731,7 +2764,7 @@ class KafkaApisTest {
     val capturedResponse = expectNoThrottling(requestChannelRequest)
 
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
-    createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest)
+    createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
     assertEquals(Errors.INVALID_REQUEST, response.error)
@@ -2776,7 +2809,7 @@ class KafkaApisTest {
     ))
     EasyMock.replay(replicaManager, controller, requestChannel)
 
-    createKafkaApis().handleUpdateMetadataRequest(request)
+    createKafkaApis().handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)
     val updateMetadataResponse = capturedResponse.getValue.asInstanceOf[UpdateMetadataResponse]
     assertEquals(expectedError, updateMetadataResponse.error())
     EasyMock.verify(replicaManager)
@@ -3754,7 +3787,7 @@ class KafkaApisTest {
   @Test
   def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
     metadataCache = MetadataCache.raftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest)
+    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -3772,7 +3805,7 @@ class KafkaApisTest {
   @Test
   def testRaftShouldNeverHandleEnvelope(): Unit = {
     metadataCache = MetadataCache.raftMetadataCache(brokerId)
-    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope)
+    verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
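
Note on the KafkaApisTest call sites above: every handler under test is now driven with an explicit RequestLocal argument. The Scala sketch below is inferred purely from the usages visible in this patch (a bufferSupplier accessor plus NoCaching and withThreadConfinedCaching factories); the real core/src/main/scala/kafka/server/RequestLocal.scala may differ in detail.

    import org.apache.kafka.common.utils.BufferSupplier

    // Hypothetical sketch of the request-scoped wrapper the tests construct.
    case class RequestLocal(bufferSupplier: BufferSupplier) {
      // Drop any cached decompression buffers so they can be garbage collected.
      def close(): Unit = bufferSupplier.close()
    }

    object RequestLocal {
      // Allocate a fresh buffer for every decompression.
      val NoCaching: RequestLocal = RequestLocal(BufferSupplier.NO_CACHING)

      // Cache and reuse buffers; only safe while confined to a single thread.
      def withThreadConfinedCaching: RequestLocal = RequestLocal(BufferSupplier.create())
    }

With that shape, the Scala tests can pass RequestLocal.withThreadConfinedCaching directly, while the Java benchmarks further down reach the same values through the generated accessors RequestLocal.NoCaching() and RequestLocal.withThreadConfinedCaching().
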
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 99928f0..65ab81f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2194,7 +2194,7 @@ class ReplicaManagerTest {
     val batch = TestUtils.records(records = List(
       new SimpleRecord(10, "k1".getBytes, "v1".getBytes),
       new SimpleRecord(11, "k2".getBytes, "v2".getBytes)))
-    partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0)
+    partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
     partition.log.get.updateHighWatermark(2L)
     partition.log.get.maybeIncrementLogStartOffset(1L, LeaderOffsetIncremented)
     replicaManager.logManager.checkpointLogRecoveryOffsets()
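
The appendRecordsToLeader change above threads a RequestLocal through the leader append path as well. A hedged usage sketch of the intended pattern, reusing one thread-confined instance for several appends issued from the same thread (partition and batch stand in for the test's fixtures):

    // Sketch only: reuse a single thread-confined RequestLocal for appends
    // made from one thread, rather than creating a new one per call.
    val requestLocal = RequestLocal.withThreadConfinedCaching
    partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0, requestLocal)
    partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0, requestLocal)
    // The caching supplier inside is not thread-safe, so each thread needs its own instance.
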
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 89ba5f1..ccccb8b 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -94,7 +94,7 @@ class BrokerMetadataListenerTest {
 
     verify(groupCoordinator).handleDeletedPartitions(ArgumentMatchers.argThat[Seq[TopicPartition]] { partitions =>
       partitions.toSet == partitionSet(topic, numPartitions)
-    })
+    }, any)
 
     val deleteImageCapture: ArgumentCaptor[MetadataImageBuilder] =
       ArgumentCaptor.forClass(classOf[MetadataImageBuilder])
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index 30f908e..e9910da 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.jmh.record;
 
 import kafka.server.BrokerTopicStats;
+import kafka.server.RequestLocal;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
@@ -74,7 +74,7 @@ public abstract class BaseRecordBatchBenchmark {
 
     // Used by measureVariableBatchSize
     ByteBuffer[] batchBuffers;
-    BufferSupplier bufferSupplier;
+    RequestLocal requestLocal;
     final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
 
     @Setup
@@ -85,9 +85,9 @@ public abstract class BaseRecordBatchBenchmark {
         startingOffset = messageVersion == 2 ? 0 : 42;
 
         if (bufferSupplierStr.equals("NO_CACHING")) {
-            bufferSupplier = BufferSupplier.NO_CACHING;
+            requestLocal = RequestLocal.NoCaching();
         } else if (bufferSupplierStr.equals("CREATE")) {
-            bufferSupplier = BufferSupplier.create();
+            requestLocal = RequestLocal.withThreadConfinedCaching();
         } else {
             throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
         }
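
For reference, the NO_CACHING and CREATE branches above differ only in whether released decompression buffers are kept for reuse. A small Scala illustration of why the caching path avoids repeated allocation, using only the public BufferSupplier API already referenced in this patch:

    import org.apache.kafka.common.utils.BufferSupplier

    // Illustration: a caching supplier can hand back a previously released buffer,
    // while BufferSupplier.NO_CACHING allocates a new one on every get().
    val caching = BufferSupplier.create()
    val first = caching.get(64 * 1024)   // request a 64 KiB buffer
    caching.release(first)               // return it to the supplier's cache
    val second = caching.get(64 * 1024)  // may be the very same ByteBuffer instance
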
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
index f176e06..24ac53e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
@@ -59,6 +59,7 @@ public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBen
                 false,  messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
                 new AppendOrigin.Client$(),
                 ApiVersion.latestVersion(),
-                brokerTopicStats);
+                brokerTopicStats,
+                requestLocal);
     }
 }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index 8aaa2d5..c331cd5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -32,8 +32,6 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
-import java.io.IOException;
-
 @State(Scope.Benchmark)
 @Fork(value = 1)
 @Warmup(iterations = 5)
@@ -49,9 +47,9 @@ public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
     }
 
     @Benchmark
-    public void measureIteratorForBatchWithSingleMessage(Blackhole bh) throws IOException {
+    public void measureIteratorForBatchWithSingleMessage(Blackhole bh) {
         for (RecordBatch batch : MemoryRecords.readableRecords(singleBatchBuffer.duplicate()).batches()) {
-            try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
+            try (CloseableIterator<Record> iterator = batch.streamingIterator(requestLocal.bufferSupplier())) {
                 while (iterator.hasNext())
                     bh.consume(iterator.next());
             }
@@ -61,10 +59,10 @@ public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
     @OperationsPerInvocation(value = batchCount)
     @Fork(jvmArgsAppend = "-Xmx8g")
     @Benchmark
-    public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) throws IOException {
+    public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) {
         for (int i = 0; i < batchCount; ++i) {
             for (RecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) {
-                try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
+                try (CloseableIterator<Record> iterator = batch.streamingIterator(requestLocal.bufferSupplier())) {
                     while (iterator.hasNext())
                         bh.consume(iterator.next());
                 }
@@ -75,10 +73,10 @@ public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
     @OperationsPerInvocation(value = batchCount)
     @Fork(jvmArgsAppend = "-Xmx8g")
     @Benchmark
-    public void measureSkipIteratorForVariableBatchSize(Blackhole bh) throws IOException {
+    public void measureSkipIteratorForVariableBatchSize(Blackhole bh) {
         for (int i = 0; i < batchCount; ++i) {
             for (MutableRecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) {
-                try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(bufferSupplier)) {
+                try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(requestLocal.bufferSupplier())) {
                     while (iterator.hasNext())
                         bh.consume(iterator.next());
                 }
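
The iteration benchmarks above now obtain the decompression supplier from the request-local holder instead of a standalone BufferSupplier field. A compact Scala sketch of the same pattern (consumeAll and its parameters are illustrative names, not part of the patch; it assumes the bufferSupplier accessor shown in the Java hunks):

    import kafka.server.RequestLocal
    import org.apache.kafka.common.record.MemoryRecords

    // Sketch: iterate every record of every batch, borrowing decompression
    // buffers from the caller's thread-confined RequestLocal.
    def consumeAll(records: MemoryRecords, requestLocal: RequestLocal): Unit = {
      val batches = records.batches().iterator()
      while (batches.hasNext) {
        val batch = batches.next()
        val iterator = batch.streamingIterator(requestLocal.bufferSupplier)
        try {
          while (iterator.hasNext) iterator.next()
        } finally {
          iterator.close()
        }
      }
    }
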