Posted to commits@kafka.apache.org by di...@apache.org on 2024/01/08 09:49:02 UTC

(kafka) branch 3.7 updated: KAFKA-16059: close more kafkaApis instances (#15132)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 5cc08721350 KAFKA-16059: close more kafkaApis instances (#15132)
5cc08721350 is described below

commit 5cc08721350f56e410d622c81bc932154afdca83
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sat Jan 6 23:00:20 2024 +0900

    KAFKA-16059: close more kafkaApis instances (#15132)
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Justine Olshan <jo...@confluent.io>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 320 +++++++++++++--------
 1 file changed, 194 insertions(+), 126 deletions(-)
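
The patch converts test-scoped KafkaApis instances so they are always closed: single-call tests now assign to the class-level kafkaApis field (assumed here to be closed by the test harness during teardown), while multi-step bodies shadow that field with a local val wrapped in try/finally. A minimal sketch of the try/finally form, following the first offset-commit hunk below (createKafkaApis and verifyNoThrottling are helpers of KafkaApisTest itself; the setup of request is elided):

    // Close the per-test KafkaApis instance even when an assertion throws.
    val kafkaApis = createKafkaApis()
    try {
      kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
      val response = verifyNoThrottling[OffsetCommitResponse](request)
      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
        Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
    } finally {
      kafkaApis.close()  // release the instance's resources, pass or fail
    }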

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1b590d527b5..81852f73d0a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -632,8 +632,8 @@ class KafkaApisTest extends Logging {
     val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
     val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-
-    testForwardableApi(kafkaApis = createKafkaApis(raftSupport = true),
+    kafkaApis = createKafkaApis(raftSupport = true)
+    testForwardableApi(kafkaApis = kafkaApis,
       ApiKeys.DESCRIBE_QUORUM,
       requestBuilder
     )
@@ -644,14 +644,16 @@ class KafkaApisTest extends Logging {
     requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
   ): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    testForwardableApi(kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true),
+    kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
+    testForwardableApi(kafkaApis = kafkaApis,
       apiKey,
       requestBuilder
     )
   }
 
   private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
-    testForwardableApi(kafkaApis = createKafkaApis(enableForwarding = true),
+    kafkaApis = createKafkaApis(enableForwarding = true)
+    testForwardableApi(kafkaApis = kafkaApis,
       apiKey,
       requestBuilder
     )
@@ -1668,12 +1670,16 @@ class KafkaApisTest extends Logging {
       val request = buildRequest(offsetCommitRequest)
       when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[OffsetCommitResponse](request)
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[OffsetCommitResponse](request)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+          Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+      } finally {
+        kafkaApis.close()
+      }
     }
 
     checkInvalidPartition(-1)
@@ -1700,11 +1706,15 @@ class KafkaApisTest extends Logging {
       val request = buildRequest(offsetCommitRequest)
       when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
+      } finally {
+        kafkaApis.close()
+      }
     }
 
     checkInvalidPartition(-1)
@@ -2060,20 +2070,24 @@ class KafkaApisTest extends Logging {
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleInitProducerIdRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleInitProducerIdRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue
 
-      if (version < 4) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
-      } else {
-        assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        if (version < 4) {
+          assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+        } else {
+          assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2118,20 +2132,24 @@ class KafkaApisTest extends Logging {
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue
 
-      if (version < 2) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
-      } else {
-        assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        if (version < 2) {
+          assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+        } else {
+          assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2172,20 +2190,24 @@ class KafkaApisTest extends Logging {
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue
 
-      if (version < 2) {
-        assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
-      } else {
-        assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+        if (version < 2) {
+          assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+        } else {
+          assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2415,20 +2437,24 @@ class KafkaApisTest extends Logging {
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleEndTxnRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleEndTxnRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue
 
-      if (version < 2) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
-      } else {
-        assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        if (version < 2) {
+          assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+        } else {
+          assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2476,16 +2502,20 @@ class KafkaApisTest extends Logging {
         any[Long])).thenReturn(0)
       when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
         any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[ProduceResponse](request)
-
-      assertEquals(1, response.data.responses.size)
-      val topicProduceResponse = response.data.responses.asScala.head
-      assertEquals(1, topicProduceResponse.partitionResponses.size)
-      val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
-      assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[ProduceResponse](request)
+
+        assertEquals(1, response.data.responses.size)
+        val topicProduceResponse = response.data.responses.asScala.head
+        assertEquals(1, topicProduceResponse.partitionResponses.size)
+        val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
+        assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
+      } finally {
+        kafkaApis.close()
+      }
     }
   }
 
@@ -2716,20 +2746,24 @@ class KafkaApisTest extends Logging {
         .build(version.toShort)
       val request = buildRequest(produceRequest)
 
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
-      
-      verify(replicaManager).appendRecords(anyLong,
-        anyShort,
-        ArgumentMatchers.eq(false),
-        ArgumentMatchers.eq(AppendOrigin.CLIENT),
-        any(),
-        responseCallback.capture(),
-        any(),
-        any(),
-        any(),
-        ArgumentMatchers.eq(transactionalId),
-        any())
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        verify(replicaManager).appendRecords(anyLong,
+          anyShort,
+          ArgumentMatchers.eq(false),
+          ArgumentMatchers.eq(AppendOrigin.CLIENT),
+          any(),
+          responseCallback.capture(),
+          any(),
+          any(),
+          any(),
+          ArgumentMatchers.eq(transactionalId),
+          any())
+      } finally {
+        kafkaApis.close()
+      }
     }
   }
 
@@ -2749,11 +2783,15 @@ class KafkaApisTest extends Logging {
 
       when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(invalidTopicPartition))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(invalidTopicPartition))
+      } finally {
+        kafkaApis.close()
+      }
     }
 
     checkInvalidPartition(-1)
@@ -2762,32 +2800,37 @@ class KafkaApisTest extends Logging {
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
@@ -3782,13 +3825,17 @@ class KafkaApisTest extends Logging {
       )).thenReturn(CompletableFuture.completedFuture(
         new OffsetDeleteResponseData()
       ))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
 
-      val response = verifyNoThrottling[OffsetDeleteResponse](request)
+        val response = verifyNoThrottling[OffsetDeleteResponse](request)
 
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode))
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+          Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode))
+      } finally {
+        kafkaApis.close()
+      }
     }
 
     checkInvalidPartition(-1)
@@ -4074,6 +4121,7 @@ class KafkaApisTest extends Logging {
         assertEquals(authorizedTopic, metadataResponseTopic.name())
       }
     }
+    kafkaApis.close()
 
     // 4. Send TopicMetadataReq using topic name
     reset(clientRequestQuotaManager, requestChannel)
@@ -6688,67 +6736,78 @@ class KafkaApisTest extends Logging {
   @Test
   def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleLeaderAndIsrRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleStopReplicaRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleControlledShutdownRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleEnvelope(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateAcls)
   }
 
   @Test
   def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteAcls)
   }
 
   @Test
@@ -6792,7 +6851,8 @@ class KafkaApisTest extends Logging {
   @Test
   def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
   }
 
   @Test
@@ -6838,7 +6898,8 @@ class KafkaApisTest extends Logging {
   // We skip the pre-forward checks in handleCreateTokenRequest 
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequestZk)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk)
   }
 
   @Test
@@ -6846,7 +6907,8 @@ class KafkaApisTest extends Logging {
   // We skip the pre-forward checks in handleRenewTokenRequest 
   def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequestZk)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk)
   }
 
   @Test
@@ -6854,37 +6916,43 @@ class KafkaApisTest extends Logging {
   // We skip the pre-forward checks in handleExpireTokenRequest 
   def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequestZk)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
   }
 
   @Test
   def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
   }
 
   @Test
   def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
   }
 
   @Test