You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/05/30 19:19:02 UTC
[kafka] branch trunk updated: MINOR: Reduce allocations in requests via buffer caching (#9229)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b005b2 MINOR: Reduce allocations in requests via buffer caching (#9229)
6b005b2 is described below
commit 6b005b2b4eece81a5500fb0080ef5354b4240681
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun May 30 12:16:36 2021 -0700
MINOR: Reduce allocations in requests via buffer caching (#9229)
Use a caching `BufferSupplier` per request handler thread so that
decompression buffers are cached if supported by the underlying
`CompressionType`. This achieves a similar outcome to #9220, but
with less contention.
We introduce a `RequestLocal` class to make it easier to introduce
new request-scoped stateful instances (one example we discussed
previously was an `ActionQueue` that could be used to avoid
some of the complex group coordinator locking).
This is a small win for zstd (no synchronization or soft references) and
a more significant win for lz4. In particular, it reduces allocations
significantly when the number of partitions is high. The decompression
buffer size is typically 64 KB, so a produce request with 1000 partitions
results in 64 MB of allocations even if each produce batch is small (which is
likely when there are so many partitions).
I did a quick local producer perf test with 5000 partitions, 1 KB record size,
1 broker, lz4 and ~0.5 for the producer compression rate metric:
Before this change:
> 20000000 records sent, 346314.349535 records/sec (330.27 MB/sec),
148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.
After this change:
> 20000000 records sent, 431956.113259 records/sec (411.95 MB/sec),
117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.
That's a 25% throughput improvement and p999 latency was reduced to
under half (in this test).
Default arguments will be removed in a subsequent PR.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
.../apache/kafka/common/compress/ZstdFactory.java | 17 ++-
core/src/main/scala/kafka/cluster/Partition.scala | 6 +-
.../kafka/coordinator/group/GroupCoordinator.scala | 80 ++++++-----
.../coordinator/group/GroupMetadataManager.scala | 28 ++--
.../transaction/TransactionCoordinator.scala | 33 +++--
.../TransactionMarkerChannelManager.scala | 7 +-
.../transaction/TransactionStateManager.scala | 12 +-
core/src/main/scala/kafka/log/Log.scala | 16 ++-
core/src/main/scala/kafka/log/LogValidator.scala | 27 ++--
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 6 +-
.../main/scala/kafka/server/ControllerApis.scala | 8 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 94 +++++++------
.../scala/kafka/server/KafkaRequestHandler.scala | 18 ++-
.../main/scala/kafka/server/ReplicaManager.scala | 11 +-
.../src/main/scala/kafka/server/RequestLocal.scala | 37 +++++
.../server/metadata/BrokerMetadataListener.scala | 4 +-
.../scala/kafka/tools/TestRaftRequestHandler.scala | 4 +-
.../unit/kafka/cluster/PartitionLockTest.scala | 9 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 15 ++-
.../AbstractCoordinatorConcurrencyTest.scala | 12 +-
.../coordinator/group/GroupCoordinatorTest.scala | 39 ++++--
.../group/GroupMetadataManagerTest.scala | 44 +++---
.../TransactionCoordinatorConcurrencyTest.scala | 12 +-
.../transaction/TransactionCoordinatorTest.scala | 16 +++
.../TransactionMarkerChannelManagerTest.scala | 4 +
.../transaction/TransactionStateManagerTest.scala | 37 ++---
.../scala/unit/kafka/log/LogValidatorTest.scala | 124 +++++++++++------
.../unit/kafka/server/ControllerApisTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 149 +++++++++++++--------
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../metadata/BrokerMetadataListenerTest.scala | 2 +-
.../kafka/jmh/record/BaseRecordBatchBenchmark.java | 8 +-
.../CompressedRecordBatchValidationBenchmark.java | 3 +-
.../jmh/record/RecordBatchIterationBenchmark.java | 14 +-
34 files changed, 568 insertions(+), 333 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
index 8f4735e..4664f4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.compress;
+import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
@@ -47,10 +48,24 @@ public class ZstdFactory {
public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
+ // We use our own BufferSupplier instead of com.github.luben.zstd.RecyclingBufferPool since our
+ // implementation doesn't require locking or soft references.
+ BufferPool bufferPool = new BufferPool() {
+ @Override
+ public ByteBuffer get(int capacity) {
+ return decompressionBufferSupplier.get(capacity);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {
+ decompressionBufferSupplier.release(buffer);
+ }
+ };
+
// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller reads a small number of bytes (potentially a single byte).
return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
- RecyclingBufferPool.INSTANCE), 16 * 1024);
+ bufferPool), 16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7eea0e2..89cadf4 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,6 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional
-
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.{KafkaController, StateChangeLogger}
@@ -1019,7 +1018,8 @@ class Partition(val topicPartition: TopicPartition,
}
}
- def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+ def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
+ requestLocal: RequestLocal): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
@@ -1033,7 +1033,7 @@ class Partition(val topicPartition: TopicPartition,
}
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
- interBrokerProtocolVersion)
+ interBrokerProtocolVersion, requestLocal)
// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderLog))
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bb06ca5..f3e170b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.group
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
-
import kafka.common.OffsetAndMetadata
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
@@ -92,6 +91,7 @@ class GroupCoordinator(val brokerId: Int,
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
+
props
}
@@ -162,7 +162,8 @@ class GroupCoordinator(val brokerId: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback): Unit = {
+ responseCallback: JoinCallback,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
responseCallback(JoinGroupResult(memberId, error))
return
@@ -194,7 +195,8 @@ class GroupCoordinator(val brokerId: Int,
sessionTimeoutMs,
protocolType,
protocols,
- responseCallback
+ responseCallback,
+ requestLocal
)
} else {
doCurrentMemberJoinGroup(
@@ -230,7 +232,8 @@ class GroupCoordinator(val brokerId: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback
+ responseCallback: JoinCallback,
+ requestLocal: RequestLocal
): Unit = {
group.inLock {
if (group.is(Dead)) {
@@ -255,9 +258,9 @@ class GroupCoordinator(val brokerId: Int,
sessionTimeoutMs,
protocolType,
protocols,
- responseCallback
+ responseCallback,
+ requestLocal
)
-
case None =>
doDynamicNewMemberJoinGroup(
group,
@@ -286,14 +289,15 @@ class GroupCoordinator(val brokerId: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback
+ responseCallback: JoinCallback,
+ requestLocal: RequestLocal
): Unit = {
group.currentStaticMemberId(groupInstanceId) match {
case Some(oldMemberId) =>
info(s"Static member with groupInstanceId=$groupInstanceId and unknown member id joins " +
s"group ${group.groupId} in ${group.currentState} state. Replacing previously mapped " +
s"member $oldMemberId with this groupInstanceId.")
- updateStaticMemberAndRebalance(group, oldMemberId, newMemberId, groupInstanceId, protocols, responseCallback)
+ updateStaticMemberAndRebalance(group, oldMemberId, newMemberId, groupInstanceId, protocols, responseCallback, requestLocal)
case None =>
info(s"Static member with groupInstanceId=$groupInstanceId and unknown member id joins " +
@@ -474,7 +478,8 @@ class GroupCoordinator(val brokerId: Int,
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
- responseCallback: SyncCallback): Unit = {
+ responseCallback: SyncCallback,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
// The coordinator is loading, which means we've lost the state of the active rebalance and the
@@ -489,7 +494,7 @@ class GroupCoordinator(val brokerId: Int,
groupManager.getGroup(groupId) match {
case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName,
- groupInstanceId, groupAssignment, responseCallback)
+ groupInstanceId, groupAssignment, requestLocal, responseCallback)
}
}
}
@@ -535,6 +540,7 @@ class GroupCoordinator(val brokerId: Int,
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
+ requestLocal: RequestLocal,
responseCallback: SyncCallback): Unit = {
group.inLock {
val validationErrorOpt = validateSyncGroup(
@@ -587,7 +593,7 @@ class GroupCoordinator(val brokerId: Int,
}
}
}
- })
+ }, requestLocal)
groupCompletedRebalanceSensor.record()
}
@@ -669,7 +675,8 @@ class GroupCoordinator(val brokerId: Int,
}
}
- def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
+ def handleDeleteGroups(groupIds: Set[String],
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Map[String, Errors] = {
val groupErrors = mutable.Map.empty[String, Errors]
val groupsEligibleForDeletion = mutable.ArrayBuffer[GroupMetadata]()
@@ -701,7 +708,8 @@ class GroupCoordinator(val brokerId: Int,
}
if (groupsEligibleForDeletion.nonEmpty) {
- val offsetsRemoved = groupManager.cleanupGroupMetadata(groupsEligibleForDeletion, _.removeAllOffsets())
+ val offsetsRemoved = groupManager.cleanupGroupMetadata(groupsEligibleForDeletion, requestLocal,
+ _.removeAllOffsets())
groupErrors ++= groupsEligibleForDeletion.map(_.groupId -> Errors.NONE).toMap
info(s"The following groups were deleted: ${groupsEligibleForDeletion.map(_.groupId).mkString(", ")}. " +
s"A total of $offsetsRemoved offsets were removed.")
@@ -710,7 +718,8 @@ class GroupCoordinator(val brokerId: Int,
groupErrors
}
- def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, Errors]) = {
+ def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition],
+ requestLocal: RequestLocal): (Errors, Map[TopicPartition, Errors]) = {
var groupError: Errors = Errors.NONE
var partitionErrors: Map[TopicPartition, Errors] = Map()
var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
@@ -748,9 +757,8 @@ class GroupCoordinator(val brokerId: Int,
}
if (partitionsEligibleForDeletion.nonEmpty) {
- val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), group => {
- group.removeOffsets(partitionsEligibleForDeletion)
- })
+ val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), requestLocal,
+ _.removeOffsets(partitionsEligibleForDeletion))
partitionErrors ++= partitionsEligibleForDeletion.map(_ -> Errors.NONE).toMap
@@ -855,14 +863,16 @@ class GroupCoordinator(val brokerId: Int,
groupInstanceId: Option[String],
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+ responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
case None =>
val group = groupManager.getGroup(groupId).getOrElse {
groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
}
- doTxnCommitOffsets(group, memberId, groupInstanceId, generationId, producerId, producerEpoch, offsetMetadata, responseCallback)
+ doTxnCommitOffsets(group, memberId, groupInstanceId, generationId, producerId, producerEpoch,
+ offsetMetadata, requestLocal, responseCallback)
}
}
@@ -871,7 +881,8 @@ class GroupCoordinator(val brokerId: Int,
groupInstanceId: Option[String],
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+ responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
case None =>
@@ -880,14 +891,16 @@ class GroupCoordinator(val brokerId: Int,
if (generationId < 0) {
// the group is not relying on Kafka for group management, so allow the commit
val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
- doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
+ doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata,
+ responseCallback, requestLocal)
} else {
// or this is a request coming from an older generation. either way, reject the commit
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
}
case Some(group) =>
- doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
+ doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata,
+ responseCallback, requestLocal)
}
}
}
@@ -907,6 +920,7 @@ class GroupCoordinator(val brokerId: Int,
producerId: Long,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ requestLocal: RequestLocal,
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
group.inLock {
val validationErrorOpt = validateOffsetCommit(
@@ -920,7 +934,8 @@ class GroupCoordinator(val brokerId: Int,
if (validationErrorOpt.isDefined) {
responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
} else {
- groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
+ groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId,
+ producerEpoch, requestLocal)
}
}
}
@@ -963,7 +978,8 @@ class GroupCoordinator(val brokerId: Int,
groupInstanceId: Option[String],
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+ responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+ requestLocal: RequestLocal): Unit = {
group.inLock {
val validationErrorOpt = validateOffsetCommit(
group,
@@ -985,7 +1001,7 @@ class GroupCoordinator(val brokerId: Int,
// on heartbeat response to eventually notify the rebalance in progress signal to the consumer
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
- groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+ groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, requestLocal = requestLocal)
case CompletingRebalance =>
// We should not receive a commit request if the group has not completed rebalance;
@@ -1041,10 +1057,9 @@ class GroupCoordinator(val brokerId: Int,
}
}
- def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]): Unit = {
- val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, group => {
- group.removeOffsets(topicPartitions)
- })
+ def handleDeletedPartitions(topicPartitions: Seq[TopicPartition], requestLocal: RequestLocal): Unit = {
+ val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, requestLocal,
+ _.removeOffsets(topicPartitions))
info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
}
@@ -1235,7 +1250,8 @@ class GroupCoordinator(val brokerId: Int,
newMemberId: String,
groupInstanceId: String,
protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback): Unit = {
+ responseCallback: JoinCallback,
+ requestLocal: RequestLocal): Unit = {
val currentLeader = group.leaderOrNull
val member = group.replaceStaticMember(groupInstanceId, oldMemberId, newMemberId)
// Heartbeat of old member id will expire without effect since the group no longer contains that member id.
@@ -1287,7 +1303,7 @@ class GroupCoordinator(val brokerId: Int,
leaderId = currentLeader,
error = Errors.NONE))
}
- })
+ }, requestLocal)
} else {
maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
}
@@ -1411,7 +1427,7 @@ class GroupCoordinator(val brokerId: Int,
// This should be safe since there are no active members in an empty generation, so we just warn.
warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
}
- })
+ }, RequestLocal.NoCaching)
} else {
info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members")
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c054234..d3d911d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -24,14 +24,13 @@ import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
-
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
import kafka.common.OffsetAndMetadata
import kafka.internals.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import kafka.log.AppendOrigin
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchLogEnd, ReplicaManager}
+import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils._
@@ -240,7 +239,8 @@ class GroupMetadataManager(brokerId: Int,
def storeGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
- responseCallback: Errors => Unit): Unit = {
+ responseCallback: Errors => Unit,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
@@ -310,7 +310,7 @@ class GroupMetadataManager(brokerId: Int,
responseCallback(responseError)
}
- appendForGroup(group, groupMetadataRecords, putCacheCallback)
+ appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback)
case None =>
responseCallback(Errors.NOT_COORDINATOR)
@@ -320,6 +320,7 @@ class GroupMetadataManager(brokerId: Int,
private def appendForGroup(group: GroupMetadata,
records: Map[TopicPartition, MemoryRecords],
+ requestLocal: RequestLocal,
callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
// call replica manager to append the group message
replicaManager.appendRecords(
@@ -329,7 +330,8 @@ class GroupMetadataManager(brokerId: Int,
origin = AppendOrigin.Coordinator,
entriesPerPartition = records,
delayedProduceLock = Some(group.lock),
- responseCallback = callback)
+ responseCallback = callback,
+ requestLocal = requestLocal)
}
/**
@@ -340,7 +342,8 @@ class GroupMetadataManager(brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
- producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit = {
+ producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
// first filter out partitions with offset metadata size exceeding limit
val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
@@ -467,7 +470,7 @@ class GroupMetadataManager(brokerId: Int,
}
}
- appendForGroup(group, entries, putCacheCallback)
+ appendForGroup(group, entries, requestLocal, putCacheCallback)
case None =>
val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
@@ -781,9 +784,8 @@ class GroupMetadataManager(brokerId: Int,
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
val currentTimestamp = time.milliseconds()
- val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
- group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
- })
+ val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, RequestLocal.NoCaching,
+ _.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs))
offsetExpiredSensor.record(numOffsetsRemoved)
if (numOffsetsRemoved > 0)
info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
@@ -796,7 +798,8 @@ class GroupMetadataManager(brokerId: Int,
* a group lock is held, therefore there is no need for the caller to also obtain a group lock.
* @return The cumulative number of offsets removed
*/
- def cleanupGroupMetadata(groups: Iterable[GroupMetadata], selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = {
+ def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal,
+ selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = {
var offsetsRemoved = 0
groups.foreach { group =>
@@ -843,7 +846,8 @@ class GroupMetadataManager(brokerId: Int,
// do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle
val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
- partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0)
+ partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0,
+ requestLocal = requestLocal)
offsetsRemoved += removedOffsets.size
trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 543e9c8..78983c1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -18,8 +18,7 @@ package kafka.coordinator.transaction
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
+import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager, RequestLocal}
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
@@ -104,7 +103,8 @@ class TransactionCoordinator(brokerId: Int,
def handleInitProducerId(transactionalId: String,
transactionTimeoutMs: Int,
expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
- responseCallback: InitProducerIdCallback): Unit = {
+ responseCallback: InitProducerIdCallback,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
if (transactionalId == null) {
// if the transactional id is null, then always blindly accept the request
@@ -167,7 +167,8 @@ class TransactionCoordinator(brokerId: Int,
newMetadata.producerEpoch,
TransactionResult.ABORT,
isFromClient = false,
- sendRetriableErrorCallback)
+ sendRetriableErrorCallback,
+ requestLocal)
} else {
def sendPidResponseCallback(error: Errors): Unit = {
if (error == Errors.NONE) {
@@ -181,7 +182,8 @@ class TransactionCoordinator(brokerId: Int,
}
}
- txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)
+ txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
+ sendPidResponseCallback, requestLocal = requestLocal)
}
}
}
@@ -320,7 +322,8 @@ class TransactionCoordinator(brokerId: Int,
producerId: Long,
producerEpoch: Short,
partitions: collection.Set[TopicPartition],
- responseCallback: AddPartitionsCallback): Unit = {
+ responseCallback: AddPartitionsCallback,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
if (transactionalId == null || transactionalId.isEmpty) {
debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request")
responseCallback(Errors.INVALID_REQUEST)
@@ -360,7 +363,8 @@ class TransactionCoordinator(brokerId: Int,
responseCallback(err)
case Right((coordinatorEpoch, newMetadata)) =>
- txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, responseCallback)
+ txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
+ responseCallback, requestLocal = requestLocal)
}
}
}
@@ -413,13 +417,15 @@ class TransactionCoordinator(brokerId: Int,
producerId: Long,
producerEpoch: Short,
txnMarkerResult: TransactionResult,
- responseCallback: EndTxnCallback): Unit = {
+ responseCallback: EndTxnCallback,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
endTransaction(transactionalId,
producerId,
producerEpoch,
txnMarkerResult,
isFromClient = true,
- responseCallback)
+ responseCallback,
+ requestLocal)
}
private def endTransaction(transactionalId: String,
@@ -427,7 +433,8 @@ class TransactionCoordinator(brokerId: Int,
producerEpoch: Short,
txnMarkerResult: TransactionResult,
isFromClient: Boolean,
- responseCallback: EndTxnCallback): Unit = {
+ responseCallback: EndTxnCallback,
+ requestLocal: RequestLocal): Unit = {
var isEpochFence = false
if (transactionalId == null || transactionalId.isEmpty)
responseCallback(Errors.INVALID_REQUEST)
@@ -586,7 +593,8 @@ class TransactionCoordinator(brokerId: Int,
}
}
- txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)
+ txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
+ sendTxnMarkersCallback, requestLocal = requestLocal)
}
}
}
@@ -643,7 +651,8 @@ class TransactionCoordinator(brokerId: Int,
txnTransitMetadata.producerEpoch,
TransactionResult.ABORT,
isFromClient = false,
- onComplete(txnIdAndPidEpoch))
+ onComplete(txnIdAndPidEpoch),
+ RequestLocal.NoCaching)
}
}
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 5e22fb7..62c70d9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -19,11 +19,10 @@ package kafka.coordinator.transaction
import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
-
import kafka.api.KAFKA_2_8_IV0
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{KafkaConfig, MetadataCache}
+import kafka.server.{KafkaConfig, MetadataCache, RequestLocal}
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients._
@@ -330,8 +329,8 @@ class TransactionMarkerChannelManager(
throw new IllegalStateException(errorMsg)
}
- txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback,
- _ == Errors.COORDINATOR_NOT_AVAILABLE)
+ txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch,
+ txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching)
}
def addTxnMarkersToBrokerQueue(transactionalId: String,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 61fad95..25580f2 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -21,10 +21,9 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
-
import kafka.log.{AppendOrigin, LogConfig}
import kafka.message.UncompressedCodec
-import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
+import kafka.server.{Defaults, FetchLogEnd, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool, Scheduler}
import kafka.utils.Implicits._
@@ -209,7 +208,8 @@ class TransactionStateManager(brokerId: Int,
internalTopicsAllowed = true,
origin = AppendOrigin.Coordinator,
recordsPerPartition,
- removeFromCacheCallback)
+ removeFromCacheCallback,
+ requestLocal = RequestLocal.NoCaching)
}
}, delay = config.removeExpiredTransactionalIdsIntervalMs, period = config.removeExpiredTransactionalIdsIntervalMs)
@@ -526,7 +526,8 @@ class TransactionStateManager(brokerId: Int,
coordinatorEpoch: Int,
newMetadata: TxnTransitMetadata,
responseCallback: Errors => Unit,
- retryOnError: Errors => Boolean = _ => false): Unit = {
+ retryOnError: Errors => Boolean = _ => false,
+ requestLocal: RequestLocal): Unit = {
// generate the message for this transaction metadata
val keyBytes = TransactionLog.keyToBytes(transactionalId)
@@ -679,7 +680,8 @@ class TransactionStateManager(brokerId: Int,
internalTopicsAllowed = true,
origin = AppendOrigin.Coordinator,
recordsPerPartition,
- updateCacheCallback)
+ updateCacheCallback,
+ requestLocal = requestLocal)
trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0e74c28..b49bfb4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,6 @@ import java.util.Optional
import java.util.concurrent.atomic._
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
-
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
import kafka.common.{LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.AppendOrigin.RaftLeader
@@ -32,7 +31,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
@@ -693,15 +692,17 @@ class Log(@volatile private var _dir: File,
* @param records The records to append
* @param origin Declares the origin of the append which affects required validations
* @param interBrokerProtocolVersion Inter-broker message protocol version
+ * @param requestLocal request local instance
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
def appendAsLeader(records: MemoryRecords,
leaderEpoch: Int,
origin: AppendOrigin = AppendOrigin.Client,
- interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
+ interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
+ requestLocal: RequestLocal = RequestLocal.NoCaching): LogAppendInfo = {
val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
- append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)
+ append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), ignoreRecordSize = false)
}
/**
@@ -717,6 +718,7 @@ class Log(@volatile private var _dir: File,
interBrokerProtocolVersion = ApiVersion.latestVersion,
validateAndAssignOffsets = false,
leaderEpoch = -1,
+ None,
// disable to check the validation of record size since the record is already accepted by leader.
ignoreRecordSize = true)
}
@@ -732,6 +734,7 @@ class Log(@volatile private var _dir: File,
* @param interBrokerProtocolVersion Inter-broker message protocol version
* @param validateAndAssignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+ * @param requestLocal The request local instance if validateAndAssignOffsets is true
* @param ignoreRecordSize true to skip validation of record size.
* @throws KafkaStorageException If the append fails due to an I/O error.
* @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
@@ -743,6 +746,7 @@ class Log(@volatile private var _dir: File,
interBrokerProtocolVersion: ApiVersion,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
+ requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
@@ -778,7 +782,9 @@ class Log(@volatile private var _dir: File,
leaderEpoch,
origin,
interBrokerProtocolVersion,
- brokerTopicStats)
+ brokerTopicStats,
+ requestLocal.getOrElse(throw new IllegalArgumentException(
+ "requestLocal should be defined if assignOffsets is true")))
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 056be10..925c602 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
import kafka.common.{LongRef, RecordValidationException}
import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ProduceResponse.RecordError
-import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.common.utils.Time
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -95,7 +95,8 @@ private[log] object LogValidator extends Logging {
partitionLeaderEpoch: Int,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
- brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
+ brokerTopicStats: BrokerTopicStats,
+ requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
@@ -106,8 +107,9 @@ private[log] object LogValidator extends Logging {
assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
partitionLeaderEpoch, origin, magic, brokerTopicStats)
} else {
- validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
- magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin, interBrokerProtocolVersion, brokerTopicStats)
+ validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec,
+ targetCodec, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin,
+ interBrokerProtocolVersion, brokerTopicStats, requestLocal)
}
}
@@ -232,6 +234,8 @@ private[log] object LogValidator extends Logging {
(first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
+ // The current implementation of BufferSupplier is naive and works best when the buffer size
+ // cardinality is low, so don't use it here
val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
@@ -290,7 +294,9 @@ private[log] object LogValidator extends Logging {
var offsetOfMaxBatchTimestamp = -1L
val recordErrors = new ArrayBuffer[ApiRecordError](0)
- // this is a hot path and we want to avoid any unnecessary allocations.
+ // This is a hot path and we want to avoid any unnecessary allocations.
+ // That said, there is no benefit in using `skipKeyValueIterator` for the uncompressed
+ // case since we don't do key/value copies in this path (we just slice the ByteBuffer)
var batchIndex = 0
batch.forEach { record =>
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType,
@@ -360,7 +366,8 @@ private[log] object LogValidator extends Logging {
partitionLeaderEpoch: Int,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
- brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
+ brokerTopicStats: BrokerTopicStats,
+ requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
@@ -404,9 +411,9 @@ private[log] object LogValidator extends Logging {
// if we are on version 2 and beyond, and we know we are going for in place assignment,
// then we can optimize the iterator to skip key / value / headers since they would not be used at all
val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
- batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
+ batch.skipKeyValueIterator(requestLocal.bufferSupplier)
else
- batch.streamingIterator(BufferSupplier.NO_CACHING)
+ batch.streamingIterator(requestLocal.bufferSupplier)
try {
val recordErrors = new ArrayBuffer[ApiRecordError](0)
@@ -499,6 +506,8 @@ private[log] object LogValidator extends Logging {
val startNanos = time.nanoseconds
val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
validatedRecords.asJava)
+ // The current implementation of BufferSupplier is naive and works best when the buffer size
+ // cardinality is low, so don't use it here
val buffer = ByteBuffer.allocate(estimatedSize)
val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 6ac39b7..7b5f83d 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -19,10 +19,9 @@ package kafka.raft
import java.io.File
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.{Optional, Properties}
-
import kafka.api.ApiVersion
import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.Time
@@ -76,7 +75,8 @@ final class KafkaMetadataLog private (
handleAndConvertLogAppendInfo(
log.appendAsLeader(records.asInstanceOf[MemoryRecords],
leaderEpoch = epoch,
- origin = AppendOrigin.RaftLeader
+ origin = AppendOrigin.RaftLeader,
+ requestLocal = RequestLocal.NoCaching
)
)
}
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 47bc19d..fa0dc79 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -76,7 +76,7 @@ class ControllerApis(val requestChannel: RequestChannel,
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
- override def handle(request: RequestChannel.Request): Unit = {
+ override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
@@ -97,7 +97,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
- case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
+ case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
@@ -113,12 +113,12 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleEnvelopeRequest(request: RequestChannel.Request): Unit = {
+ def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
} else {
- EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+ EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d75e4ae..c5e1404 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
- override def handle(request: RequestChannel.Request): Unit = {
+ override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
@@ -156,21 +156,21 @@ class KafkaApis(val requestChannel: RequestChannel,
}
request.header.apiKey match {
- case ApiKeys.PRODUCE => handleProduceRequest(request)
+ case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
- case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
+ case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
- case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
+ case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
- case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
+ case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
- case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
+ case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
@@ -178,13 +178,13 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
- case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
+ case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
- case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
- case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
- case ApiKeys.END_TXN => handleEndTxnRequest(request)
- case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
- case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
+ case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
+ case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
+ case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
+ case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
+ case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
@@ -198,19 +198,19 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
- case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
+ case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
- case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
+ case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
- case ApiKeys.ENVELOPE => handleEnvelope(request)
+ case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
@@ -314,7 +314,7 @@ class KafkaApis(val requestChannel: RequestChannel,
CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(), this)
}
- def handleUpdateMetadataRequest(request: RequestChannel.Request): Unit = {
+ def handleUpdateMetadataRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
@@ -330,13 +330,14 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
if (deletedPartitions.nonEmpty)
- groupCoordinator.handleDeletedPartitions(deletedPartitions)
+ groupCoordinator.handleDeletedPartitions(deletedPartitions, requestLocal)
if (zkSupport.adminManager.hasDelayedTopicOperations) {
updateMetadataRequest.partitionStates.forEach { partitionState =>
zkSupport.adminManager.tryCompleteDelayedTopicOperations(partitionState.topicName)
}
}
+
quotas.clientQuotaCallback.foreach { callback =>
if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
quotas.fetch.updateQuotaMetricConfigs()
@@ -380,7 +381,7 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Handle an offset commit request
*/
- def handleOffsetCommitRequest(request: RequestChannel.Request): Unit = {
+ def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val header = request.header
val offsetCommitRequest = request.body[OffsetCommitRequest]
@@ -509,7 +510,8 @@ class KafkaApis(val requestChannel: RequestChannel,
Option(offsetCommitRequest.data.groupInstanceId),
offsetCommitRequest.data.generationId,
partitionData,
- sendResponseCallback)
+ sendResponseCallback,
+ requestLocal)
}
}
}
@@ -517,7 +519,7 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Handle a produce request
*/
- def handleProduceRequest(request: RequestChannel.Request): Unit = {
+ def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val produceRequest = request.body[ProduceRequest]
val requestSize = request.sizeInBytes
@@ -639,6 +641,7 @@ class KafkaApis(val requestChannel: RequestChannel,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.Client,
entriesPerPartition = authorizedRequestInfo,
+ requestLocal = requestLocal,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback)
@@ -1448,7 +1451,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleJoinGroupRequest(request: RequestChannel.Request): Unit = {
+ def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val joinGroupRequest = request.body[JoinGroupRequest]
// the callback for sending a join-group response
@@ -1507,11 +1510,12 @@ class KafkaApis(val requestChannel: RequestChannel,
joinGroupRequest.data.sessionTimeoutMs,
joinGroupRequest.data.protocolType,
protocols,
- sendResponseCallback)
+ sendResponseCallback,
+ requestLocal)
}
}
- def handleSyncGroupRequest(request: RequestChannel.Request): Unit = {
+ def handleSyncGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val syncGroupRequest = request.body[SyncGroupRequest]
def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
@@ -1550,19 +1554,20 @@ class KafkaApis(val requestChannel: RequestChannel,
Option(syncGroupRequest.data.protocolName),
Option(syncGroupRequest.data.groupInstanceId),
assignmentMap.result(),
- sendResponseCallback
+ sendResponseCallback,
+ requestLocal
)
}
}
- def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
+ def handleDeleteGroupsRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val deleteGroupsRequest = request.body[DeleteGroupsRequest]
val groups = deleteGroupsRequest.data.groupsNames.asScala.distinct
val (authorizedGroups, unauthorizedGroups) = authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP,
groups)(identity)
- val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet) ++
+ val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups.toSet, requestLocal) ++
unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
@@ -1990,7 +1995,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
+ def handleInitProducerIdRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val initProducerIdRequest = request.body[InitProducerIdRequest]
val transactionalId = initProducerIdRequest.data.transactionalId
@@ -2035,12 +2040,12 @@ class KafkaApis(val requestChannel: RequestChannel,
producerIdAndEpoch match {
case Right(producerIdAndEpoch) => txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs,
- producerIdAndEpoch, sendResponseCallback)
+ producerIdAndEpoch, sendResponseCallback, requestLocal)
case Left(error) => requestHelper.sendErrorResponseMaybeThrottle(request, error.exception)
}
}
- def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
+ def handleEndTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
val endTxnRequest = request.body[EndTxnRequest]
val transactionalId = endTxnRequest.data.transactionalId
@@ -2071,7 +2076,8 @@ class KafkaApis(val requestChannel: RequestChannel,
endTxnRequest.data.producerId,
endTxnRequest.data.producerEpoch,
endTxnRequest.result(),
- sendResponseCallback)
+ sendResponseCallback,
+ requestLocal)
} else
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new EndTxnResponse(new EndTxnResponseData()
@@ -2080,7 +2086,7 @@ class KafkaApis(val requestChannel: RequestChannel,
)
}
- def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+ def handleWriteTxnMarkersRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
@@ -2175,6 +2181,7 @@ class KafkaApis(val requestChannel: RequestChannel,
internalTopicsAllowed = true,
origin = AppendOrigin.Coordinator,
entriesPerPartition = controlRecords,
+ requestLocal = requestLocal,
responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
}
}
@@ -2190,7 +2197,7 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
}
- def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
+ def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
val transactionalId = addPartitionsToTxnRequest.data.transactionalId
@@ -2240,7 +2247,6 @@ class KafkaApis(val requestChannel: RequestChannel,
responseBody
}
-
requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
@@ -2248,12 +2254,13 @@ class KafkaApis(val requestChannel: RequestChannel,
addPartitionsToTxnRequest.data.producerId,
addPartitionsToTxnRequest.data.producerEpoch,
authorizedPartitions,
- sendResponseCallback)
+ sendResponseCallback,
+ requestLocal)
}
}
}
- def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
+ def handleAddOffsetsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest]
val transactionalId = addOffsetsToTxnRequest.data.transactionalId
@@ -2298,11 +2305,12 @@ class KafkaApis(val requestChannel: RequestChannel,
addOffsetsToTxnRequest.data.producerId,
addOffsetsToTxnRequest.data.producerEpoch,
Set(offsetTopicPartition),
- sendResponseCallback)
+ sendResponseCallback,
+ requestLocal)
}
}
- def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
+ def handleTxnOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
val header = request.header
val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
@@ -2367,7 +2375,8 @@ class KafkaApis(val requestChannel: RequestChannel,
Option(txnOffsetCommitRequest.data.groupInstanceId),
txnOffsetCommitRequest.data.generationId,
offsetMetadata,
- sendResponseCallback)
+ sendResponseCallback,
+ requestLocal)
}
}
}
@@ -2877,7 +2886,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleOffsetDeleteRequest(request: RequestChannel.Request): Unit = {
+ def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val offsetDeleteRequest = request.body[OffsetDeleteRequest]
val groupId = offsetDeleteRequest.data.groupId
@@ -2901,7 +2910,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val (groupError, authorizedTopicPartitionsErrors) = groupCoordinator.handleDeleteOffsets(
- groupId, topicPartitions)
+ groupId, topicPartitions, requestLocal)
topicPartitionErrors ++= authorizedTopicPartitionsErrors
@@ -3129,7 +3138,7 @@ class KafkaApis(val requestChannel: RequestChannel,
})
}
- def handleEnvelope(request: RequestChannel.Request): Unit = {
+ def handleEnvelope(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
// If forwarding is not yet enabled or this request has been received on an invalid endpoint,
@@ -3154,7 +3163,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"Broker $brokerId is not the active controller"))
return
}
- EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+
+ EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
}
def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f6d868a..4d38c6e 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -20,9 +20,9 @@ package kafka.server
import kafka.network._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
+
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
-
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
@@ -31,7 +31,7 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters._
trait ApiRequestHandler {
- def handle(request: RequestChannel.Request): Unit
+ def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
}
/**
@@ -44,8 +44,9 @@ class KafkaRequestHandler(id: Int,
val requestChannel: RequestChannel,
apis: ApiRequestHandler,
time: Time) extends Runnable with Logging {
- this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
+ this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
private val shutdownComplete = new CountDownLatch(1)
+ private val requestLocal = RequestLocal.withThreadConfinedCaching
@volatile private var stopped = false
def run(): Unit = {
@@ -64,17 +65,17 @@ class KafkaRequestHandler(id: Int,
req match {
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
- shutdownComplete.countDown()
+ completeShutdown()
return
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
- apis.handle(request)
+ apis.handle(request, requestLocal)
} catch {
case e: FatalExitError =>
- shutdownComplete.countDown()
+ completeShutdown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
@@ -84,6 +85,11 @@ class KafkaRequestHandler(id: Int,
case null => // continue
}
}
+ completeShutdown()
+ }
+
+ private def completeShutdown(): Unit = {
+ requestLocal.close()
shutdownComplete.countDown()
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d813241..6ca8169 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -21,7 +21,6 @@ import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
-
import com.yammer.metrics.core.Meter
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition}
@@ -606,11 +605,12 @@ class ReplicaManager(val config: KafkaConfig,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
- recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
+ recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
- origin, entriesPerPartition, requiredAcks)
+ origin, entriesPerPartition, requiredAcks, requestLocal)
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
@@ -926,7 +926,8 @@ class ReplicaManager(val config: KafkaConfig,
private def appendToLocalLog(internalTopicsAllowed: Boolean,
origin: AppendOrigin,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
- requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
+ requiredAcks: Short,
+ requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = {
val traceEnabled = isTraceEnabled
def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
@@ -952,7 +953,7 @@ class ReplicaManager(val config: KafkaConfig,
} else {
try {
val partition = getPartitionOrException(topicPartition)
- val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
+ val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal)
val numAppendedMessages = info.numMessages
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate
diff --git a/core/src/main/scala/kafka/server/RequestLocal.scala b/core/src/main/scala/kafka/server/RequestLocal.scala
new file mode 100644
index 0000000..5af495f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/RequestLocal.scala
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.utils.BufferSupplier
+
+object RequestLocal {
+ val NoCaching: RequestLocal = RequestLocal(BufferSupplier.NO_CACHING)
+
+ /** The returned instance should be confined to a single thread. */
+ def withThreadConfinedCaching: RequestLocal = RequestLocal(BufferSupplier.create())
+}
+
+/**
+ * Container for stateful instances where the lifecycle is scoped to one request.
+ *
+ * When each request is handled by one thread, efficient data structures with no locking or atomic operations
+ * can be used (see RequestLocal.withThreadConfinedCaching).
+ */
+case class RequestLocal(bufferSupplier: BufferSupplier) {
+ def close(): Unit = bufferSupplier.close()
+}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 6dfaa18..70e44c8 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{RaftReplicaManager, RequestHandlerHelper}
+import kafka.server.{RaftReplicaManager, RequestHandlerHelper, RequestLocal}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.metadata.MetadataRecordType._
import org.apache.kafka.common.metadata._
@@ -248,7 +248,7 @@ class BrokerMetadataListener(
case Some(topicName) =>
info(s"Processing deletion of topic $topicName with id ${record.topicId}")
val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
- groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+ groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq, RequestLocal.NoCaching)
configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
}
}
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index 91d5ecd3..a9b471b 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -19,7 +19,7 @@ package kafka.tools
import kafka.network.RequestChannel
import kafka.raft.RaftManager
-import kafka.server.{ApiRequestHandler, ApiVersionManager}
+import kafka.server.{ApiRequestHandler, ApiVersionManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData}
@@ -37,7 +37,7 @@ class TestRaftRequestHandler(
apiVersionManager: ApiVersionManager
) extends ApiRequestHandler with Logging {
- override def handle(request: RequestChannel.Request): Unit = {
+ override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
trace(s"Handling request:${request.requestDesc(true)} with context ${request.context}")
request.header.apiKey match {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8d567ec..8dc37d4 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -20,7 +20,6 @@ package kafka.cluster
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent._
-
import kafka.api.ApiVersion
import kafka.log._
import kafka.server._
@@ -336,10 +335,11 @@ class PartitionLockTest extends Logging {
}
private def append(partition: Partition, numRecords: Int, followerQueues: Seq[ArrayBlockingQueue[MemoryRecords]]): Unit = {
+ val requestLocal = RequestLocal.withThreadConfinedCaching
(0 until numRecords).foreach { _ =>
val batch = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)))
- partition.appendRecordsToLeader(batch, origin = AppendOrigin.Client, requiredAcks = 0)
+ partition.appendRecordsToLeader(batch, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
followerQueues.foreach(_.put(batch))
}
}
@@ -388,8 +388,9 @@ class PartitionLockTest extends Logging {
_topicId = None,
keepPartitionMetadataFile = true) {
- override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin, interBrokerProtocolVersion: ApiVersion): LogAppendInfo = {
- val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion)
+ override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
+ interBrokerProtocolVersion: ApiVersion, requestLocal: RequestLocal): LogAppendInfo = {
+ val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal)
appendSemaphore.acquire()
appendInfo
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b614e13..2b6d7ff 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -592,10 +592,11 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
assertEquals(Set[Integer](leader, follower2), partition.isrState.isr, "ISR")
+ val requestLocal = RequestLocal.withThreadConfinedCaching
// after makeLeader(() call, partition should know about all the replicas
// append records with initial leader epoch
- partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client, requiredAcks = 0)
- partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
+ partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
+ partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
assertEquals(partition.localLogOrException.logStartOffset, partition.localLogOrException.highWatermark,
"Expected leader's HW not move")
@@ -839,7 +840,7 @@ class PartitionTest extends AbstractPartitionTest {
new SimpleRecord("k2".getBytes, "v2".getBytes),
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L)
- partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0)
+ partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
val res = partition.fetchOffsetForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
@@ -954,11 +955,13 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
assertEquals(Set[Integer](leader, follower2), partition.isrState.isr, "ISR")
+ val requestLocal = RequestLocal.withThreadConfinedCaching
+
// after makeLeader(() call, partition should know about all the replicas
// append records with initial leader epoch
val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client,
- requiredAcks = 0).lastOffset
- partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
+ requiredAcks = 0, requestLocal).lastOffset
+ partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
assertEquals(partition.localLogOrException.logStartOffset, partition.log.get.highWatermark, "Expected leader's HW not move")
// let the follower in ISR move leader's HW to move further but below LEO
@@ -999,7 +1002,7 @@ class PartitionTest extends AbstractPartitionTest {
val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
// append records with the latest leader epoch
- partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0)
+ partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal)
// fetch from follower not in ISR from log start offset should not add this follower to ISR
updateFollowerFetchState(follower1, LogOffsetMetadata(0))
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 14e0f3f..6dfb396 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Collections, Random}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
-
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.log.{AppendOrigin, Log}
import kafka.server._
@@ -160,8 +159,10 @@ object AbstractCoordinatorConcurrencyTest {
class TestReplicaManager extends ReplicaManager(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None, null, null) {
+ @volatile var logs: mutable.Map[TopicPartition, (Log, Long)] = _
var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
var watchKeys: mutable.Set[TopicPartitionOperationKey] = _
+
def createDelayedProducePurgatory(timer: MockTimer): Unit = {
producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false)
watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
@@ -176,7 +177,8 @@ object AbstractCoordinatorConcurrencyTest {
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
- processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
+ processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
+ requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
if (entriesPerPartition.isEmpty)
return
@@ -204,20 +206,24 @@ object AbstractCoordinatorConcurrencyTest {
watchKeys ++= producerRequestKeys
producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
}
+
override def getMagic(topicPartition: TopicPartition): Option[Byte] = {
Some(RecordBatch.MAGIC_VALUE_V2)
}
- @volatile var logs: mutable.Map[TopicPartition, (Log, Long)] = _
+
def getOrCreateLogs(): mutable.Map[TopicPartition, (Log, Long)] = {
if (logs == null)
logs = mutable.Map[TopicPartition, (Log, Long)]()
logs
}
+
def updateLog(topicPartition: TopicPartition, log: Log, endOffset: Long): Unit = {
getOrCreateLogs().put(topicPartition, (log, endOffset))
}
+
override def getLog(topicPartition: TopicPartition): Option[Log] =
getOrCreateLogs().get(topicPartition).map(l => l._1)
+
override def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
getOrCreateLogs().get(topicPartition).map(l => l._2)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3982689..23ebbeb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -18,9 +18,8 @@
package kafka.coordinator.group
import java.util.Optional
-
import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils._
import kafka.utils.timer.MockTimer
import org.apache.kafka.common.TopicPartition
@@ -29,9 +28,9 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
import org.easymock.{Capture, EasyMock, IAnswer}
+
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
-
import kafka.cluster.Partition
import kafka.log.AppendOrigin
import kafka.zk.KafkaZkClient
@@ -3421,7 +3420,8 @@ class GroupCoordinatorTest {
@Test
def testDeleteOffsetOfNonExistingGroup(): Unit = {
val tp = new TopicPartition("foo", 0)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+ RequestLocal.NoCaching)
assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
assertTrue(topics.isEmpty)
@@ -3432,7 +3432,8 @@ class GroupCoordinatorTest {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
dynamicJoinGroup(groupId, memberId, "My Protocol", protocols)
val tp = new TopicPartition("foo", 0)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+ RequestLocal.NoCaching)
assertEquals(Errors.NON_EMPTY_GROUP, groupError)
assertTrue(topics.isEmpty)
@@ -3476,7 +3477,8 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.onlinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0),
+ RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(1, topics.size)
@@ -3505,7 +3507,8 @@ class GroupCoordinatorTest {
Map(tp -> offset))
assertEquals(Errors.NONE, validOffsetCommitResult(tp))
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+ RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(1, topics.size)
@@ -3519,7 +3522,8 @@ class GroupCoordinatorTest {
groupCoordinator.groupManager.addGroup(group)
val tp = new TopicPartition("foo", 0)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp),
+ RequestLocal.NoCaching)
assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
assertTrue(topics.isEmpty)
@@ -3562,7 +3566,8 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.onlinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0),
+ RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(1, topics.size)
@@ -3609,7 +3614,8 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.onlinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0))
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0),
+ RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(2, topics.size)
@@ -3789,12 +3795,14 @@ class GroupCoordinatorTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
- EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
- override def answer = capturedArgument.getValue.apply(
+ EasyMock.anyObject(),
+ EasyMock.anyObject()
+ )).andAnswer(new IAnswer[Unit] {
+ override def answer: Unit = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
new PartitionResponse(appendRecordError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
- )
- )})
+ ))
+ })
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
@@ -3821,6 +3829,7 @@ class GroupCoordinatorTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -3963,6 +3972,7 @@ class GroupCoordinatorTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
@@ -3996,6 +4006,7 @@ class GroupCoordinatorTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 231382e..5b2152b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -21,15 +21,15 @@ import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Optional}
-
import com.yammer.metrics.core.Gauge
+
import javax.management.ObjectName
import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{AppendOrigin, Log, LogAppendInfo}
import kafka.metrics.KafkaYammerMetrics
-import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager}
+import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@@ -105,7 +105,7 @@ class GroupMetadataManagerTest {
var expiredOffsets: Int = 0
var infoCount = 0
val gmm = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, time, metrics) {
- override def cleanupGroupMetadata(groups: Iterable[GroupMetadata],
+ override def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal,
selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = expiredOffsets
override def info(msg: => String): Unit = infoCount += 1
@@ -1418,8 +1418,8 @@ class GroupMetadataManagerTest {
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1453,8 +1453,8 @@ class GroupMetadataManagerTest {
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
mockGetPartition()
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(replicaManager, partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1501,8 +1501,8 @@ class GroupMetadataManagerTest {
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
mockGetPartition()
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(replicaManager, partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1576,8 +1576,8 @@ class GroupMetadataManagerTest {
val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1677,8 +1677,8 @@ class GroupMetadataManagerTest {
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1701,8 +1701,8 @@ class GroupMetadataManagerTest {
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1744,8 +1744,8 @@ class GroupMetadataManagerTest {
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1822,8 +1822,8 @@ class GroupMetadataManagerTest {
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
@@ -1948,8 +1948,8 @@ class GroupMetadataManagerTest {
// expect the offset tombstone
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
- origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
- .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt(),
+ EasyMock.anyObject())).andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.expectLastCall().times(1)
EasyMock.replay(partition)
@@ -2270,6 +2270,7 @@ class GroupMetadataManagerTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject())
)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -2286,6 +2287,7 @@ class GroupMetadataManagerTest {
EasyMock.capture(capturedRecords),
EasyMock.capture(capturedCallback),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedCallback.getValue.apply(
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index e1786d0..e02c2fe3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -18,12 +18,11 @@ package kafka.coordinator.transaction
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
-
import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
import kafka.log.Log
-import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache}
+import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache, RequestLocal}
import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -509,7 +508,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
class InitProducerIdOperation(val producerIdAndEpoch: Option[ProducerIdAndEpoch] = None) extends TxnOperation[InitProducerIdResult] {
override def run(txn: Transaction): Unit = {
- transactionCoordinator.handleInitProducerId(txn.transactionalId, 60000, producerIdAndEpoch, resultCallback)
+ transactionCoordinator.handleInitProducerId(txn.transactionalId, 60000, producerIdAndEpoch, resultCallback,
+ RequestLocal.withThreadConfinedCaching)
replicaManager.tryCompleteActions()
}
override def awaitAndVerify(txn: Transaction): Unit = {
@@ -526,7 +526,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
txnMetadata.producerId,
txnMetadata.producerEpoch,
partitions,
- resultCallback)
+ resultCallback,
+ RequestLocal.withThreadConfinedCaching)
replicaManager.tryCompleteActions()
}
}
@@ -544,7 +545,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
txnMetadata.producerId,
txnMetadata.producerEpoch,
transactionResult(txn),
- resultCallback)
+ resultCallback,
+ RequestLocal.withThreadConfinedCaching)
}
}
override def awaitAndVerify(txn: Transaction): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index f6b5e54..38e8e71 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -118,6 +118,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE)).anyTimes()
EasyMock.replay(pidGenerator, transactionManager)
@@ -145,6 +146,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE)).anyTimes()
EasyMock.replay(pidGenerator, transactionManager)
@@ -169,6 +171,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()
)).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE))
@@ -314,6 +317,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()
))
@@ -572,6 +576,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(originalMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds())),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE))
@@ -641,6 +646,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(txnTransitMetadata),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)
@@ -652,6 +658,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(txnTransitMetadata),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -724,6 +731,7 @@ class TransactionCoordinatorTest {
txnStartTimestamp = time.milliseconds(),
txnLastUpdateTimestamp = time.milliseconds())),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NONE))
@@ -790,6 +798,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -827,6 +836,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.capture(capturedTxnTransitMetadata),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -867,6 +877,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.capture(capturedTxnTransitMetadata),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -910,6 +921,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.capture(capturedTxnTransitMetadata),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -963,6 +975,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(expectedTransition),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {}).once()
@@ -1046,6 +1059,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(expectedTransition),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)).once()
@@ -1179,6 +1193,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.capture(capturedNewMetadata),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()
)).andAnswer(() => {
metadata.completeTransitionTo(capturedNewMetadata.getValue)
@@ -1213,6 +1228,7 @@ class TransactionCoordinatorTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(transition),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => {
if (runCallback)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 8aa07c6..0a0ec51 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -107,6 +107,7 @@ class TransactionMarkerChannelManagerTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(expectedTransition),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()))
.andAnswer(() => {
txnMetadata2.completeTransitionTo(expectedTransition)
@@ -345,6 +346,7 @@ class TransactionMarkerChannelManagerTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(txnTransitionMetadata2),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()))
.andAnswer(() => {
txnMetadata2.completeTransitionTo(txnTransitionMetadata2)
@@ -392,6 +394,7 @@ class TransactionMarkerChannelManagerTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(txnTransitionMetadata2),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()))
.andAnswer(() => {
txnMetadata2.pendingState = None
@@ -439,6 +442,7 @@ class TransactionMarkerChannelManagerTest {
EasyMock.eq(coordinatorEpoch),
EasyMock.eq(txnTransitionMetadata2),
EasyMock.capture(capturedErrorsCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject()))
.andAnswer(() => capturedErrorsCallback.getValue.apply(Errors.COORDINATOR_NOT_AVAILABLE))
.andAnswer(() => {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index df57693..410d6e2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kafka.log.{AppendOrigin, Log}
-import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
+import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager, RequestLocal}
import kafka.utils.{MockScheduler, Pool, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
@@ -319,7 +319,7 @@ class TransactionStateManagerTest {
new TopicPartition("topic1", 1)), time.milliseconds())
// append the new metadata into log
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
@@ -334,25 +334,26 @@ class TransactionStateManagerTest {
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
}
@@ -366,25 +367,26 @@ class TransactionStateManagerTest {
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch + 1, new Pool[String, TransactionMetadata]())
transactionManager.putTransactionStateIfNotExists(txnMetadata1)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]())
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
}
@Test
@@ -398,7 +400,7 @@ class TransactionStateManagerTest {
prepareForTxnMessageAppend(Errors.NONE)
transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
transactionManager.addLoadingPartition(partitionId, coordinatorEpoch + 1)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
}
@Test
@@ -410,13 +412,14 @@ class TransactionStateManagerTest {
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, requestLocal = requestLocal)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
}
@@ -430,7 +433,7 @@ class TransactionStateManagerTest {
val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true, RequestLocal.withThreadConfinedCaching)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertEquals(Some(Ongoing), txnMetadata1.pendingState)
}
@@ -452,7 +455,7 @@ class TransactionStateManagerTest {
txnMetadata1.producerEpoch = (txnMetadata1.producerEpoch + 1).toShort
// append the new metadata into log
- transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, newMetadata, assertCallback)
+ transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching)
}
@Test
@@ -472,7 +475,7 @@ class TransactionStateManagerTest {
// append the new metadata into log
assertThrows(classOf[IllegalStateException], () => transactionManager.appendTransactionToLog(transactionalId1,
- coordinatorEpoch = 10, newMetadata, assertCallback))
+ coordinatorEpoch = 10, newMetadata, assertCallback, requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -719,6 +722,7 @@ class TransactionStateManagerTest {
EasyMock.eq(recordsByPartition),
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject()
)).andAnswer(() => capturedArgument.getValue.apply(
Map(partition -> new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
@@ -824,6 +828,7 @@ class TransactionStateManagerTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => capturedArgument.getValue.apply(
Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index b0d4e3d..af585bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -18,13 +18,12 @@ package kafka.log
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
-
import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
import kafka.common.{LongRef, RecordValidationException}
import kafka.log.LogValidator.ValidationAndOffsetAssignResult
import kafka.message._
import kafka.metrics.KafkaYammerMetrics
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
@@ -128,8 +127,8 @@ class LogValidatorTest {
RecordBatch.NO_PRODUCER_EPOCH,
origin = AppendOrigin.Client,
KAFKA_2_3_IV1,
- brokerTopicStats
- )
+ brokerTopicStats,
+ RequestLocal.withThreadConfinedCaching)
}
@Test
@@ -160,7 +159,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatedResults.validatedRecords
assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, "message set size should not change")
validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch))
@@ -199,7 +199,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatedResults.validatedRecords
assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size,
@@ -247,7 +248,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatedResults.validatedRecords
assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size,
@@ -309,7 +311,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
}
@Test
@@ -353,7 +356,8 @@ class LogValidatorTest {
partitionLeaderEpoch = partitionLeaderEpoch,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatingResults.validatedRecords
var i = 0
@@ -425,7 +429,8 @@ class LogValidatorTest {
partitionLeaderEpoch = partitionLeaderEpoch,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatingResults.validatedRecords
var i = 0
@@ -481,7 +486,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatedResults.validatedRecords
for (batch <- validatedRecords.batches.asScala) {
@@ -526,7 +532,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatedResults.validatedRecords
for (batch <- validatedRecords.batches.asScala) {
@@ -583,7 +590,8 @@ class LogValidatorTest {
partitionLeaderEpoch = partitionLeaderEpoch,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val validatedRecords = validatedResults.validatedRecords
var i = 0
@@ -636,7 +644,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -659,7 +668,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -682,7 +692,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -705,7 +716,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -727,7 +739,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -749,7 +762,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -772,7 +786,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
checkOffsets(messageWithOffset, offset)
}
@@ -796,7 +811,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
checkOffsets(messageWithOffset, offset)
}
@@ -821,7 +837,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
checkOffsets(compressedMessagesWithOffset, offset)
}
@@ -846,7 +863,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords
checkOffsets(compressedMessagesWithOffset, offset)
}
@@ -869,7 +887,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
checkOffsets(validatedResults.validatedRecords, offset)
verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = false)
@@ -894,7 +913,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
checkOffsets(validatedResults.validatedRecords, offset)
verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = false)
@@ -919,7 +939,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
checkOffsets(validatedResults.validatedRecords, offset)
verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
@@ -944,7 +965,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
checkOffsets(validatedResults.validatedRecords, offset)
verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
@@ -969,7 +991,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -991,7 +1014,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Coordinator,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
val batches = TestUtils.toList(result.validatedRecords.batches)
assertEquals(1, batches.size)
val batch = batches.get(0)
@@ -1018,7 +1042,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1041,7 +1066,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1063,7 +1089,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1085,7 +1112,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1108,7 +1136,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1131,7 +1160,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1156,7 +1186,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -1181,7 +1212,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -1204,7 +1236,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1227,7 +1260,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats).validatedRecords, offset)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching).validatedRecords, offset)
}
@Test
@@ -1248,7 +1282,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
)
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}")), 1)
assertTrue(meterCount(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}") > 0)
@@ -1278,7 +1313,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = KAFKA_2_0_IV1,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -1312,7 +1348,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats)
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching)
)
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
@@ -1390,7 +1427,8 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
origin = AppendOrigin.Client,
interBrokerProtocolVersion = ApiVersion.latestVersion,
- brokerTopicStats = brokerTopicStats))
+ brokerTopicStats = brokerTopicStats,
+ requestLocal = RequestLocal.withThreadConfinedCaching))
}
private def createRecords(magicValue: Byte,
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index b0fa2b3..418ee31 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -274,7 +274,8 @@ class ControllerApisTest {
val request = buildRequest(brokerRegistrationRequest)
val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
- createControllerApis(Some(createDenyAllAuthorizer()), mock(classOf[Controller])).handle(request)
+ createControllerApis(Some(createDenyAllAuthorizer()), mock(classOf[Controller])).handle(request,
+ RequestLocal.withThreadConfinedCaching)
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bd5de08..46f4c1b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -313,7 +313,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
adminManager, controller)
- createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request)
+ createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
assertEquals(Some(request), capturedRequest.getValue.envelope)
val innerResponse = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
@@ -339,7 +339,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(request)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller)
- createKafkaApis(enableForwarding = true).handle(request)
+ createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
assertEquals(Errors.INVALID_REQUEST, response.error())
@@ -407,7 +407,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
adminManager, controller)
- createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request)
+ createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
if (!shouldCloseConnection) {
val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
@@ -482,7 +482,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
- createKafkaApis(enableForwarding = true).handle(request)
+ createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
EasyMock.verify(controller, forwardingManager)
}
@@ -1078,7 +1078,7 @@ class KafkaApisTest {
val request = buildRequest(offsetCommitRequest)
val capturedResponse = expectNoThrottling(request)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
- createKafkaApis().handleOffsetCommitRequest(request)
+ createKafkaApis().handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[OffsetCommitResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
@@ -1110,7 +1110,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(request)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
- createKafkaApis().handleTxnOffsetCommitRequest(request)
+ createKafkaApis().handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
@@ -1147,6 +1147,7 @@ class KafkaApisTest {
).build(version.toShort)
val request = buildRequest(offsetCommitRequest)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(groupCoordinator.handleTxnCommitOffsets(
EasyMock.eq(groupId),
EasyMock.eq(producerId),
@@ -1155,7 +1156,8 @@ class KafkaApisTest {
EasyMock.eq(Option.empty),
EasyMock.anyInt(),
EasyMock.anyObject(),
- EasyMock.capture(responseCallback)
+ EasyMock.capture(responseCallback),
+ EasyMock.eq(requestLocal)
)).andAnswer(
() => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
@@ -1167,7 +1169,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
- createKafkaApis().handleTxnOffsetCommitRequest(request)
+ createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
@@ -1219,11 +1221,13 @@ class KafkaApisTest {
else
Option(new ProducerIdAndEpoch(producerId, epoch))
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(txnCoordinator.handleInitProducerId(
EasyMock.eq(transactionalId),
EasyMock.eq(txnTimeoutMs),
EasyMock.eq(expectedProducerIdAndEpoch),
- EasyMock.capture(responseCallback)
+ EasyMock.capture(responseCallback),
+ EasyMock.eq(requestLocal)
)).andAnswer(
() => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
@@ -1235,7 +1239,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- createKafkaApis().handleInitProducerIdRequest(request)
+ createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
@@ -1278,12 +1282,14 @@ class KafkaApisTest {
EasyMock.eq(groupId)
)).andReturn(partition)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
EasyMock.eq(transactionalId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
- EasyMock.capture(responseCallback)
+ EasyMock.capture(responseCallback),
+ EasyMock.eq(requestLocal)
)).andAnswer(
() => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
@@ -1295,7 +1301,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, groupCoordinator)
- createKafkaApis().handleAddOffsetsToTxnRequest(request)
+ createKafkaApis().handleAddOffsetsToTxnRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[AddOffsetsToTxnResponse]
@@ -1334,13 +1340,14 @@ class KafkaApisTest {
).build(version.toShort)
val request = buildRequest(addPartitionsToTxnRequest)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
EasyMock.eq(transactionalId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.eq(Set(topicPartition)),
-
- EasyMock.capture(responseCallback)
+ EasyMock.capture(responseCallback),
+ EasyMock.eq(requestLocal)
)).andAnswer(
() => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
@@ -1352,7 +1359,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- createKafkaApis().handleAddPartitionToTxnRequest(request)
+ createKafkaApis().handleAddPartitionToTxnRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
@@ -1388,12 +1395,14 @@ class KafkaApisTest {
).build(version.toShort)
val request = buildRequest(endTxnRequest)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(txnCoordinator.handleEndTransaction(
EasyMock.eq(transactionalId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.eq(TransactionResult.COMMIT),
- EasyMock.capture(responseCallback)
+ EasyMock.capture(responseCallback),
+ EasyMock.eq(requestLocal)
)).andAnswer(
() => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
@@ -1404,7 +1413,7 @@ class KafkaApisTest {
))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- createKafkaApis().handleEndTxnRequest(request)
+ createKafkaApis().handleEndTxnRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[EndTxnResponse]
@@ -1449,6 +1458,7 @@ class KafkaApisTest {
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
EasyMock.anyObject(),
+ EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
@@ -1458,7 +1468,7 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- createKafkaApis().handleProduceRequest(request)
+ createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[ProduceResponse]
@@ -1486,7 +1496,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(request)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
- createKafkaApis().handleAddPartitionToTxnRequest(request)
+ createKafkaApis().handleAddPartitionToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
@@ -1498,27 +1508,32 @@ class KafkaApisTest {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
- assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null))
+ assertThrows(classOf[UnsupportedVersionException],
+ () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
- assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null))
+ assertThrows(classOf[UnsupportedVersionException],
+ () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
- assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null))
+ assertThrows(classOf[UnsupportedVersionException],
+ () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
- assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleEndTxnRequest(null))
+ assertThrows(classOf[UnsupportedVersionException],
+ () => createKafkaApis(KAFKA_0_10_2_IV0).handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
- assertThrows(classOf[UnsupportedVersionException], () => createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null))
+ assertThrows(classOf[UnsupportedVersionException],
+ () => createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -1537,7 +1552,7 @@ class KafkaApisTest {
))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
- createKafkaApis().handleWriteTxnMarkersRequest(request)
+ createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1559,7 +1574,7 @@ class KafkaApisTest {
))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
- createKafkaApis().handleWriteTxnMarkersRequest(request)
+ createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1580,6 +1595,7 @@ class KafkaApisTest {
EasyMock.expect(replicaManager.getMagic(tp2))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
EasyMock.eq(true),
@@ -1587,7 +1603,8 @@ class KafkaApisTest {
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
EasyMock.anyObject(),
- EasyMock.anyObject())
+ EasyMock.anyObject(),
+ EasyMock.eq(requestLocal))
).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
EasyMock.expect(requestChannel.sendResponse(
@@ -1597,7 +1614,7 @@ class KafkaApisTest {
))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
- createKafkaApis().handleWriteTxnMarkersRequest(request)
+ createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1719,6 +1736,7 @@ class KafkaApisTest {
EasyMock.expect(replicaManager.getMagic(tp2))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
EasyMock.eq(true),
@@ -1726,7 +1744,8 @@ class KafkaApisTest {
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
EasyMock.anyObject(),
- EasyMock.anyObject())
+ EasyMock.anyObject(),
+ EasyMock.eq(requestLocal))
).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
EasyMock.expect(requestChannel.sendResponse(
@@ -1736,7 +1755,7 @@ class KafkaApisTest {
))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
- createKafkaApis().handleWriteTxnMarkersRequest(request)
+ createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
@@ -1750,6 +1769,7 @@ class KafkaApisTest {
EasyMock.expect(replicaManager.getMagic(topicPartition))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
EasyMock.eq(true),
@@ -1757,11 +1777,12 @@ class KafkaApisTest {
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject(),
- EasyMock.anyObject()))
+ EasyMock.anyObject(),
+ EasyMock.eq(requestLocal)))
EasyMock.replay(replicaManager)
- createKafkaApis().handleWriteTxnMarkersRequest(request)
+ createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
EasyMock.verify(replicaManager)
}
@@ -1857,6 +1878,7 @@ class KafkaApisTest {
).build()
val request = buildRequest(offsetDeleteRequest)
+ val requestLocal = RequestLocal.withThreadConfinedCaching
val capturedResponse = expectNoThrottling(request)
EasyMock.expect(groupCoordinator.handleDeleteOffsets(
EasyMock.eq(group),
@@ -1865,7 +1887,8 @@ class KafkaApisTest {
new TopicPartition("topic-1", 1),
new TopicPartition("topic-2", 0),
new TopicPartition("topic-2", 1)
- ))
+ )),
+ EasyMock.eq(requestLocal)
)).andReturn((Errors.NONE, Map(
new TopicPartition("topic-1", 0) -> Errors.NONE,
new TopicPartition("topic-1", 1) -> Errors.NONE,
@@ -1875,7 +1898,7 @@ class KafkaApisTest {
EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
- createKafkaApis().handleOffsetDeleteRequest(request)
+ createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
@@ -1912,11 +1935,12 @@ class KafkaApisTest {
val request = buildRequest(offsetDeleteRequest)
val capturedResponse = expectNoThrottling(request)
- EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
- .andReturn((Errors.NONE, Map.empty))
+ val requestLocal = RequestLocal.withThreadConfinedCaching
+ EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
+ EasyMock.eq(requestLocal))).andReturn((Errors.NONE, Map.empty))
EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
- createKafkaApis().handleOffsetDeleteRequest(request)
+ createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
@@ -1941,11 +1965,12 @@ class KafkaApisTest {
val request = buildRequest(offsetDeleteRequest)
val capturedResponse = expectNoThrottling(request)
- EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
- .andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
+ val requestLocal = RequestLocal.withThreadConfinedCaching
+ EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
+ EasyMock.eq(requestLocal))).andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
- createKafkaApis().handleOffsetDeleteRequest(request)
+ createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
@@ -2174,6 +2199,7 @@ class KafkaApisTest {
EasyMock.eq(sessionTimeoutMs),
EasyMock.eq(protocolType),
EasyMock.capture(capturedProtocols),
+ anyObject(),
anyObject()
))
@@ -2193,7 +2219,8 @@ class KafkaApisTest {
.setName(name).setMetadata(protocol)
}.iterator.asJava))
).build()
- ))
+ ),
+ RequestLocal.withThreadConfinedCaching)
EasyMock.verify(groupCoordinator)
@@ -2234,7 +2261,8 @@ class KafkaApisTest {
EasyMock.eq(sessionTimeoutMs),
EasyMock.eq(protocolType),
EasyMock.eq(List.empty),
- EasyMock.capture(capturedCallback)
+ EasyMock.capture(capturedCallback),
+ EasyMock.anyObject()
))
val joinGroupRequest = new JoinGroupRequest.Builder(
@@ -2250,7 +2278,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
- createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
+ createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
EasyMock.verify(groupCoordinator)
@@ -2304,7 +2332,8 @@ class KafkaApisTest {
EasyMock.eq(sessionTimeoutMs),
EasyMock.eq(protocolType),
EasyMock.eq(List.empty),
- EasyMock.capture(capturedCallback)
+ EasyMock.capture(capturedCallback),
+ EasyMock.anyObject()
))
val joinGroupRequest = new JoinGroupRequest.Builder(
@@ -2320,7 +2349,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
- createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
+ createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
EasyMock.verify(groupCoordinator)
@@ -2364,6 +2393,7 @@ class KafkaApisTest {
val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
+ val requestLocal = RequestLocal.withThreadConfinedCaching
EasyMock.expect(groupCoordinator.handleSyncGroup(
EasyMock.eq(groupId),
EasyMock.eq(0),
@@ -2372,7 +2402,8 @@ class KafkaApisTest {
EasyMock.eq(if (version >= 5) Some(protocolName) else None),
EasyMock.eq(None),
EasyMock.eq(Map.empty),
- EasyMock.capture(capturedCallback)
+ EasyMock.capture(capturedCallback),
+ EasyMock.eq(requestLocal)
))
val syncGroupRequest = new SyncGroupRequest.Builder(
@@ -2388,7 +2419,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
- createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
+ createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
EasyMock.verify(groupCoordinator)
@@ -2425,6 +2456,7 @@ class KafkaApisTest {
val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
+ val requestLocal = RequestLocal.withThreadConfinedCaching
if (version < 5) {
EasyMock.expect(groupCoordinator.handleSyncGroup(
EasyMock.eq(groupId),
@@ -2434,7 +2466,8 @@ class KafkaApisTest {
EasyMock.eq(None),
EasyMock.eq(None),
EasyMock.eq(Map.empty),
- EasyMock.capture(capturedCallback)
+ EasyMock.capture(capturedCallback),
+ EasyMock.eq(requestLocal)
))
}
@@ -2449,7 +2482,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
- createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
+ createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
EasyMock.verify(groupCoordinator)
@@ -2488,7 +2521,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(clientRequestQuotaManager, requestChannel)
- createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest)
+ createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
@@ -2509,7 +2542,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(clientRequestQuotaManager, requestChannel)
- createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest)
+ createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
assertEquals(Errors.UNSUPPORTED_VERSION, response.error)
@@ -2561,7 +2594,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(clientRequestQuotaManager, requestChannel)
- createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest)
+ createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
val expectedTopicErrors = Collections.singletonList(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
@@ -2688,7 +2721,7 @@ class KafkaApisTest {
replay(replicaManager, fetchManager, clientQuotaManager, requestChannel, replicaQuotaManager, partition)
- createKafkaApis().handle(fetchFromFollower)
+ createKafkaApis().handle(fetchFromFollower, RequestLocal.withThreadConfinedCaching)
if (isReassigning)
assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.reassignmentBytesOutPerSec.get.count())
@@ -2712,7 +2745,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(clientRequestQuotaManager, requestChannel)
- createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest)
+ createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
assertEquals(Errors.INVALID_REQUEST, response.error)
@@ -2731,7 +2764,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(requestChannelRequest)
EasyMock.replay(clientRequestQuotaManager, requestChannel)
- createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest)
+ createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
assertEquals(Errors.INVALID_REQUEST, response.error)
@@ -2776,7 +2809,7 @@ class KafkaApisTest {
))
EasyMock.replay(replicaManager, controller, requestChannel)
- createKafkaApis().handleUpdateMetadataRequest(request)
+ createKafkaApis().handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)
val updateMetadataResponse = capturedResponse.getValue.asInstanceOf[UpdateMetadataResponse]
assertEquals(expectedError, updateMetadataResponse.error())
EasyMock.verify(replicaManager)
@@ -3754,7 +3787,7 @@ class KafkaApisTest {
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest)
+ verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -3772,7 +3805,7 @@ class KafkaApisTest {
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope)
+ verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 99928f0..65ab81f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2194,7 +2194,7 @@ class ReplicaManagerTest {
val batch = TestUtils.records(records = List(
new SimpleRecord(10, "k1".getBytes, "v1".getBytes),
new SimpleRecord(11, "k2".getBytes, "v2".getBytes)))
- partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0)
+ partition.appendRecordsToLeader(batch, AppendOrigin.Client, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
partition.log.get.updateHighWatermark(2L)
partition.log.get.maybeIncrementLogStartOffset(1L, LeaderOffsetIncremented)
replicaManager.logManager.checkpointLogRecoveryOffsets()
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 89ba5f1..ccccb8b 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -94,7 +94,7 @@ class BrokerMetadataListenerTest {
verify(groupCoordinator).handleDeletedPartitions(ArgumentMatchers.argThat[Seq[TopicPartition]] { partitions =>
partitions.toSet == partitionSet(topic, numPartitions)
- })
+ }, any)
val deleteImageCapture: ArgumentCaptor[MetadataImageBuilder] =
ArgumentCaptor.forClass(classOf[MetadataImageBuilder])
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index 30f908e..e9910da 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -17,9 +17,9 @@
package org.apache.kafka.jmh.record;
import kafka.server.BrokerTopicStats;
+import kafka.server.RequestLocal;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
@@ -74,7 +74,7 @@ public abstract class BaseRecordBatchBenchmark {
// Used by measureVariableBatchSize
ByteBuffer[] batchBuffers;
- BufferSupplier bufferSupplier;
+ RequestLocal requestLocal;
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
@Setup
@@ -85,9 +85,9 @@ public abstract class BaseRecordBatchBenchmark {
startingOffset = messageVersion == 2 ? 0 : 42;
if (bufferSupplierStr.equals("NO_CACHING")) {
- bufferSupplier = BufferSupplier.NO_CACHING;
+ requestLocal = RequestLocal.NoCaching();
} else if (bufferSupplierStr.equals("CREATE")) {
- bufferSupplier = BufferSupplier.create();
+ requestLocal = RequestLocal.withThreadConfinedCaching();
} else {
throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
index f176e06..24ac53e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
@@ -59,6 +59,7 @@ public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBen
false, messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
new AppendOrigin.Client$(),
ApiVersion.latestVersion(),
- brokerTopicStats);
+ brokerTopicStats,
+ requestLocal);
}
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index 8aaa2d5..c331cd5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -32,8 +32,6 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
-import java.io.IOException;
-
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@@ -49,9 +47,9 @@ public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
}
@Benchmark
- public void measureIteratorForBatchWithSingleMessage(Blackhole bh) throws IOException {
+ public void measureIteratorForBatchWithSingleMessage(Blackhole bh) {
for (RecordBatch batch : MemoryRecords.readableRecords(singleBatchBuffer.duplicate()).batches()) {
- try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
+ try (CloseableIterator<Record> iterator = batch.streamingIterator(requestLocal.bufferSupplier())) {
while (iterator.hasNext())
bh.consume(iterator.next());
}
@@ -61,10 +59,10 @@ public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
@OperationsPerInvocation(value = batchCount)
@Fork(jvmArgsAppend = "-Xmx8g")
@Benchmark
- public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) throws IOException {
+ public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) {
for (int i = 0; i < batchCount; ++i) {
for (RecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) {
- try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
+ try (CloseableIterator<Record> iterator = batch.streamingIterator(requestLocal.bufferSupplier())) {
while (iterator.hasNext())
bh.consume(iterator.next());
}
@@ -75,10 +73,10 @@ public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
@OperationsPerInvocation(value = batchCount)
@Fork(jvmArgsAppend = "-Xmx8g")
@Benchmark
- public void measureSkipIteratorForVariableBatchSize(Blackhole bh) throws IOException {
+ public void measureSkipIteratorForVariableBatchSize(Blackhole bh) {
for (int i = 0; i < batchCount; ++i) {
for (MutableRecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) {
- try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(bufferSupplier)) {
+ try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(requestLocal.bufferSupplier())) {
while (iterator.hasNext())
bh.consume(iterator.next());
}