You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2024/01/08 09:49:02 UTC
(kafka) branch 3.7 updated: KAFKA-16059: close more kafkaApis instances (#15132)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new 5cc08721350 KAFKA-16059: close more kafkaApis instances (#15132)
5cc08721350 is described below
commit 5cc08721350f56e410d622c81bc932154afdca83
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sat Jan 6 23:00:20 2024 +0900
KAFKA-16059: close more kafkaApis instances (#15132)
Reviewers: Divij Vaidya <di...@amazon.com>, Justine Olshan <jo...@confluent.io>
---
.../scala/unit/kafka/server/KafkaApisTest.scala | 320 +++++++++++++--------
1 file changed, 194 insertions(+), 126 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1b590d527b5..81852f73d0a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -632,8 +632,8 @@ class KafkaApisTest extends Logging {
val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-
- testForwardableApi(kafkaApis = createKafkaApis(raftSupport = true),
+ kafkaApis = createKafkaApis(raftSupport = true)
+ testForwardableApi(kafkaApis = kafkaApis,
ApiKeys.DESCRIBE_QUORUM,
requestBuilder
)
@@ -644,14 +644,16 @@ class KafkaApisTest extends Logging {
requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- testForwardableApi(kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true),
+ kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
+ testForwardableApi(kafkaApis = kafkaApis,
apiKey,
requestBuilder
)
}
private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
- testForwardableApi(kafkaApis = createKafkaApis(enableForwarding = true),
+ kafkaApis = createKafkaApis(enableForwarding = true)
+ testForwardableApi(kafkaApis = kafkaApis,
apiKey,
requestBuilder
)
@@ -1668,12 +1670,16 @@ class KafkaApisTest extends Logging {
val request = buildRequest(offsetCommitRequest)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
- kafkaApis = createKafkaApis()
- kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
- val response = verifyNoThrottling[OffsetCommitResponse](request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[OffsetCommitResponse](request)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+ } finally {
+ kafkaApis.close()
+ }
}
checkInvalidPartition(-1)
@@ -1700,11 +1706,15 @@ class KafkaApisTest extends Logging {
val request = buildRequest(offsetCommitRequest)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
- kafkaApis = createKafkaApis()
- kafkaApis.handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
- val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
+ } finally {
+ kafkaApis.close()
+ }
}
checkInvalidPartition(-1)
@@ -2060,20 +2070,24 @@ class KafkaApisTest extends Logging {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
- kafkaApis = createKafkaApis()
- kafkaApis.handleInitProducerIdRequest(request, requestLocal)
-
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val response = capturedResponse.getValue
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleInitProducerIdRequest(request, requestLocal)
+
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
- if (version < 4) {
- assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
- } else {
- assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+ if (version < 4) {
+ assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+ } else {
+ assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+ }
+ } finally {
+ kafkaApis.close()
}
}
}
@@ -2118,20 +2132,24 @@ class KafkaApisTest extends Logging {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
- kafkaApis = createKafkaApis()
- kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)
-
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val response = capturedResponse.getValue
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)
+
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
- if (version < 2) {
- assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
- } else {
- assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+ if (version < 2) {
+ assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+ } else {
+ assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+ }
+ } finally {
+ kafkaApis.close()
}
}
}
@@ -2172,20 +2190,24 @@ class KafkaApisTest extends Logging {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
- kafkaApis = createKafkaApis()
- kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)
-
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val response = capturedResponse.getValue
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)
+
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
- if (version < 2) {
- assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
- } else {
- assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+ if (version < 2) {
+ assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+ } else {
+ assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+ }
+ } finally {
+ kafkaApis.close()
}
}
}
@@ -2415,20 +2437,24 @@ class KafkaApisTest extends Logging {
responseCallback.capture(),
ArgumentMatchers.eq(requestLocal)
)).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
- kafkaApis = createKafkaApis()
- kafkaApis.handleEndTxnRequest(request, requestLocal)
-
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val response = capturedResponse.getValue
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleEndTxnRequest(request, requestLocal)
+
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
- if (version < 2) {
- assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
- } else {
- assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+ if (version < 2) {
+ assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+ } else {
+ assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+ }
+ } finally {
+ kafkaApis.close()
}
}
}
@@ -2476,16 +2502,20 @@ class KafkaApisTest extends Logging {
any[Long])).thenReturn(0)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
- kafkaApis = createKafkaApis()
- kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
-
- val response = verifyNoThrottling[ProduceResponse](request)
-
- assertEquals(1, response.data.responses.size)
- val topicProduceResponse = response.data.responses.asScala.head
- assertEquals(1, topicProduceResponse.partitionResponses.size)
- val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
- assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[ProduceResponse](request)
+
+ assertEquals(1, response.data.responses.size)
+ val topicProduceResponse = response.data.responses.asScala.head
+ assertEquals(1, topicProduceResponse.partitionResponses.size)
+ val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
+ assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
+ } finally {
+ kafkaApis.close()
+ }
}
}
@@ -2716,20 +2746,24 @@ class KafkaApisTest extends Logging {
.build(version.toShort)
val request = buildRequest(produceRequest)
- kafkaApis = createKafkaApis()
- kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
-
- verify(replicaManager).appendRecords(anyLong,
- anyShort,
- ArgumentMatchers.eq(false),
- ArgumentMatchers.eq(AppendOrigin.CLIENT),
- any(),
- responseCallback.capture(),
- any(),
- any(),
- any(),
- ArgumentMatchers.eq(transactionalId),
- any())
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ verify(replicaManager).appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(AppendOrigin.CLIENT),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ any(),
+ ArgumentMatchers.eq(transactionalId),
+ any())
+ } finally {
+ kafkaApis.close()
+ }
}
}
@@ -2749,11 +2783,15 @@ class KafkaApisTest extends Logging {
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
- kafkaApis = createKafkaApis()
- kafkaApis.handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
-
- val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(invalidTopicPartition))
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(invalidTopicPartition))
+ } finally {
+ kafkaApis.close()
+ }
}
checkInvalidPartition(-1)
@@ -2762,32 +2800,37 @@ class KafkaApisTest extends Logging {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+ kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
- () => createKafkaApis(IBP_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+ () => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+ kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
- () => createKafkaApis(IBP_0_10_2_IV0).handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+ () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+ kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
- () => createKafkaApis(IBP_0_10_2_IV0).handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+ () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+ kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
- () => createKafkaApis(IBP_0_10_2_IV0).handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+ () => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+ kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
- () => createKafkaApis(IBP_0_10_2_IV0).handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
+ () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
}
@Test
@@ -3782,13 +3825,17 @@ class KafkaApisTest extends Logging {
)).thenReturn(CompletableFuture.completedFuture(
new OffsetDeleteResponseData()
))
- kafkaApis = createKafkaApis()
- kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
+ val kafkaApis = createKafkaApis()
+ try {
+ kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
- val response = verifyNoThrottling[OffsetDeleteResponse](request)
+ val response = verifyNoThrottling[OffsetDeleteResponse](request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode))
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode))
+ } finally {
+ kafkaApis.close()
+ }
}
checkInvalidPartition(-1)
@@ -4074,6 +4121,7 @@ class KafkaApisTest extends Logging {
assertEquals(authorizedTopic, metadataResponseTopic.name())
}
}
+ kafkaApis.close()
// 4. Send TopicMetadataReq using topic name
reset(clientRequestQuotaManager, requestChannel)
@@ -6688,67 +6736,78 @@ class KafkaApisTest extends Logging {
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldNeverHandleErrorMessage(kafkaApis.handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldNeverHandleErrorMessage(kafkaApis.handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldNeverHandleErrorMessage(kafkaApis.handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldNeverHandleErrorMessage(kafkaApis.handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteAcls)
}
@Test
@@ -6792,7 +6851,8 @@ class KafkaApisTest extends Logging {
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
}
@Test
@@ -6838,7 +6898,8 @@ class KafkaApisTest extends Logging {
// We skip the pre-forward checks in handleCreateTokenRequest
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequestZk)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk)
}
@Test
@@ -6846,7 +6907,8 @@ class KafkaApisTest extends Logging {
// We skip the pre-forward checks in handleRenewTokenRequest
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequestZk)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk)
}
@Test
@@ -6854,37 +6916,43 @@ class KafkaApisTest extends Logging {
// We skip the pre-forward checks in handleExpireTokenRequest
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequestZk)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
}
@Test
def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
}
@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
+ kafkaApis = createKafkaApis(raftSupport = true)
+ verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
}
@Test