You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/02/11 15:20:07 UTC

[kafka] branch trunk updated: KAFKA-13577: Replace easymock with mockito in kafka:core - part 3 (#11674)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0269edf  KAFKA-13577: Replace easymock with mockito in kafka:core - part 3 (#11674)
0269edf is described below

commit 0269edfc80e36d468516b943f0a9a08b7ee652fb
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri Feb 11 16:16:25 2022 +0100

    KAFKA-13577: Replace easymock with mockito in kafka:core - part 3 (#11674)
    
    
    Reviewers: Tom Bentley <tb...@redhat.com>
---
 build.gradle                                       |    1 -
 checkstyle/import-control-core.xml                 |    1 -
 .../scala/unit/kafka/server/AuthHelperTest.scala   |   53 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 1726 +++++++++-----------
 4 files changed, 799 insertions(+), 982 deletions(-)

diff --git a/build.gradle b/build.gradle
index acf0b19..fa0ed94 100644
--- a/build.gradle
+++ b/build.gradle
@@ -859,7 +859,6 @@ project(':core') {
     testImplementation project(':raft').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
-    testImplementation libs.easymock
     testImplementation(libs.apacheda) {
       exclude group: 'xml-apis', module: 'xml-apis'
       // `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index f4c3b5b..e02a3f2 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -28,7 +28,6 @@
   <allow pkg="javax.management" />
   <allow pkg="org.slf4j" />
   <allow pkg="org.junit" />
-  <allow pkg="org.easymock" />
   <allow pkg="java.security" />
   <allow pkg="javax.net.ssl" />
   <allow pkg="javax.security" />
diff --git a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
index 194b2c6..7f229e9 100644
--- a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
@@ -26,22 +26,22 @@ import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.easymock.EasyMock._
-import org.easymock.{EasyMock, IArgumentMatcher}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.argThat
+import org.mockito.ArgumentMatchers
+import org.mockito.Mockito.{mock, verify, when}
 
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class AuthHelperTest {
-  import AuthHelperTest._
 
   private val clientId = ""
 
   @Test
   def testAuthorize(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val operation = AclOperation.WRITE
     val resourceType = ResourceType.TOPIC
@@ -56,23 +56,20 @@ class AuthHelperTest {
         1, true, true)
     )
 
-    EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava))
-      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
-      .once()
-
-    EasyMock.replay(authorizer)
+    when(authorizer.authorize(requestContext, expectedActions.asJava))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
 
     val result = new AuthHelper(Some(authorizer)).authorize(
       requestContext, operation, resourceType, resourceName)
 
-    verify(authorizer)
+    verify(authorizer).authorize(requestContext, expectedActions.asJava)
 
     assertEquals(true, result)
   }
 
   @Test
   def testFilterByAuthorized(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val operation = AclOperation.WRITE
     val resourceType = ResourceType.TOPIC
@@ -94,19 +91,17 @@ class AuthHelperTest {
         1, true, true),
     )
 
-    EasyMock.expect(authorizer.authorize(
-      EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava)
-    )).andAnswer { () =>
-      val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+    when(authorizer.authorize(
+      ArgumentMatchers.eq(requestContext), argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
       actions.map { action =>
         if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name))
           AuthorizationResult.ALLOWED
         else
           AuthorizationResult.DENIED
       }.asJava
-    }.once()
-
-    EasyMock.replay(authorizer)
+    }
 
     val result = new AuthHelper(Some(authorizer)).filterByAuthorized(
       requestContext,
@@ -116,27 +111,11 @@ class AuthHelperTest {
       Seq(resourceName1, resourceName2, resourceName1, resourceName3)
     )(identity)
 
-    verify(authorizer)
+    verify(authorizer).authorize(
+      ArgumentMatchers.eq(requestContext), argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))
+    )
 
     assertEquals(Set(resourceName1, resourceName3), result)
   }
 
 }
-
-object AuthHelperTest {
-
-  /**
-    * Similar to `EasyMock.eq`, but matches if both lists have the same elements irrespective of ordering.
-    */
-  def matchSameElements[T](list: java.util.List[T]): java.util.List[T] = {
-    EasyMock.reportMatcher(new IArgumentMatcher {
-      def matches(argument: Any): Boolean = argument match {
-        case l: java.util.List[_] => list.asScala.toSet == l.asScala.toSet
-        case _ => false
-      }
-      def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)")
-    })
-    null
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e617eab..c0058ce 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
 import java.util.{Collections, Optional, Properties, Random}
 
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
-import kafka.cluster.{Broker, Partition}
+import kafka.cluster.Broker
 import kafka.controller.{ControllerContext, KafkaController}
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
 import kafka.coordinator.group._
@@ -82,44 +82,43 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
 import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.easymock.EasyMock._
-import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-import org.mockito.{ArgumentMatchers, Mockito}
+import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull}
+import org.mockito.Mockito.{mock, reset, times, verify, when}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
-import java.util.Arrays
 
 class KafkaApisTest {
-  private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
-  private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
-  private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-  private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
-  private val adminManager: ZkAdminManager = EasyMock.createNiceMock(classOf[ZkAdminManager])
-  private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
-  private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
-  private val forwardingManager: ForwardingManager = EasyMock.createNiceMock(classOf[ForwardingManager])
-  private val autoTopicCreationManager: AutoTopicCreationManager = EasyMock.createNiceMock(classOf[AutoTopicCreationManager])
+  private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
+  private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator])
+  private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
+  private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator])
+  private val controller: KafkaController = mock(classOf[KafkaController])
+  private val forwardingManager: ForwardingManager = mock(classOf[ForwardingManager])
+  private val autoTopicCreationManager: AutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
 
   private val kafkaPrincipalSerde = new KafkaPrincipalSerde {
     override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
     override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
   }
-  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
   private val metrics = new Metrics()
   private val brokerId = 1
   private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId)
-  private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
-  private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
-  private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager])
-  private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+  private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
+  private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
+  private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
+  private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
   private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
     clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
-  private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager])
+  private val fetchManager: FetchManager = mock(classOf[FetchManager])
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"
   private val time = new MockTime
@@ -143,7 +142,7 @@ class KafkaApisTest {
       val properties = TestUtils.createBrokerConfig(brokerId, "")
       properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
       properties.put(KafkaConfig.ProcessRolesProp, "broker")
-      val voterId = (brokerId + 1)
+      val voterId = brokerId + 1
       properties.put(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")
       properties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
       properties
@@ -206,7 +205,7 @@ class KafkaApisTest {
 
   @Test
   def testDescribeConfigsWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val operation = AclOperation.DESCRIBE_CONFIGS
     val resourceType = ResourceType.TOPIC
@@ -220,25 +219,18 @@ class KafkaApisTest {
     )
 
     // Verify that authorize is only called once
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(expectedActions.asJava)))
-      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions.asJava)))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
 
-    val configRepository: ConfigRepository = EasyMock.strictMock(classOf[ConfigRepository])
+    val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
     val topicConfigs = new Properties()
     val propName = "min.insync.replicas"
     val propValue = "3"
     topicConfigs.put(propName, propValue)
-    EasyMock.expect(configRepository.topicConfig(resourceName)).andReturn(topicConfigs)
+    when(configRepository.topicConfig(resourceName)).thenReturn(topicConfigs)
 
-    metadataCache =
-      EasyMock.partialMockBuilder(classOf[ZkMetadataCache])
-        .withConstructor(classOf[Int])
-        .withArgs(Int.box(brokerId))  // Need to box it for Scala 2.12 and before
-        .addMockedMethod("contains", classOf[String])
-        .createMock()
-
-    expect(metadataCache.contains(resourceName)).andReturn(true)
+    metadataCache = mock(classOf[ZkMetadataCache])
+    when(metadataCache.contains(resourceName)).thenReturn(true)
 
     val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
       .setIncludeSynonyms(true)
@@ -248,15 +240,14 @@ class KafkaApisTest {
       .build(requestHeader.apiVersion)
     val request = buildRequest(describeConfigsRequest,
       requestHeader = Option(requestHeader))
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    EasyMock.replay(metadataCache, replicaManager, clientRequestQuotaManager, requestChannel,
-      authorizer, configRepository, adminManager)
     createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository)
       .handleDescribeConfigsRequest(request)
 
-    verify(authorizer, replicaManager)
-
+    verify(authorizer).authorize(any(), ArgumentMatchers.eq(expectedActions.asJava))
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[DescribeConfigsResponse]
     val results = response.data().results()
     assertEquals(1, results.size())
@@ -290,7 +281,7 @@ class KafkaApisTest {
     alterConfigHandler: () => ApiError,
     expectedError: Errors
   ): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
 
@@ -299,13 +290,13 @@ class KafkaApisTest {
     val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
       clientId, 0)
 
-    EasyMock.expect(controller.isActive).andReturn(true)
+    when(controller.isActive).thenReturn(true)
 
     authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, AuthorizationResult.ALLOWED)
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
-    EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
-      .andAnswer(() => {
+    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
+      .thenAnswer(_ => {
         Map(configResource -> alterConfigHandler.apply())
       })
 
@@ -317,28 +308,26 @@ class KafkaApisTest {
     val request = TestUtils.buildRequestWithEnvelope(
       alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
 
-    val capturedResponse = EasyMock.newCapture[AbstractResponse]()
-    val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
-
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.capture(capturedRequest),
-      EasyMock.capture(capturedResponse),
-      EasyMock.anyObject()
-    ))
+    val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
+    val capturedRequest: ArgumentCaptor[RequestChannel.Request] = ArgumentCaptor.forClass(classOf[RequestChannel.Request])
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      adminManager, controller)
     createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
+    verify(requestChannel).sendResponse(
+      capturedRequest.capture(),
+      capturedResponse.capture(),
+      any()
+    )
     assertEquals(Some(request), capturedRequest.getValue.envelope)
-    val innerResponse = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
+    val innerResponse = capturedResponse.getValue
     val responseMap = innerResponse.data.responses().asScala.map { resourceResponse =>
       resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode)
     }.toMap
 
     assertEquals(Map(resourceName -> expectedError), responseMap)
 
-    verify(authorizer, controller, adminManager)
+    verify(controller).isActive
+    verify(adminManager).alterConfigs(any(), ArgumentMatchers.eq(false))
   }
 
   @Test
@@ -348,16 +337,16 @@ class KafkaApisTest {
     val leaveGroupRequest = new LeaveGroupRequest.Builder("group",
       Collections.singletonList(new MemberIdentity())).build(requestHeader.apiVersion)
 
-    EasyMock.expect(controller.isActive).andReturn(true)
+    when(controller.isActive).thenReturn(true)
 
     val request = TestUtils.buildRequestWithEnvelope(
       leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    val capturedResponse = expectNoThrottling(request)
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller)
     createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
     assertEquals(Errors.INVALID_REQUEST, response.error())
   }
@@ -385,7 +374,7 @@ class KafkaApisTest {
                                          performAuthorize: Boolean = false,
                                          authorizeResult: AuthorizationResult = AuthorizationResult.ALLOWED,
                                          isActiveController: Boolean = true): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     if (performAuthorize) {
       authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, authorizeResult)
@@ -395,7 +384,7 @@ class KafkaApisTest {
     val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
       clientId, 0)
 
-    EasyMock.expect(controller.isActive).andReturn(isActiveController)
+    when(controller.isActive).thenReturn(isActiveController)
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
 
@@ -408,35 +397,30 @@ class KafkaApisTest {
     val request = TestUtils.buildRequestWithEnvelope(
       alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener)
 
-    val capturedResponse = EasyMock.newCapture[AbstractResponse]()
-    if (shouldCloseConnection) {
-      EasyMock.expect(requestChannel.closeConnection(
-        EasyMock.eq(request),
-        EasyMock.eq(java.util.Collections.emptyMap())
-      ))
-    } else {
-      EasyMock.expect(requestChannel.sendResponse(
-        EasyMock.eq(request),
-        EasyMock.capture(capturedResponse),
-        EasyMock.eq(None)
-      ))
-    }
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      adminManager, controller)
+    val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
     createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
-    if (!shouldCloseConnection) {
+    if (shouldCloseConnection) {
+      verify(requestChannel).closeConnection(
+        ArgumentMatchers.eq(request),
+        ArgumentMatchers.eq(java.util.Collections.emptyMap())
+      )
+    } else {
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None))
       val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
       assertEquals(expectedError, response.error)
     }
-
-    verify(authorizer, adminManager, requestChannel)
+    if (performAuthorize) {
+      verify(authorizer).authorize(any(), any())
+    }
   }
 
   @Test
   def testAlterConfigsWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val authorizedTopic = "authorized-topic"
     val unauthorizedTopic = "unauthorized-topic"
@@ -457,22 +441,19 @@ class KafkaApisTest {
       .build(topicHeader.apiVersion)
     val request = buildRequest(alterConfigsRequest)
 
-    EasyMock.expect(controller.isActive).andReturn(false)
-
-    val capturedResponse = expectNoThrottling(request)
-
-    EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
-      .andReturn(Map(authorizedResource -> ApiError.NONE))
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      adminManager, controller)
+    when(controller.isActive).thenReturn(false)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
+      .thenReturn(Map(authorizedResource -> ApiError.NONE))
 
     createKafkaApis(authorizer = Some(authorizer)).handleAlterConfigsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     verifyAlterConfigResult(capturedResponse, Map(authorizedTopic -> Errors.NONE,
         unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED))
-
-    verify(authorizer, adminManager)
+    verify(authorizer, times(2)).authorize(any(), any())
+    verify(adminManager).alterConfigs(any(), anyBoolean())
   }
 
   @Test
@@ -487,10 +468,11 @@ class KafkaApisTest {
     val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
     val request = buildRequest(requestBuilder.build())
 
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, adminManager, controller)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
     createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[DescribeQuorumResponse]
     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
   }
@@ -543,31 +525,30 @@ class KafkaApisTest {
       // The controller check only makes sense for ZK clusters. For KRaft,
       // controller requests are handled on a separate listener, so there
       // is no choice but to forward them.
-      EasyMock.expect(controller.isActive).andReturn(false)
+      when(controller.isActive).thenReturn(false)
     }
 
-    val capturedResponse = expectNoThrottling(request)
-    val forwardCallback: Capture[Option[AbstractResponse] => Unit] = EasyMock.newCapture()
-
-    EasyMock.expect(forwardingManager.forwardRequest(
-      EasyMock.eq(request),
-      EasyMock.capture(forwardCallback)
-    )).once()
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] = ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
 
     kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+    verify(forwardingManager).forwardRequest(
+      ArgumentMatchers.eq(request),
+      forwardCallback.capture()
+    )
     assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " +
       s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)")
 
     val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception)
-    assertTrue(forwardCallback.hasCaptured)
     forwardCallback.getValue.apply(Some(expectedResponse))
 
-    assertTrue(capturedResponse.hasCaptured)
+    val capturedResponse = verifyNoThrottling(request)
     assertEquals(expectedResponse, capturedResponse.getValue)
 
-    EasyMock.verify(controller, requestChannel, forwardingManager)
+    if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
+      verify(controller).isActive
+    }
   }
 
   private def authorizeResource(authorizer: Authorizer,
@@ -586,12 +567,11 @@ class KafkaApisTest {
         new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
         1, logIfAllowed, logIfDenied)
 
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(Seq(expectedAuthorizedAction).asJava)))
-      .andReturn(Seq(result).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Seq(expectedAuthorizedAction).asJava)))
+      .thenReturn(Seq(result).asJava)
   }
 
-  private def verifyAlterConfigResult(capturedResponse: Capture[AbstractResponse],
+  private def verifyAlterConfigResult(capturedResponse: ArgumentCaptor[AbstractResponse],
                                       expectedResults: Map[String, Errors]): Unit = {
     val response = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
     val responseMap = response.data.responses().asScala.map { resourceResponse =>
@@ -614,7 +594,7 @@ class KafkaApisTest {
 
   @Test
   def testIncrementalAlterConfigsWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val authorizedTopic = "authorized-topic"
     val unauthorizedTopic = "unauthorized-topic"
@@ -628,23 +608,22 @@ class KafkaApisTest {
     val request = buildRequest(incrementalAlterConfigsRequest,
       fromPrivilegedListener = true, requestHeader = Option(requestHeader))
 
-    EasyMock.expect(controller.isActive).andReturn(true)
+    when(controller.isActive).thenReturn(true)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
+      .thenReturn(Map(authorizedResource -> ApiError.NONE))
 
-    val capturedResponse = expectNoThrottling(request)
-
-    EasyMock.expect(adminManager.incrementalAlterConfigs(anyObject(), EasyMock.eq(false)))
-      .andReturn(Map(authorizedResource -> ApiError.NONE))
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      adminManager, controller)
     createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     verifyIncrementalAlterConfigResult(capturedResponse, Map(
       authorizedTopic -> Errors.NONE,
       unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED
     ))
 
-    verify(authorizer, adminManager)
+    verify(authorizer, times(2)).authorize(any(), any())
+    verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
   }
 
   private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
@@ -657,7 +636,7 @@ class KafkaApisTest {
     new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
   }
 
-  private def verifyIncrementalAlterConfigResult(capturedResponse: Capture[AbstractResponse],
+  private def verifyIncrementalAlterConfigResult(capturedResponse: ArgumentCaptor[AbstractResponse],
                                                  expectedResults: Map[String, Errors]): Unit = {
     val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
     val responseMap = response.data.responses().asScala.map { resourceResponse =>
@@ -668,7 +647,7 @@ class KafkaApisTest {
 
   @Test
   def testAlterClientQuotasWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
       Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
@@ -683,17 +662,17 @@ class KafkaApisTest {
     val request = buildRequest(alterClientQuotasRequest,
       fromPrivilegedListener = true, requestHeader = Option(requestHeader))
 
-    EasyMock.expect(controller.isActive).andReturn(true)
-
-    val capturedResponse = expectNoThrottling(request)
+    when(controller.isActive).thenReturn(true)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      anyLong)).thenReturn(0)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      adminManager, controller)
     createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))
 
-    verify(authorizer, adminManager)
+    verify(authorizer).authorize(any(), any())
+    verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
   }
 
   @Test
@@ -702,7 +681,7 @@ class KafkaApisTest {
     testForwardableApi(ApiKeys.ALTER_CLIENT_QUOTAS, requestBuilder)
   }
 
-  private def verifyAlterClientQuotaResult(capturedResponse: Capture[AbstractResponse],
+  private def verifyAlterClientQuotaResult(capturedResponse: ArgumentCaptor[AbstractResponse],
                                            expected: Map[ClientQuotaEntity, Errors]): Unit = {
     val response = capturedResponse.getValue.asInstanceOf[AlterClientQuotasResponse]
     val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
@@ -717,7 +696,7 @@ class KafkaApisTest {
 
   @Test
   def testCreateTopicsWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val authorizedTopic = "authorized-topic"
     val unauthorizedTopic = "unauthorized-topic"
@@ -733,7 +712,7 @@ class KafkaApisTest {
 
     val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion, clientId, 0)
 
-    EasyMock.expect(controller.isActive).andReturn(true)
+    when(controller.isActive).thenReturn(true)
 
     val topics = new CreateTopicsRequestData.CreatableTopicCollection(2)
     val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
@@ -754,33 +733,27 @@ class KafkaApisTest {
     val request = buildRequest(createTopicsRequest,
       fromPrivilegedListener = true, requestHeader = Option(requestHeader))
 
-    val capturedResponse = expectNoThrottling(request)
-
-    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
-      EasyMock.eq(request), EasyMock.eq(6))).andReturn(UnboundedControllerMutationQuota)
-
-    val capturedCallback = EasyMock.newCapture[Map[String, ApiError] => Unit]()
-
-    EasyMock.expect(adminManager.createTopics(
-      EasyMock.eq(timeout),
-      EasyMock.eq(false),
-      EasyMock.eq(Map(authorizedTopic -> topicToCreate)),
-      anyObject(),
-      EasyMock.eq(UnboundedControllerMutationQuota),
-      EasyMock.capture(capturedCallback)))
-
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
-      requestChannel, authorizer, adminManager, controller)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(clientControllerQuotaManager.newQuotaFor(
+      ArgumentMatchers.eq(request), ArgumentMatchers.eq(6))).thenReturn(UnboundedControllerMutationQuota)
 
     createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request)
 
+    val capturedCallback: ArgumentCaptor[Map[String, ApiError] => Unit] = ArgumentCaptor.forClass(classOf[Map[String, ApiError] => Unit])
+
+    verify(adminManager).createTopics(
+      ArgumentMatchers.eq(timeout),
+      ArgumentMatchers.eq(false),
+      ArgumentMatchers.eq(Map(authorizedTopic -> topicToCreate)),
+      any(),
+      ArgumentMatchers.eq(UnboundedControllerMutationQuota),
+      capturedCallback.capture())
     capturedCallback.getValue.apply(Map(authorizedTopic -> ApiError.NONE))
 
-    verifyCreateTopicsResult(createTopicsRequest,
-      capturedResponse, Map(authorizedTopic -> Errors.NONE,
+    val capturedResponse = verifyNoThrottling(request)
+    verifyCreateTopicsResult(capturedResponse, Map(authorizedTopic -> Errors.NONE,
         unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED))
-
-    verify(authorizer, adminManager, clientControllerQuotaManager)
   }
 
   @Test
@@ -819,21 +792,20 @@ class KafkaApisTest {
         new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL),
         1, logIfAllowed, logIfDenied))
 
-    EasyMock.expect(authorizer.authorize(
-      anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedAuthorizedActions.asJava)
-    )).andAnswer { () =>
-      val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
-      actions.map { action =>
+    when(authorizer.authorize(
+      any[RequestContext], argThat((t: java.util.List[Action]) => t != null && t.containsAll(expectedAuthorizedActions.asJava))
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
+      actions.asScala.map { action =>
         if (action.resourcePattern().name().equals(authorizedTopic))
           AuthorizationResult.ALLOWED
         else
           AuthorizationResult.DENIED
       }.asJava
-    }.once()
+    }
   }
 
-  private def verifyCreateTopicsResult(createTopicsRequest: CreateTopicsRequest,
-                                       capturedResponse: Capture[AbstractResponse],
+  private def verifyCreateTopicsResult(capturedResponse: ArgumentCaptor[AbstractResponse],
                                        expectedResults: Map[String, Errors]): Unit = {
     val response = capturedResponse.getValue.asInstanceOf[CreateTopicsResponse]
     val responseMap = response.data.topics().asScala.map { topicResponse =>
@@ -946,7 +918,7 @@ class KafkaApisTest {
   private def testFindCoordinatorWithTopicCreation(coordinatorType: CoordinatorType,
                                                    hasEnoughLiveBrokers: Boolean = true,
                                                    version: Short = ApiKeys.FIND_COORDINATOR.latestVersion): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val requestHeader = new RequestHeader(ApiKeys.FIND_COORDINATOR, version, clientId, 0)
 
@@ -964,14 +936,14 @@ class KafkaApisTest {
         case CoordinatorType.GROUP =>
           topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, numBrokersNeeded.toString)
           topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, numBrokersNeeded.toString)
-          EasyMock.expect(groupCoordinator.offsetsTopicConfigs).andReturn(new Properties)
+          when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
           authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.GROUP,
             groupId, AuthorizationResult.ALLOWED)
           Topic.GROUP_METADATA_TOPIC_NAME
         case CoordinatorType.TRANSACTION =>
           topicConfigOverride.put(KafkaConfig.TransactionsTopicPartitionsProp, numBrokersNeeded.toString)
           topicConfigOverride.put(KafkaConfig.TransactionsTopicReplicationFactorProp, numBrokersNeeded.toString)
-          EasyMock.expect(txnCoordinator.transactionTopicConfigs).andReturn(new Properties)
+          when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
           authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID,
             groupId, AuthorizationResult.ALLOWED)
           Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -983,7 +955,7 @@ class KafkaApisTest {
       new FindCoordinatorRequest.Builder(
         new FindCoordinatorRequestData()
           .setKeyType(coordinatorType.id())
-          .setCoordinatorKeys(Arrays.asList(groupId)))
+          .setCoordinatorKeys(asList(groupId)))
     } else {
       new FindCoordinatorRequest.Builder(
         new FindCoordinatorRequestData()
@@ -991,17 +963,15 @@ class KafkaApisTest {
           .setKey(groupId))
     }
     val request = buildRequest(findCoordinatorRequestBuilder.build(requestHeader.apiVersion))
-
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
     val capturedRequest = verifyTopicCreation(topicName, true, true, request)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      autoTopicCreationManager, forwardingManager, controller, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
-
     createKafkaApis(authorizer = Some(authorizer),
       overrideProperties = topicConfigOverride).handleFindCoordinatorRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[FindCoordinatorResponse]
     if (version >= 4) {
       assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.coordinators.get(0).errorCode)
@@ -1009,10 +979,7 @@ class KafkaApisTest {
     } else {
       assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.errorCode)
     }
-
     assertTrue(capturedRequest.getValue.isEmpty)
-
-    verify(authorizer, autoTopicCreationManager)
   }
 
   @Test
@@ -1060,7 +1027,7 @@ class KafkaApisTest {
   private def testMetadataAutoTopicCreation(topicName: String,
                                             enableAutoTopicCreation: Boolean,
                                             expectedError: Errors): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
     val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
       clientId, 0)
@@ -1081,13 +1048,13 @@ class KafkaApisTest {
         case Topic.GROUP_METADATA_TOPIC_NAME =>
           topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, numBrokersNeeded.toString)
           topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, numBrokersNeeded.toString)
-          EasyMock.expect(groupCoordinator.offsetsTopicConfigs).andReturn(new Properties)
+          when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
           true
 
         case Topic.TRANSACTION_STATE_TOPIC_NAME =>
           topicConfigOverride.put(KafkaConfig.TransactionsTopicPartitionsProp, numBrokersNeeded.toString)
           topicConfigOverride.put(KafkaConfig.TransactionsTopicReplicationFactorProp, numBrokersNeeded.toString)
-          EasyMock.expect(txnCoordinator.transactionTopicConfigs).andReturn(new Properties)
+          when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
           true
         case _ =>
           topicConfigOverride.put(KafkaConfig.NumPartitionsProp, numBrokersNeeded.toString)
@@ -1100,16 +1067,15 @@ class KafkaApisTest {
     ).build(requestHeader.apiVersion)
     val request = buildRequest(metadataRequest)
 
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
     val capturedRequest = verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
-      autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
-
     createKafkaApis(authorizer = Some(authorizer), enableForwarding = enableAutoTopicCreation,
       overrideProperties = topicConfigOverride).handleTopicMetadataRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[MetadataResponse]
     val expectedMetadataResponse = util.Collections.singletonList(new TopicMetadata(
       expectedError,
@@ -1124,28 +1090,26 @@ class KafkaApisTest {
       assertTrue(capturedRequest.getValue.isDefined)
       assertEquals(request.context, capturedRequest.getValue.get)
     }
-
-    verify(authorizer, autoTopicCreationManager)
   }
 
   private def verifyTopicCreation(topicName: String,
                                   enableAutoTopicCreation: Boolean,
                                   isInternal: Boolean,
-                                  request: RequestChannel.Request): Capture[Option[RequestContext]] = {
-    val capturedRequest = EasyMock.newCapture[Option[RequestContext]]()
+                                  request: RequestChannel.Request): ArgumentCaptor[Option[RequestContext]] = {
+    val capturedRequest: ArgumentCaptor[Option[RequestContext]] = ArgumentCaptor.forClass(classOf[Option[RequestContext]])
     if (enableAutoTopicCreation) {
-      EasyMock.expect(clientControllerQuotaManager.newPermissiveQuotaFor(EasyMock.eq(request)))
-        .andReturn(UnboundedControllerMutationQuota)
+      when(clientControllerQuotaManager.newPermissiveQuotaFor(ArgumentMatchers.eq(request)))
+        .thenReturn(UnboundedControllerMutationQuota)
 
-      EasyMock.expect(autoTopicCreationManager.createTopics(
-        EasyMock.eq(Set(topicName)),
-        EasyMock.eq(UnboundedControllerMutationQuota),
-        EasyMock.capture(capturedRequest))).andReturn(
+      when(autoTopicCreationManager.createTopics(
+        ArgumentMatchers.eq(Set(topicName)),
+        ArgumentMatchers.eq(UnboundedControllerMutationQuota),
+        capturedRequest.capture())).thenReturn(
         Seq(new MetadataResponseTopic()
         .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
         .setIsInternal(isInternal)
         .setName(topicName))
-      ).once()
+      )
     }
     capturedRequest
   }
@@ -1166,9 +1130,6 @@ class KafkaApisTest {
       new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
       new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid()))
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager,
-      autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
-
     // if version is 10 or 11, the invalid topic metadata should return an error
     val invalidVersions = Set(10, 11)
     invalidVersions.foreach( version =>
@@ -1177,16 +1138,13 @@ class KafkaApisTest {
         val request = buildRequest(new MetadataRequest(metadataRequestData, version.toShort))
         val kafkaApis = createKafkaApis()
 
-        val capturedResponse = EasyMock.newCapture[AbstractResponse]()
-        EasyMock.expect(requestChannel.sendResponse(
-          EasyMock.eq(request),
-          EasyMock.capture(capturedResponse),
-          EasyMock.anyObject()
-        ))
-
-        EasyMock.replay(requestChannel)
+        val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
         kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          any()
+        )
         val response = capturedResponse.getValue.asInstanceOf[MetadataResponse]
         assertEquals(1, response.topicMetadata.size)
         assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST))
@@ -1202,7 +1160,7 @@ class KafkaApisTest {
     addTopicToMetadataCache(topic, numPartitions = 1)
 
     def checkInvalidPartition(invalidPartitionId: Int): Unit = {
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel)
 
       val offsetCommitRequest = new OffsetCommitRequest.Builder(
         new OffsetCommitRequestData()
@@ -1220,10 +1178,11 @@ class KafkaApisTest {
           ))).build()
 
       val request = buildRequest(offsetCommitRequest)
-      val capturedResponse = expectNoThrottling(request)
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
+      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+        any[Long])).thenReturn(0)
       createKafkaApis().handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
 
+      val capturedResponse = verifyNoThrottling(request)
       val response = capturedResponse.getValue.asInstanceOf[OffsetCommitResponse]
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
         Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
@@ -1239,7 +1198,7 @@ class KafkaApisTest {
     addTopicToMetadataCache(topic, numPartitions = 1)
 
     def checkInvalidPartition(invalidPartitionId: Int): Unit = {
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel)
 
       val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
       val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
@@ -1251,11 +1210,11 @@ class KafkaApisTest {
         Map(invalidTopicPartition -> partitionOffsetCommitData).asJava,
       ).build()
       val request = buildRequest(offsetCommitRequest)
-
-      val capturedResponse = expectNoThrottling(request)
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
+      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+        any[Long])).thenReturn(0)
       createKafkaApis().handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
 
+      val capturedResponse = verifyNoThrottling(request)
       val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
     }
@@ -1270,11 +1229,11 @@ class KafkaApisTest {
     addTopicToMetadataCache(topic, numPartitions = 2)
 
     for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion) {
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
 
       val topicPartition = new TopicPartition(topic, 1)
-      val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-      val responseCallback: Capture[Map[TopicPartition, Errors] => Unit] = EasyMock.newCapture()
+      val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse])
+      val responseCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
 
       val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
       val groupId = "groupId"
@@ -1292,30 +1251,26 @@ class KafkaApisTest {
       val request = buildRequest(offsetCommitRequest)
 
       val requestLocal = RequestLocal.withThreadConfinedCaching
-      EasyMock.expect(groupCoordinator.handleTxnCommitOffsets(
-        EasyMock.eq(groupId),
-        EasyMock.eq(producerId),
-        EasyMock.eq(epoch),
-        EasyMock.anyString(),
-        EasyMock.eq(Option.empty),
-        EasyMock.anyInt(),
-        EasyMock.anyObject(),
-        EasyMock.capture(responseCallback),
-        EasyMock.eq(requestLocal)
-      )).andAnswer(
-        () => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
-
-      EasyMock.expect(requestChannel.sendResponse(
-        EasyMock.eq(request),
-        EasyMock.capture(capturedResponse),
-        EasyMock.eq(None)
-      ))
-
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
+      when(groupCoordinator.handleTxnCommitOffsets(
+        ArgumentMatchers.eq(groupId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(epoch),
+        anyString,
+        ArgumentMatchers.eq(Option.empty),
+        anyInt,
+        any(),
+        responseCallback.capture(),
+        ArgumentMatchers.eq(requestLocal)
+      )).thenAnswer(_ => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
 
       createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal)
 
-      val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None)
+      )
+      val response = capturedResponse.getValue
 
       if (version < 2) {
         assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition))
@@ -1332,10 +1287,10 @@ class KafkaApisTest {
 
     for (version <- ApiKeys.INIT_PRODUCER_ID.oldestVersion to ApiKeys.INIT_PRODUCER_ID.latestVersion) {
 
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-      val responseCallback: Capture[InitProducerIdResult => Unit] = EasyMock.newCapture()
+      val capturedResponse: ArgumentCaptor[InitProducerIdResponse] = ArgumentCaptor.forClass(classOf[InitProducerIdResponse])
+      val responseCallback: ArgumentCaptor[InitProducerIdResult => Unit] = ArgumentCaptor.forClass(classOf[InitProducerIdResult => Unit])
 
       val transactionalId = "txnId"
       val producerId = if (version < 3)
@@ -1366,26 +1321,22 @@ class KafkaApisTest {
         Option(new ProducerIdAndEpoch(producerId, epoch))
 
       val requestLocal = RequestLocal.withThreadConfinedCaching
-      EasyMock.expect(txnCoordinator.handleInitProducerId(
-        EasyMock.eq(transactionalId),
-        EasyMock.eq(txnTimeoutMs),
-        EasyMock.eq(expectedProducerIdAndEpoch),
-        EasyMock.capture(responseCallback),
-        EasyMock.eq(requestLocal)
-      )).andAnswer(
-        () => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
-
-      EasyMock.expect(requestChannel.sendResponse(
-        EasyMock.eq(request),
-        EasyMock.capture(capturedResponse),
-        EasyMock.eq(None)
-      ))
-
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      when(txnCoordinator.handleInitProducerId(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(txnTimeoutMs),
+        ArgumentMatchers.eq(expectedProducerIdAndEpoch),
+        responseCallback.capture(),
+        ArgumentMatchers.eq(requestLocal)
+      )).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
 
       createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
 
-      val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None)
+      )
+      val response = capturedResponse.getValue
 
       if (version < 4) {
         assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
@@ -1402,10 +1353,10 @@ class KafkaApisTest {
 
     for (version <- ApiKeys.ADD_OFFSETS_TO_TXN.oldestVersion to ApiKeys.ADD_OFFSETS_TO_TXN.latestVersion) {
 
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator, txnCoordinator)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator, txnCoordinator)
 
-      val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-      val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
+      val capturedResponse: ArgumentCaptor[AddOffsetsToTxnResponse] = ArgumentCaptor.forClass(classOf[AddOffsetsToTxnResponse])
+      val responseCallback: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
 
       val groupId = "groupId"
       val transactionalId = "txnId"
@@ -1422,32 +1373,28 @@ class KafkaApisTest {
       val request = buildRequest(addOffsetsToTxnRequest)
 
       val partition = 1
-      EasyMock.expect(groupCoordinator.partitionFor(
-        EasyMock.eq(groupId)
-      )).andReturn(partition)
+      when(groupCoordinator.partitionFor(
+        ArgumentMatchers.eq(groupId)
+      )).thenReturn(partition)
 
       val requestLocal = RequestLocal.withThreadConfinedCaching
-      EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
-        EasyMock.eq(transactionalId),
-        EasyMock.eq(producerId),
-        EasyMock.eq(epoch),
-        EasyMock.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
-        EasyMock.capture(responseCallback),
-        EasyMock.eq(requestLocal)
-      )).andAnswer(
-        () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
-      EasyMock.expect(requestChannel.sendResponse(
-        EasyMock.eq(request),
-        EasyMock.capture(capturedResponse),
-        EasyMock.eq(None)
-      ))
-
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, groupCoordinator)
+      when(txnCoordinator.handleAddPartitionsToTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(epoch),
+        ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
+        responseCallback.capture(),
+        ArgumentMatchers.eq(requestLocal)
+      )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
 
       createKafkaApis().handleAddOffsetsToTxnRequest(request, requestLocal)
 
-      val response = capturedResponse.getValue.asInstanceOf[AddOffsetsToTxnResponse]
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None)
+      )
+      val response = capturedResponse.getValue
 
       if (version < 2) {
         assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
@@ -1464,10 +1411,10 @@ class KafkaApisTest {
 
     for (version <- ApiKeys.ADD_PARTITIONS_TO_TXN.oldestVersion to ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion) {
 
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-      val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
+      val capturedResponse: ArgumentCaptor[AddPartitionsToTxnResponse] = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnResponse])
+      val responseCallback: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
 
       val transactionalId = "txnId"
       val producerId = 15L
@@ -1485,27 +1432,23 @@ class KafkaApisTest {
       val request = buildRequest(addPartitionsToTxnRequest)
 
       val requestLocal = RequestLocal.withThreadConfinedCaching
-      EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
-        EasyMock.eq(transactionalId),
-        EasyMock.eq(producerId),
-        EasyMock.eq(epoch),
-        EasyMock.eq(Set(topicPartition)),
-        EasyMock.capture(responseCallback),
-        EasyMock.eq(requestLocal)
-      )).andAnswer(
-        () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
-      EasyMock.expect(requestChannel.sendResponse(
-        EasyMock.eq(request),
-        EasyMock.capture(capturedResponse),
-        EasyMock.eq(None)
-      ))
-
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      when(txnCoordinator.handleAddPartitionsToTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(epoch),
+        ArgumentMatchers.eq(Set(topicPartition)),
+        responseCallback.capture(),
+        ArgumentMatchers.eq(requestLocal)
+      )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
 
       createKafkaApis().handleAddPartitionToTxnRequest(request, requestLocal)
 
-      val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None)
+      )
+      val response = capturedResponse.getValue
 
       if (version < 2) {
         assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors())
@@ -1521,10 +1464,10 @@ class KafkaApisTest {
     addTopicToMetadataCache(topic, numPartitions = 2)
 
     for (version <- ApiKeys.END_TXN.oldestVersion to ApiKeys.END_TXN.latestVersion) {
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-      val responseCallback: Capture[Errors => Unit]  = EasyMock.newCapture()
+      val capturedResponse: ArgumentCaptor[EndTxnResponse] = ArgumentCaptor.forClass(classOf[EndTxnResponse])
+      val responseCallback: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
 
       val transactionalId = "txnId"
       val producerId = 15L
@@ -1540,26 +1483,23 @@ class KafkaApisTest {
       val request = buildRequest(endTxnRequest)
 
       val requestLocal = RequestLocal.withThreadConfinedCaching
-      EasyMock.expect(txnCoordinator.handleEndTransaction(
-        EasyMock.eq(transactionalId),
-        EasyMock.eq(producerId),
-        EasyMock.eq(epoch),
-        EasyMock.eq(TransactionResult.COMMIT),
-        EasyMock.capture(responseCallback),
-        EasyMock.eq(requestLocal)
-      )).andAnswer(
-        () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
-      EasyMock.expect(requestChannel.sendResponse(
-        EasyMock.eq(request),
-        EasyMock.capture(capturedResponse),
-        EasyMock.eq(None)
-      ))
+      when(txnCoordinator.handleEndTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(epoch),
+        ArgumentMatchers.eq(TransactionResult.COMMIT),
+        responseCallback.capture(),
+        ArgumentMatchers.eq(requestLocal)
+      )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
 
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
       createKafkaApis().handleEndTxnRequest(request, requestLocal)
 
-      val response = capturedResponse.getValue.asInstanceOf[EndTxnResponse]
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None)
+      )
+      val response = capturedResponse.getValue
 
       if (version < 2) {
         assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
@@ -1576,9 +1516,9 @@ class KafkaApisTest {
 
     for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
 
-      EasyMock.reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
 
-      val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+      val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
       val tp = new TopicPartition("topic", 0)
 
@@ -1595,25 +1535,25 @@ class KafkaApisTest {
         .build(version.toShort)
       val request = buildRequest(produceRequest)
 
-      EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-        EasyMock.anyShort(),
-        EasyMock.eq(false),
-        EasyMock.eq(AppendOrigin.Client),
-        EasyMock.anyObject(),
-        EasyMock.capture(responseCallback),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject())
-      ).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
-
-      val capturedResponse = expectNoThrottling(request)
-      EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
-        anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
-
-      EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+      when(replicaManager.appendRecords(anyLong,
+        anyShort,
+        ArgumentMatchers.eq(false),
+        ArgumentMatchers.eq(AppendOrigin.Client),
+        any(),
+        responseCallback.capture(),
+        any(),
+        any(),
+        any())
+      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
+
+      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+        any[Long])).thenReturn(0)
+      when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+        any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
       createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
 
+      val capturedResponse = verifyNoThrottling(request)
       val response = capturedResponse.getValue.asInstanceOf[ProduceResponse]
 
       assertEquals(1, response.data.responses.size)
@@ -1630,7 +1570,7 @@ class KafkaApisTest {
     addTopicToMetadataCache(topic, numPartitions = 1)
 
     def checkInvalidPartition(invalidPartitionId: Int): Unit = {
-      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+      reset(replicaManager, clientRequestQuotaManager, requestChannel)
 
       val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
       val addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(
@@ -1638,10 +1578,11 @@ class KafkaApisTest {
       ).build()
       val request = buildRequest(addPartitionsToTxnRequest)
 
-      val capturedResponse = expectNoThrottling(request)
-      EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
+      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+        any[Long])).thenReturn(0)
       createKafkaApis().handleAddPartitionToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
 
+      val capturedResponse = verifyNoThrottling(request)
       val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
     }
@@ -1683,44 +1624,41 @@ class KafkaApisTest {
   @Test
   def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
     val topicPartition = new TopicPartition("t", 0)
-    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(topicPartition))
+    val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition))
     val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-
-    EasyMock.expect(replicaManager.getMagic(topicPartition))
-      .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+    val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+
+    when(replicaManager.getMagic(topicPartition))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
 
     createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
 
-    val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val markersResponse = capturedResponse.getValue
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
   }
 
   @Test
   def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = {
     val topicPartition = new TopicPartition("t", 0)
-    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(topicPartition))
+    val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition))
     val expectedErrors = Map(topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-
-    EasyMock.expect(replicaManager.getMagic(topicPartition))
-      .andReturn(None)
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+    val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
 
+    when(replicaManager.getMagic(topicPartition))
+      .thenReturn(None)
     createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
 
-    val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val markersResponse = capturedResponse.getValue
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
   }
 
@@ -1728,41 +1666,38 @@ class KafkaApisTest {
   def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
     val tp2 = new TopicPartition("t1", 0)
-    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
+    val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
     val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
 
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-    val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+    val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    EasyMock.expect(replicaManager.getMagic(tp1))
-      .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
-    EasyMock.expect(replicaManager.getMagic(tp2))
-      .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    when(replicaManager.getMagic(tp1))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
+    when(replicaManager.getMagic(tp2))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-      EasyMock.anyShort(),
-      EasyMock.eq(true),
-      EasyMock.eq(AppendOrigin.Coordinator),
-      EasyMock.anyObject(),
-      EasyMock.capture(responseCallback),
-      EasyMock.anyObject(),
-      EasyMock.anyObject(),
-      EasyMock.eq(requestLocal))
-    ).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
-
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+    when(replicaManager.appendRecords(anyLong,
+      anyShort,
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.Coordinator),
+      any(),
+      responseCallback.capture(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(requestLocal))
+    ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
 
     createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
 
-    val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val markersResponse = capturedResponse.getValue
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
-    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -1830,108 +1765,90 @@ class KafkaApisTest {
     ).build()
     val request = buildRequest(stopReplicaRequest)
 
-    EasyMock.expect(replicaManager.stopReplicas(
-      EasyMock.eq(request.context.correlationId),
-      EasyMock.eq(controllerId),
-      EasyMock.eq(controllerEpoch),
-      EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
-    )).andReturn(
+    when(replicaManager.stopReplicas(
+      ArgumentMatchers.eq(request.context.correlationId),
+      ArgumentMatchers.eq(controllerId),
+      ArgumentMatchers.eq(controllerEpoch),
+      ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
+    )).thenReturn(
       (mutable.Map(
         groupMetadataPartition -> Errors.NONE,
         txnStatePartition -> Errors.NONE,
         fooPartition -> Errors.NONE
       ), Errors.NONE)
     )
-    EasyMock.expect(controller.brokerEpoch).andStubReturn(brokerEpoch)
+    when(controller.brokerEpoch).thenReturn(brokerEpoch)
 
-    if (deletePartition) {
-      if (leaderEpoch >= 0) {
-        txnCoordinator.onResignation(txnStatePartition.partition, Some(leaderEpoch))
-      } else {
-        txnCoordinator.onResignation(txnStatePartition.partition, None)
-      }
-      EasyMock.expectLastCall()
-    }
+    createKafkaApis().handleStopReplicaRequest(request)
 
     if (deletePartition) {
       if (leaderEpoch >= 0) {
-        groupCoordinator.onResignation(groupMetadataPartition.partition, Some(leaderEpoch))
+        verify(txnCoordinator).onResignation(txnStatePartition.partition, Some(leaderEpoch))
+        verify(groupCoordinator).onResignation(groupMetadataPartition.partition, Some(leaderEpoch))
       } else {
-        groupCoordinator.onResignation(groupMetadataPartition.partition, None)
+        verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
+        verify(groupCoordinator).onResignation(groupMetadataPartition.partition, None)
       }
-      EasyMock.expectLastCall()
     }
-
-    EasyMock.replay(controller, replicaManager, txnCoordinator, groupCoordinator)
-
-    createKafkaApis().handleStopReplicaRequest(request)
-
-    EasyMock.verify(txnCoordinator, groupCoordinator)
   }
 
   @Test
   def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
     val tp2 = new TopicPartition("t1", 0)
-    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
+    val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
     val expectedErrors = Map(tp1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2 -> Errors.NONE).asJava
 
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-    val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+    val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    EasyMock.expect(replicaManager.getMagic(tp1))
-      .andReturn(None)
-    EasyMock.expect(replicaManager.getMagic(tp2))
-      .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    when(replicaManager.getMagic(tp1))
+      .thenReturn(None)
+    when(replicaManager.getMagic(tp2))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-      EasyMock.anyShort(),
-      EasyMock.eq(true),
-      EasyMock.eq(AppendOrigin.Coordinator),
-      EasyMock.anyObject(),
-      EasyMock.capture(responseCallback),
-      EasyMock.anyObject(),
-      EasyMock.anyObject(),
-      EasyMock.eq(requestLocal))
-    ).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
-
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+    when(replicaManager.appendRecords(anyLong,
+      anyShort,
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.Coordinator),
+      any(),
+      responseCallback.capture(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(requestLocal))
+    ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
 
     createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
 
-    val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+    val markersResponse = capturedResponse.getValue
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
-    EasyMock.verify(replicaManager)
   }
 
   @Test
   def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
     val topicPartition = new TopicPartition("t", 0)
     val request = createWriteTxnMarkersRequest(asList(topicPartition))._2
-    EasyMock.expect(replicaManager.getMagic(topicPartition))
-      .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    when(replicaManager.getMagic(topicPartition))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-      EasyMock.anyShort(),
-      EasyMock.eq(true),
-      EasyMock.eq(AppendOrigin.Coordinator),
-      EasyMock.anyObject(),
-      EasyMock.anyObject(),
-      EasyMock.anyObject(),
-      EasyMock.anyObject(),
-      EasyMock.eq(requestLocal)))
-
-    EasyMock.replay(replicaManager)
 
     createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
-    EasyMock.verify(replicaManager)
+    verify(replicaManager).appendRecords(anyLong,
+      anyShort,
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.Coordinator),
+      any(),
+      any(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(requestLocal))
   }
 
   @Test
@@ -1966,20 +1883,21 @@ class KafkaApisTest {
     val memberSummary = MemberSummary("memberid", Some("instanceid"), "clientid", "clienthost", metadata, assignment)
     val groupSummary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))
 
-    EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+    reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
     val describeGroupsRequest = new DescribeGroupsRequest.Builder(
       new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
     ).build()
     val request = buildRequest(describeGroupsRequest)
 
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
-      .andReturn((Errors.NONE, groupSummary))
-    EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(groupCoordinator.handleDescribeGroup(ArgumentMatchers.eq(groupId)))
+      .thenReturn((Errors.NONE, groupSummary))
 
     createKafkaApis().handleDescribeGroupRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[DescribeGroupsResponse]
 
     val group = response.data.groups().get(0)
@@ -2005,7 +1923,7 @@ class KafkaApisTest {
     addTopicToMetadataCache("topic-1", numPartitions = 2)
     addTopicToMetadataCache("topic-2", numPartitions = 2)
 
-    EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+    reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
     val topics = new OffsetDeleteRequestTopicCollection()
     topics.add(new OffsetDeleteRequestTopic()
@@ -2027,27 +1945,27 @@ class KafkaApisTest {
     val request = buildRequest(offsetDeleteRequest)
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.expect(groupCoordinator.handleDeleteOffsets(
-      EasyMock.eq(group),
-      EasyMock.eq(Seq(
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(groupCoordinator.handleDeleteOffsets(
+      ArgumentMatchers.eq(group),
+      ArgumentMatchers.eq(Seq(
         new TopicPartition("topic-1", 0),
         new TopicPartition("topic-1", 1),
         new TopicPartition("topic-2", 0),
         new TopicPartition("topic-2", 1)
       )),
-      EasyMock.eq(requestLocal)
-    )).andReturn((Errors.NONE, Map(
+      ArgumentMatchers.eq(requestLocal)
+    )).thenReturn((Errors.NONE, Map(
       new TopicPartition("topic-1", 0) -> Errors.NONE,
       new TopicPartition("topic-1", 1) -> Errors.NONE,
       new TopicPartition("topic-2", 0) -> Errors.NONE,
       new TopicPartition("topic-2", 1) -> Errors.NONE,
     )))
 
-    EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
-
     createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
 
     def errorForPartition(topic: String, partition: Int): Errors = {
@@ -2068,7 +1986,7 @@ class KafkaApisTest {
     addTopicToMetadataCache(topic, numPartitions = 1)
 
     def checkInvalidPartition(invalidPartitionId: Int): Unit = {
-      EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+      reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
       val topics = new OffsetDeleteRequestTopicCollection()
       topics.add(new OffsetDeleteRequestTopic()
@@ -2081,15 +1999,16 @@ class KafkaApisTest {
           .setTopics(topics)
       ).build()
       val request = buildRequest(offsetDeleteRequest)
-      val capturedResponse = expectNoThrottling(request)
+      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+        any[Long])).thenReturn(0)
 
       val requestLocal = RequestLocal.withThreadConfinedCaching
-      EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
-        EasyMock.eq(requestLocal))).andReturn((Errors.NONE, Map.empty))
-      EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+      when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
+        ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.NONE, Map.empty[TopicPartition, Errors]))
 
       createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
+      val capturedResponse = verifyNoThrottling(request)
       val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
 
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
@@ -2104,7 +2023,7 @@ class KafkaApisTest {
   def testOffsetDeleteWithInvalidGroup(): Unit = {
     val group = "groupId"
 
-    EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+    reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
 
     val offsetDeleteRequest = new OffsetDeleteRequest.Builder(
       new OffsetDeleteRequestData()
@@ -2112,14 +2031,15 @@ class KafkaApisTest {
     ).build()
     val request = buildRequest(offsetDeleteRequest)
 
-    val capturedResponse = expectNoThrottling(request)
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
-      EasyMock.eq(requestLocal))).andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
-    EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
+      ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty[TopicPartition, Errors]))
 
     createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
 
     assertEquals(Errors.GROUP_ID_NOT_FOUND, Errors.forCode(response.data.errorCode))
@@ -2130,13 +2050,13 @@ class KafkaApisTest {
     val isolationLevel = IsolationLevel.READ_UNCOMMITTED
     val currentLeaderEpoch = Optional.of[Integer](15)
 
-    EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
-      EasyMock.eq(tp),
-      EasyMock.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
-      EasyMock.eq(Some(isolationLevel)),
-      EasyMock.eq(currentLeaderEpoch),
-      fetchOnlyFromLeader = EasyMock.eq(true))
-    ).andThrow(error.exception)
+    when(replicaManager.fetchOffsetForTimestamp(
+      ArgumentMatchers.eq(tp),
+      ArgumentMatchers.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+      ArgumentMatchers.eq(Some(isolationLevel)),
+      ArgumentMatchers.eq(currentLeaderEpoch),
+      fetchOnlyFromLeader = ArgumentMatchers.eq(true))
+    ).thenThrow(error.exception)
 
     val targetTimes = List(new ListOffsetsTopic()
       .setName(tp.topic)
@@ -2147,11 +2067,12 @@ class KafkaApisTest {
     val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
     createKafkaApis().handleListOffsetRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[ListOffsetsResponse]
     val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get
       .partitions.asScala.find(_.partitionIndex == tp.partition)
@@ -2205,52 +2126,46 @@ class KafkaApisTest {
    * authorization but before checking in MetadataCache.
    */
   @Test
-  def getAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
+  def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
     // Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache
-    metadataCache =
-      EasyMock.partialMockBuilder(classOf[ZkMetadataCache])
-        .withConstructor(classOf[Int])
-        .withArgs(Int.box(brokerId))  // Need to box it for Scala 2.12 and before
-        .addMockedMethod("getAllTopics")
-        .addMockedMethod("getTopicMetadata")
-        .createMock()
+    metadataCache = mock(classOf[ZkMetadataCache])
+    when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0)))
+    when(metadataCache.getControllerId).thenReturn(None)
 
     // 2 topics returned for authorization during handle
     val topicsReturnedFromMetadataCacheForAuthorization = Set("remaining-topic", "later-deleted-topic")
-    expect(metadataCache.getAllTopics()).andReturn(topicsReturnedFromMetadataCacheForAuthorization).once()
+    when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
     // 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
-    expect(metadataCache.getTopicMetadata(
-      EasyMock.eq(topicsReturnedFromMetadataCacheForAuthorization),
-      anyObject[ListenerName],
+    when(metadataCache.getTopicMetadata(
+      ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
+      any[ListenerName],
       anyBoolean,
       anyBoolean
-    )).andStubReturn(Seq(
+    )).thenReturn(Seq(
       new MetadataResponseTopic()
         .setErrorCode(Errors.NONE.code)
         .setName("remaining-topic")
         .setIsInternal(false)
     ))
 
-    EasyMock.replay(metadataCache)
 
-    var createTopicIsCalled: Boolean = false;
+    var createTopicIsCalled: Boolean = false
     // Specific mock on zkClient for this use case
     // Expect it's never called to do auto topic creation
-    expect(zkClient.setOrCreateEntityConfigs(
-      EasyMock.eq(ConfigType.Topic),
-      EasyMock.anyString,
-      EasyMock.anyObject[Properties]
-    )).andStubAnswer(() => {
+    when(zkClient.setOrCreateEntityConfigs(
+      ArgumentMatchers.eq(ConfigType.Topic),
+      anyString,
+      any[Properties]
+    )).thenAnswer(_ => {
       createTopicIsCalled = true
     })
     // No need to use
-    expect(zkClient.getAllBrokersInCluster)
-      .andStubReturn(Seq(new Broker(
+    when(zkClient.getAllBrokersInCluster)
+      .thenReturn(Seq(new Broker(
         brokerId, "localhost", 9902,
         ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT
       )))
 
-    EasyMock.replay(zkClient)
 
     val (requestListener, _) = updateMetadataCacheWithInconsistentListeners()
     val response = sendMetadataRequestWithInconsistentListeners(requestListener)
@@ -2277,7 +2192,7 @@ class KafkaApisTest {
       ).asJava)
 
     // 2. Set up authorizer
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
     val unauthorizedTopic = "unauthorized-topic"
     val authorizedTopic = "authorized-topic"
 
@@ -2287,20 +2202,20 @@ class KafkaApisTest {
     )
 
     // Here we need an order-insensitive matcher (argThat with containsAll) instead of an equality matcher since the order of the request is unknown
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedActions.asJava)))
-      .andAnswer { () =>
-      val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+    when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
+      .thenAnswer { invocation =>
+      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
       actions.map { action =>
         if (action.resourcePattern().name().equals(authorizedTopic))
           AuthorizationResult.ALLOWED
         else
           AuthorizationResult.DENIED
       }.asJava
-    }.times(2)
+    }
 
     // 3. Set up MetadataCache
     val authorizedTopicId = Uuid.randomUuid()
-    val unauthorizedTopicId = Uuid.randomUuid();
+    val unauthorizedTopicId = Uuid.randomUuid()
 
     val topicIds = new util.HashMap[String, Uuid]()
     topicIds.put(authorizedTopic, authorizedTopicId)
@@ -2328,10 +2243,11 @@ class KafkaApisTest {
     // 4. Send TopicMetadataReq using topicId
     val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
     val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
-    val capturedMetadataByTopicIdResp = expectNoThrottling(repByTopicId)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
     createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicId)
+    val capturedMetadataByTopicIdResp = verifyNoThrottling(repByTopicId)
     val metadataByTopicIdResp = capturedMetadataByTopicIdResp.getValue.asInstanceOf[MetadataResponse]
 
     val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))
@@ -2349,13 +2265,12 @@ class KafkaApisTest {
     }
 
     // 5. Send TopicMetadataReq using topic name
-    EasyMock.reset(clientRequestQuotaManager, requestChannel)
+    reset(clientRequestQuotaManager, requestChannel)
     val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
     val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
-    val capturedMetadataByTopicNameResp = expectNoThrottling(repByTopicName)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
 
     createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicName)
+    val capturedMetadataByTopicNameResp = verifyNoThrottling(repByTopicName)
     val metadataByTopicNameResp = capturedMetadataByTopicNameResp.getValue.asInstanceOf[MetadataResponse]
 
     val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))
@@ -2384,21 +2299,18 @@ class KafkaApisTest {
     val hw = 3
     val timestamp = 1000
 
-    expect(replicaManager.getLogConfig(EasyMock.eq(tp))).andReturn(None)
-
-    replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
-      anyObject[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
-      anyObject[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
-      anyObject[Option[ClientMetadata]])
-    expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
-      def answer: Unit = {
-        val callback = getCurrentArguments.apply(7)
-          .asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
-        val records = MemoryRecords.withRecords(CompressionType.NONE,
-          new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
-        callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, records,
-          None, None, None, Option.empty, isReassignmentFetch = false)))
-      }
+    when(replicaManager.getLogConfig(ArgumentMatchers.eq(tp))).thenReturn(None)
+
+    when(replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
+      any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota],
+      any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), any[IsolationLevel],
+      any[Option[ClientMetadata]])
+    ).thenAnswer(invocation => {
+      val callback = invocation.getArgument(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
+      val records = MemoryRecords.withRecords(CompressionType.NONE,
+        new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
+      callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, records,
+        None, None, None, Option.empty, isReassignmentFetch = false)))
     })
 
     val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
@@ -2408,25 +2320,24 @@ class KafkaApisTest {
     val fetchMetadata = new JFetchMetadata(0, 0)
     val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
       fetchMetadata, fetchData, false, false)
-    expect(fetchManager.newContext(
-      anyObject[Short],
-      anyObject[JFetchMetadata],
-      anyObject[Boolean],
-      anyObject[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
-      anyObject[util.List[TopicIdPartition]],
-      anyObject[util.Map[Uuid, String]])).andReturn(fetchContext)
+    when(fetchManager.newContext(
+      any[Short],
+      any[JFetchMetadata],
+      any[Boolean],
+      any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+      any[util.List[TopicIdPartition]],
+      any[util.Map[Uuid, String]])).thenReturn(fetchContext)
 
-    EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
-      anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
     val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchDataBuilder)
       .build()
     val request = buildRequest(fetchRequest)
-    val capturedResponse = expectNoThrottling(request)
 
-    EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager)
     createKafkaApis().handleFetchRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[FetchResponse]
     val responseData = response.responseData(metadataCache.topicIdsToNames(), 9)
     assertTrue(responseData.containsKey(tp))
@@ -2452,7 +2363,7 @@ class KafkaApisTest {
     addTopicToMetadataCache(foo.topic, 1, topicId = foo.topicId)
 
     // We will never return a logConfig when the topic name is null. This is ok since we won't have any records to convert.
-    expect(replicaManager.getLogConfig(EasyMock.eq(unresolvedFoo.topicPartition))).andReturn(None)
+    when(replicaManager.getLogConfig(ArgumentMatchers.eq(unresolvedFoo.topicPartition))).thenReturn(None)
 
     // Simulate unknown topic ID in the context
     val fetchData = Map(new TopicIdPartition(foo.topicId, new TopicPartition(null, foo.partition)) ->
@@ -2463,27 +2374,26 @@ class KafkaApisTest {
     val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
       fetchMetadata, fetchData, true, replicaId >= 0)
     // We expect to have the resolved partition, but we will simulate an unknown one with the fetchContext we return.
-    expect(fetchManager.newContext(
+    when(fetchManager.newContext(
       ApiKeys.FETCH.latestVersion,
       fetchMetadata,
       replicaId >= 0,
       Collections.singletonMap(foo, new FetchRequest.PartitionData(foo.topicId, 0, 0, 1000, Optional.empty())),
       Collections.emptyList[TopicIdPartition],
       metadataCache.topicIdsToNames())
-    ).andReturn(fetchContext)
+    ).thenReturn(fetchContext)
 
-    EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
-      anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
     // If replicaId is -1 we will build a consumer request. Any non-negative replicaId will build a follower request.
     val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
       replicaId, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
     val request = buildRequest(fetchRequest)
-    val capturedResponse = expectNoThrottling(request)
 
-    EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager)
     createKafkaApis().handleFetchRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[FetchResponse]
     val responseData = response.responseData(metadataCache.topicIdsToNames(), ApiKeys.FETCH.latestVersion)
     assertTrue(responseData.containsKey(foo.topicPartition))
@@ -2508,25 +2418,8 @@ class KafkaApisTest {
     val protocolType = "consumer"
     val rebalanceTimeoutMs = 10
     val sessionTimeoutMs = 5
-    val capturedProtocols = EasyMock.newCapture[List[(String, Array[Byte])]]()
-
-    EasyMock.expect(groupCoordinator.handleJoinGroup(
-      EasyMock.eq(groupId),
-      EasyMock.eq(memberId),
-      EasyMock.eq(None),
-      EasyMock.eq(true),
-      EasyMock.eq(clientId),
-      EasyMock.eq(InetAddress.getLocalHost.toString),
-      EasyMock.eq(rebalanceTimeoutMs),
-      EasyMock.eq(sessionTimeoutMs),
-      EasyMock.eq(protocolType),
-      EasyMock.capture(capturedProtocols),
-      anyObject(),
-      anyObject(),
-      anyObject()
-    ))
 
-    EasyMock.replay(groupCoordinator)
+    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
 
     createKafkaApis().handleJoinGroupRequest(
       buildRequest(
@@ -2545,8 +2438,21 @@ class KafkaApisTest {
       ),
       RequestLocal.withThreadConfinedCaching)
 
-    EasyMock.verify(groupCoordinator)
-
+    verify(groupCoordinator).handleJoinGroup(
+      ArgumentMatchers.eq(groupId),
+      ArgumentMatchers.eq(memberId),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(clientId),
+      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+      ArgumentMatchers.eq(rebalanceTimeoutMs),
+      ArgumentMatchers.eq(sessionTimeoutMs),
+      ArgumentMatchers.eq(protocolType),
+      capturedProtocols.capture(),
+      any(),
+      any(),
+      any()
+    )
     val capturedProtocolsList = capturedProtocols.getValue
     assertEquals(protocols.size, capturedProtocolsList.size)
     protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
@@ -2563,7 +2469,7 @@ class KafkaApisTest {
   }
 
   def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
 
     val groupId = "group"
     val memberId = "member1"
@@ -2571,23 +2477,7 @@ class KafkaApisTest {
     val rebalanceTimeoutMs = 10
     val sessionTimeoutMs = 5
 
-    val capturedCallback = EasyMock.newCapture[JoinGroupCallback]()
-
-    EasyMock.expect(groupCoordinator.handleJoinGroup(
-      EasyMock.eq(groupId),
-      EasyMock.eq(memberId),
-      EasyMock.eq(None),
-      EasyMock.eq(if (version >= 4) true else false),
-      EasyMock.eq(clientId),
-      EasyMock.eq(InetAddress.getLocalHost.toString),
-      EasyMock.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      EasyMock.eq(sessionTimeoutMs),
-      EasyMock.eq(protocolType),
-      EasyMock.eq(List.empty),
-      EasyMock.capture(capturedCallback),
-      EasyMock.anyObject(),
-      EasyMock.anyObject()
-    ))
+    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
 
     val joinGroupRequest = new JoinGroupRequest.Builder(
       new JoinGroupRequestData()
@@ -2599,15 +2489,27 @@ class KafkaApisTest {
     ).build(version)
 
     val requestChannelRequest = buildRequest(joinGroupRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
     createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
-    EasyMock.verify(groupCoordinator)
-
+    verify(groupCoordinator).handleJoinGroup(
+      ArgumentMatchers.eq(groupId),
+      ArgumentMatchers.eq(memberId),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(if (version >= 4) true else false),
+      ArgumentMatchers.eq(clientId),
+      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
+      ArgumentMatchers.eq(sessionTimeoutMs),
+      ArgumentMatchers.eq(protocolType),
+      ArgumentMatchers.eq(List.empty),
+      capturedCallback.capture(),
+      any(),
+      any()
+    )
     capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
 
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
@@ -2622,8 +2524,6 @@ class KafkaApisTest {
     } else {
       assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
     }
-
-    EasyMock.verify(clientRequestQuotaManager, requestChannel)
   }
 
   @Test
@@ -2634,7 +2534,7 @@ class KafkaApisTest {
   }
 
   def testJoinGroupProtocolType(version: Short): Unit = {
-    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
 
     val groupId = "group"
     val memberId = "member1"
@@ -2643,23 +2543,7 @@ class KafkaApisTest {
     val rebalanceTimeoutMs = 10
     val sessionTimeoutMs = 5
 
-    val capturedCallback = EasyMock.newCapture[JoinGroupCallback]()
-
-    EasyMock.expect(groupCoordinator.handleJoinGroup(
-      EasyMock.eq(groupId),
-      EasyMock.eq(memberId),
-      EasyMock.eq(None),
-      EasyMock.eq(if (version >= 4) true else false),
-      EasyMock.eq(clientId),
-      EasyMock.eq(InetAddress.getLocalHost.toString),
-      EasyMock.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      EasyMock.eq(sessionTimeoutMs),
-      EasyMock.eq(protocolType),
-      EasyMock.eq(List.empty),
-      EasyMock.capture(capturedCallback),
-      EasyMock.anyObject(),
-      EasyMock.anyObject()
-    ))
+    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
 
     val joinGroupRequest = new JoinGroupRequest.Builder(
       new JoinGroupRequestData()
@@ -2671,13 +2555,24 @@ class KafkaApisTest {
     ).build(version)
 
     val requestChannelRequest = buildRequest(joinGroupRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
     createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
-    EasyMock.verify(groupCoordinator)
-
+    verify(groupCoordinator).handleJoinGroup(
+      ArgumentMatchers.eq(groupId),
+      ArgumentMatchers.eq(memberId),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(if (version >= 4) true else false),
+      ArgumentMatchers.eq(clientId),
+      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
+      ArgumentMatchers.eq(sessionTimeoutMs),
+      ArgumentMatchers.eq(protocolType),
+      ArgumentMatchers.eq(List.empty),
+      capturedCallback.capture(),
+      any(),
+      any()
+    )
     capturedCallback.getValue.apply(JoinGroupResult(
       members = List.empty,
       memberId = memberId,
@@ -2687,7 +2582,7 @@ class KafkaApisTest {
       leaderId = memberId,
       error = Errors.NONE
     ))
-
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
 
     assertEquals(Errors.NONE, response.error)
@@ -2697,8 +2592,6 @@ class KafkaApisTest {
     assertEquals(memberId, response.data.leader)
     assertEquals(protocolName, response.data.protocolName)
     assertEquals(protocolType, response.data.protocolType)
-
-    EasyMock.verify(clientRequestQuotaManager, requestChannel)
   }
 
   @Test
@@ -2709,28 +2602,16 @@ class KafkaApisTest {
   }
 
   def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
-    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
 
     val groupId = "group"
     val memberId = "member1"
     val protocolType = "consumer"
     val protocolName = "range"
 
-    val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
+    val capturedCallback: ArgumentCaptor[SyncGroupCallback] = ArgumentCaptor.forClass(classOf[SyncGroupCallback])
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    EasyMock.expect(groupCoordinator.handleSyncGroup(
-      EasyMock.eq(groupId),
-      EasyMock.eq(0),
-      EasyMock.eq(memberId),
-      EasyMock.eq(if (version >= 5) Some(protocolType) else None),
-      EasyMock.eq(if (version >= 5) Some(protocolName) else None),
-      EasyMock.eq(None),
-      EasyMock.eq(Map.empty),
-      EasyMock.capture(capturedCallback),
-      EasyMock.eq(requestLocal)
-    ))
-
     val syncGroupRequest = new SyncGroupRequest.Builder(
       new SyncGroupRequestData()
         .setGroupId(groupId)
@@ -2741,13 +2622,20 @@ class KafkaApisTest {
     ).build(version)
 
     val requestChannelRequest = buildRequest(syncGroupRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
     createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
 
-    EasyMock.verify(groupCoordinator)
-
+    verify(groupCoordinator).handleSyncGroup(
+      ArgumentMatchers.eq(groupId),
+      ArgumentMatchers.eq(0),
+      ArgumentMatchers.eq(memberId),
+      ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None),
+      ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(Map.empty),
+      capturedCallback.capture(),
+      ArgumentMatchers.eq(requestLocal)
+    )
     capturedCallback.getValue.apply(SyncGroupResult(
       protocolType = Some(protocolType),
       protocolName = Some(protocolName),
@@ -2755,13 +2643,12 @@ class KafkaApisTest {
       error = Errors.NONE
     ))
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
 
     assertEquals(Errors.NONE, response.error)
     assertArrayEquals(Array.empty[Byte], response.data.assignment)
     assertEquals(protocolType, response.data.protocolType)
-
-    EasyMock.verify(clientRequestQuotaManager, requestChannel)
   }
 
   @Test
@@ -2772,29 +2659,16 @@ class KafkaApisTest {
   }
 
   def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): Unit = {
-    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
 
     val groupId = "group"
     val memberId = "member1"
     val protocolType = "consumer"
     val protocolName = "range"
 
-    val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
+    val capturedCallback: ArgumentCaptor[SyncGroupCallback] = ArgumentCaptor.forClass(classOf[SyncGroupCallback])
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    if (version < 5) {
-      EasyMock.expect(groupCoordinator.handleSyncGroup(
-        EasyMock.eq(groupId),
-        EasyMock.eq(0),
-        EasyMock.eq(memberId),
-        EasyMock.eq(None),
-        EasyMock.eq(None),
-        EasyMock.eq(None),
-        EasyMock.eq(Map.empty),
-        EasyMock.capture(capturedCallback),
-        EasyMock.eq(requestLocal)
-      ))
-    }
 
     val syncGroupRequest = new SyncGroupRequest.Builder(
       new SyncGroupRequestData()
@@ -2804,14 +2678,20 @@ class KafkaApisTest {
     ).build(version)
 
     val requestChannelRequest = buildRequest(syncGroupRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
     createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
 
-    EasyMock.verify(groupCoordinator)
-
     if (version < 5) {
+      verify(groupCoordinator).handleSyncGroup(
+        ArgumentMatchers.eq(groupId),
+        ArgumentMatchers.eq(0),
+        ArgumentMatchers.eq(memberId),
+        ArgumentMatchers.eq(None),
+        ArgumentMatchers.eq(None),
+        ArgumentMatchers.eq(None),
+        ArgumentMatchers.eq(Map.empty),
+        capturedCallback.capture(),
+        ArgumentMatchers.eq(requestLocal))
       capturedCallback.getValue.apply(SyncGroupResult(
         protocolType = Some(protocolType),
         protocolName = Some(protocolName),
@@ -2820,6 +2700,7 @@ class KafkaApisTest {
       ))
     }
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
 
     if (version < 5) {
@@ -2827,8 +2708,6 @@ class KafkaApisTest {
     } else {
       assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
     }
-
-    EasyMock.verify(clientRequestQuotaManager, requestChannel)
   }
 
   @Test
@@ -2843,14 +2722,12 @@ class KafkaApisTest {
     ).build()
 
     val requestChannelRequest = buildRequest(joinGroupRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
     assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
-    EasyMock.replay(groupCoordinator)
   }
 
   @Test
@@ -2864,14 +2741,12 @@ class KafkaApisTest {
     ).build()
 
     val requestChannelRequest = buildRequest(syncGroupRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
     assertEquals(Errors.UNSUPPORTED_VERSION, response.error)
-    EasyMock.replay(groupCoordinator)
   }
 
   @Test
@@ -2884,14 +2759,12 @@ class KafkaApisTest {
         .setGenerationId(1)
     ).build()
     val requestChannelRequest = buildRequest(heartbeatRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis(KAFKA_2_2_IV1).handleHeartbeatRequest(requestChannelRequest)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[HeartbeatResponse]
     assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
-    EasyMock.replay(groupCoordinator)
   }
 
   @Test
@@ -2916,9 +2789,7 @@ class KafkaApisTest {
     ).build()
 
     val requestChannelRequest = buildRequest(offsetCommitRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
     val expectedTopicErrors = Collections.singletonList(
@@ -2930,9 +2801,9 @@ class KafkaApisTest {
             .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
         ))
     )
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[OffsetCommitResponse]
     assertEquals(expectedTopicErrors, response.data.topics())
-    EasyMock.replay(groupCoordinator)
   }
 
   @Test
@@ -2948,12 +2819,6 @@ class KafkaApisTest {
         .setGroupInstanceId("instance-2")
     )
 
-    EasyMock.expect(groupCoordinator.handleLeaveGroup(
-      EasyMock.eq(groupId),
-      EasyMock.eq(leaveMemberList),
-      anyObject()
-    ))
-
     val leaveRequest = buildRequest(
       new LeaveGroupRequest.Builder(
         groupId,
@@ -2962,8 +2827,11 @@ class KafkaApisTest {
     )
 
     createKafkaApis().handleLeaveGroupRequest(leaveRequest)
-
-    EasyMock.replay(groupCoordinator)
+    verify(groupCoordinator).handleLeaveGroup(
+      ArgumentMatchers.eq(groupId),
+      ArgumentMatchers.eq(leaveMemberList),
+      any()
+    )
   }
 
   @Test
@@ -2976,12 +2844,6 @@ class KafkaApisTest {
         .setMemberId(memberId)
     )
 
-    EasyMock.expect(groupCoordinator.handleLeaveGroup(
-      EasyMock.eq(groupId),
-      EasyMock.eq(singleLeaveMember),
-      anyObject()
-    ))
-
     val leaveRequest = buildRequest(
       new LeaveGroupRequest.Builder(
         groupId,
@@ -2990,8 +2852,11 @@ class KafkaApisTest {
     )
 
     createKafkaApis().handleLeaveGroupRequest(leaveRequest)
-
-    EasyMock.replay(groupCoordinator)
+    verify(groupCoordinator).handleLeaveGroup(
+      ArgumentMatchers.eq(groupId),
+      ArgumentMatchers.eq(singleLeaveMember),
+      any()
+    )
   }
 
   @Test
@@ -3020,38 +2885,32 @@ class KafkaApisTest {
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
-    replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
-      anyObject[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
-      anyObject[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
-      anyObject[Option[ClientMetadata]])
-    expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
-      def answer: Unit = {
-        val callback = getCurrentArguments.apply(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
-        callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records,
-          None, None, None, Option.empty, isReassignmentFetch = isReassigning)))
-      }
+    when(replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
+      any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota],
+      any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), any[IsolationLevel],
+      any[Option[ClientMetadata]])
+    ).thenAnswer(invocation => {
+      val callback = invocation.getArgument(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
+      callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records,
+        None, None, None, Option.empty, isReassignmentFetch = isReassigning)))
     })
 
     val fetchMetadata = new JFetchMetadata(0, 0)
     val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
       fetchMetadata, fetchData, true, true)
-    expect(fetchManager.newContext(
-      anyObject[Short],
-      anyObject[JFetchMetadata],
-      anyObject[Boolean],
-      anyObject[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
-      anyObject[util.List[TopicIdPartition]],
-      anyObject[util.Map[Uuid, String]])).andReturn(fetchContext)
-
-    expect(replicaQuotaManager.record(anyLong()))
-    expect(replicaManager.getLogConfig(EasyMock.eq(tp0))).andReturn(None)
-
-    val partition: Partition = createNiceMock(classOf[Partition])
-    expect(replicaManager.isAddingReplica(anyObject(), anyInt())).andReturn(isReassigning)
+    when(fetchManager.newContext(
+      any[Short],
+      any[JFetchMetadata],
+      any[Boolean],
+      any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+      any[util.List[TopicIdPartition]],
+      any[util.Map[Uuid, String]])).thenReturn(fetchContext)
 
-    replay(replicaManager, fetchManager, clientQuotaManager, requestChannel, replicaQuotaManager, partition)
+    when(replicaManager.getLogConfig(ArgumentMatchers.eq(tp0))).thenReturn(None)
+    when(replicaManager.isAddingReplica(any(), anyInt)).thenReturn(isReassigning)
 
     createKafkaApis().handle(fetchFromFollower, RequestLocal.withThreadConfinedCaching)
+    verify(replicaQuotaManager).record(anyLong)
 
     if (isReassigning)
       assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.reassignmentBytesOutPerSec.get.count())
@@ -3072,11 +2931,10 @@ class KafkaApisTest {
     ).build()
 
     val requestChannelRequest = buildRequest(initProducerIdRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
@@ -3091,11 +2949,10 @@ class KafkaApisTest {
         .setProducerEpoch(2)
     ).build()
     val requestChannelRequest = buildRequest(initProducerIdRequest)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
@@ -3122,27 +2979,30 @@ class KafkaApisTest {
     val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest, 1)
     val request = buildRequest(updateMetadataRequest)
 
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
+    val capturedResponse: ArgumentCaptor[UpdateMetadataResponse] = ArgumentCaptor.forClass(classOf[UpdateMetadataResponse])
 
-    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
-    EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
-      EasyMock.eq(request.context.correlationId),
-      EasyMock.anyObject()
-    )).andStubReturn(
+    when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
+    when(replicaManager.maybeUpdateMetadataCache(
+      ArgumentMatchers.eq(request.context.correlationId),
+      any()
+    )).thenReturn(
       Seq()
     )
 
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-    EasyMock.replay(replicaManager, controller, requestChannel)
-
     createKafkaApis().handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)
-    val updateMetadataResponse = capturedResponse.getValue.asInstanceOf[UpdateMetadataResponse]
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val updateMetadataResponse = capturedResponse.getValue
     assertEquals(expectedError, updateMetadataResponse.error())
-    EasyMock.verify(replicaManager)
+    if (expectedError == Errors.NONE) {
+      verify(replicaManager).maybeUpdateMetadataCache(
+        ArgumentMatchers.eq(request.context.correlationId),
+        any()
+      )
+    }
   }
 
   @Test
@@ -3166,7 +3026,7 @@ class KafkaApisTest {
   def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
     val controllerId = 2
     val controllerEpoch = 6
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
+    val capturedResponse: ArgumentCaptor[LeaderAndIsrResponse] = ArgumentCaptor.forClass(classOf[LeaderAndIsrResponse])
     val partitionStates = Seq(
       new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
         .setTopicName("topicW")
@@ -3193,26 +3053,23 @@ class KafkaApisTest {
       .setErrorCode(Errors.NONE.code)
       .setPartitionErrors(asList()), leaderAndIsrRequest.version())
 
-    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
-    EasyMock.expect(replicaManager.becomeLeaderOrFollower(
-      EasyMock.eq(request.context.correlationId),
-      EasyMock.anyObject(),
-      EasyMock.anyObject()
-    )).andStubReturn(
+    when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
+    when(replicaManager.becomeLeaderOrFollower(
+      ArgumentMatchers.eq(request.context.correlationId),
+      any(),
+      any()
+    )).thenReturn(
       response
     )
 
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-    EasyMock.replay(replicaManager, controller, requestChannel)
-
     createKafkaApis().handleLeaderAndIsrRequest(request)
-    val leaderAndIsrResponse = capturedResponse.getValue.asInstanceOf[LeaderAndIsrResponse]
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val leaderAndIsrResponse = capturedResponse.getValue
     assertEquals(expectedError, leaderAndIsrResponse.error())
-    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -3236,7 +3093,7 @@ class KafkaApisTest {
   def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
     val controllerId = 0
     val controllerEpoch = 5
-    val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
+    val capturedResponse: ArgumentCaptor[StopReplicaResponse] = ArgumentCaptor.forClass(classOf[StopReplicaResponse])
     val fooPartition = new TopicPartition("foo", 0)
     val topicStates = Seq(
       new StopReplicaTopicState()
@@ -3256,29 +3113,26 @@ class KafkaApisTest {
     ).build()
     val request = buildRequest(stopReplicaRequest)
 
-    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
-    EasyMock.expect(replicaManager.stopReplicas(
-      EasyMock.eq(request.context.correlationId),
-      EasyMock.eq(controllerId),
-      EasyMock.eq(controllerEpoch),
-      EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
-    )).andStubReturn(
+    when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
+    when(replicaManager.stopReplicas(
+      ArgumentMatchers.eq(request.context.correlationId),
+      ArgumentMatchers.eq(controllerId),
+      ArgumentMatchers.eq(controllerEpoch),
+      ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
+    )).thenReturn(
       (mutable.Map(
         fooPartition -> Errors.NONE
       ), Errors.NONE)
     )
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.eq(None)
-    ))
-
-    EasyMock.replay(controller, replicaManager, requestChannel)
 
     createKafkaApis().handleStopReplicaRequest(request)
-    val stopReplicaResponse = capturedResponse.getValue.asInstanceOf[StopReplicaResponse]
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val stopReplicaResponse = capturedResponse.getValue
     assertEquals(expectedError, stopReplicaResponse.error())
-    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -3304,7 +3158,7 @@ class KafkaApisTest {
   }
 
   private def listGroupRequest(state: Option[String], overviews: List[GroupOverview]): ListGroupsResponse = {
-    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+    reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
 
     val data = new ListGroupsRequestData()
     if (state.isDefined)
@@ -3312,14 +3166,13 @@ class KafkaApisTest {
     val listGroupsRequest = new ListGroupsRequest.Builder(data).build()
     val requestChannelRequest = buildRequest(listGroupsRequest)
 
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
     val expectedStates: Set[String] = if (state.isDefined) Set(state.get) else Set()
-    EasyMock.expect(groupCoordinator.handleListGroups(expectedStates))
-      .andReturn((Errors.NONE, overviews))
-    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
+    when(groupCoordinator.handleListGroups(expectedStates))
+      .thenReturn((Errors.NONE, overviews))
 
     createKafkaApis().handleListGroupsRequest(requestChannelRequest)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[ListGroupsResponse]
     assertEquals(Errors.NONE.code, response.data.errorCode)
     response
@@ -3357,11 +3210,10 @@ class KafkaApisTest {
       .setIncludeClusterAuthorizedOperations(true)).build()
 
     val request = buildRequest(describeClusterRequest, plaintextListener)
-    val capturedResponse = expectNoThrottling(request)
 
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
     createKafkaApis().handleDescribeCluster(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val describeClusterResponse = capturedResponse.getValue.asInstanceOf[DescribeClusterResponse]
 
     assertEquals(metadataCache.getControllerId.get, describeClusterResponse.data.controllerId)
@@ -3412,11 +3264,10 @@ class KafkaApisTest {
   private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
     val metadataRequest = MetadataRequest.Builder.allTopics.build()
     val requestChannelRequest = buildRequest(metadataRequest, requestListener)
-    val capturedResponse = expectNoThrottling(requestChannelRequest)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
 
     createKafkaApis().handleTopicMetadataRequest(requestChannelRequest)
 
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
     capturedResponse.getValue.asInstanceOf[MetadataResponse]
   }
 
@@ -3425,13 +3276,13 @@ class KafkaApisTest {
     val latestOffset = 15L
     val currentLeaderEpoch = Optional.empty[Integer]()
 
-    EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
-      EasyMock.eq(tp),
-      EasyMock.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
-      EasyMock.eq(Some(isolationLevel)),
-      EasyMock.eq(currentLeaderEpoch),
-      fetchOnlyFromLeader = EasyMock.eq(true))
-    ).andReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
+    when(replicaManager.fetchOffsetForTimestamp(
+      ArgumentMatchers.eq(tp),
+      ArgumentMatchers.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+      ArgumentMatchers.eq(Some(isolationLevel)),
+      ArgumentMatchers.eq(currentLeaderEpoch),
+      fetchOnlyFromLeader = ArgumentMatchers.eq(true))
+    ).thenReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
 
     val targetTimes = List(new ListOffsetsTopic()
       .setName(tp.topic)
@@ -3441,11 +3292,10 @@ class KafkaApisTest {
     val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
-    val capturedResponse = expectNoThrottling(request)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
     createKafkaApis().handleListOffsetRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[ListOffsetsResponse]
     val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get
       .partitions.asScala.find(_.partitionIndex == tp.partition)
@@ -3479,22 +3329,13 @@ class KafkaApisTest {
       requestChannelMetrics, envelope = None)
   }
 
-  private def expectNoThrottling(request: RequestChannel.Request): Capture[AbstractResponse] = {
-    EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request](),
-      EasyMock.anyObject[Long])).andReturn(0)
-
-    EasyMock.expect(clientRequestQuotaManager.throttle(
-      EasyMock.eq(request),
-      EasyMock.anyObject[ThrottleCallback](),
-      EasyMock.eq(0)))
-
-    val capturedResponse = EasyMock.newCapture[AbstractResponse]()
-    EasyMock.expect(requestChannel.sendResponse(
-      EasyMock.eq(request),
-      EasyMock.capture(capturedResponse),
-      EasyMock.anyObject()
-    ))
-
+  private def verifyNoThrottling(request: RequestChannel.Request): ArgumentCaptor[AbstractResponse] = {
+    val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      any()
+    )
     capturedResponse
   }
 
@@ -3557,9 +3398,10 @@ class KafkaApisTest {
     ).build()
     val request = buildRequest(alterReplicaLogDirsRequest)
 
-    EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+    reset(replicaManager, clientRequestQuotaManager, requestChannel)
 
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
     val t0p0 = new TopicPartition("t0", 0)
     val t0p1 = new TopicPartition("t0", 1)
     val t0p2 = new TopicPartition("t0", 2)
@@ -3567,15 +3409,15 @@ class KafkaApisTest {
       t0p0 -> Errors.NONE,
       t0p1 -> Errors.LOG_DIR_NOT_FOUND,
       t0p2 -> Errors.INVALID_TOPIC_EXCEPTION)
-    EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map(
+    when(replicaManager.alterReplicaLogDirs(ArgumentMatchers.eq(Map(
       t0p0 -> "/foo",
       t0p1 -> "/foo",
       t0p2 -> "/foo"))))
-    .andReturn(partitionResults)
-    EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel)
+    .thenReturn(partitionResults)
 
     createKafkaApis().handleAlterReplicaLogDirsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[AlterReplicaLogDirsResponse]
     assertEquals(partitionResults, response.data.results.asScala.flatMap { tr =>
       tr.partitions().asScala.map { pr =>
@@ -3631,7 +3473,7 @@ class KafkaApisTest {
     val tp3 = new TopicPartition("baz", 1)
     val tp4 = new TopicPartition("invalid;topic", 1)
 
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
     val data = new DescribeProducersRequestData().setTopics(List(
       new DescribeProducersRequestData.TopicRequest()
         .setName(tp1.topic)
@@ -3655,22 +3497,19 @@ class KafkaApisTest {
 
     // Topic `foo` is authorized and present in the metadata
     addTopicToMetadataCache(tp1.topic, 4) // We will only access the first topic
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions(tp1.topic))))
-      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(tp1.topic))))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
 
     // Topic `bar` is not authorized
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions(tp2.topic))))
-      .andReturn(Seq(AuthorizationResult.DENIED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(tp2.topic))))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
 
     // Topic `baz` is authorized, but not present in the metadata
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions(tp3.topic))))
-      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(tp3.topic))))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
 
-    EasyMock.expect(replicaManager.activeProducerState(tp1))
-      .andReturn(new DescribeProducersResponseData.PartitionResponse()
+    when(replicaManager.activeProducerState(tp1))
+      .thenReturn(new DescribeProducersResponseData.PartitionResponse()
         .setErrorCode(Errors.NONE.code)
         .setPartitionIndex(tp1.partition)
         .setActiveProducers(List(
@@ -3685,11 +3524,12 @@ class KafkaApisTest {
 
     val describeProducersRequest = new DescribeProducersRequest.Builder(data).build()
     val request = buildRequest(describeProducersRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeProducersRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[DescribeProducersResponse]
     assertEquals(Set("foo", "bar", "baz", "invalid;topic"), response.data.topics.asScala.map(_.name).toSet)
 
@@ -3721,12 +3561,13 @@ class KafkaApisTest {
 
   @Test
   def testDescribeTransactions(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
     val data = new DescribeTransactionsRequestData()
       .setTransactionalIds(List("foo", "bar").asJava)
     val describeTransactionsRequest = new DescribeTransactionsRequest.Builder(data).build()
     val request = buildRequest(describeTransactionsRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
     def buildExpectedActions(transactionalId: String): util.List[Action] = {
       val pattern = new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL)
@@ -3734,8 +3575,8 @@ class KafkaApisTest {
       Collections.singletonList(action)
     }
 
-    EasyMock.expect(txnCoordinator.handleDescribeTransactions("foo"))
-      .andReturn(new DescribeTransactionsResponseData.TransactionState()
+    when(txnCoordinator.handleDescribeTransactions("foo"))
+      .thenReturn(new DescribeTransactionsResponseData.TransactionState()
         .setErrorCode(Errors.NONE.code)
         .setTransactionalId("foo")
         .setProducerId(12345L)
@@ -3744,17 +3585,15 @@ class KafkaApisTest {
         .setTransactionState("CompleteCommit")
         .setTransactionTimeoutMs(10000))
 
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("foo"))))
-      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("foo"))))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
 
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("bar"))))
-      .andReturn(Seq(AuthorizationResult.DENIED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("bar"))))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[DescribeTransactionsResponse]
     assertEquals(2, response.data.transactionStates.size)
 
@@ -3773,13 +3612,14 @@ class KafkaApisTest {
 
   @Test
   def testDescribeTransactionsFiltersUnauthorizedTopics(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
     val transactionalId = "foo"
     val data = new DescribeTransactionsRequestData()
       .setTransactionalIds(List(transactionalId).asJava)
     val describeTransactionsRequest = new DescribeTransactionsRequest.Builder(data).build()
     val request = buildRequest(describeTransactionsRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
     def expectDescribe(
       resourceType: ResourceType,
@@ -3790,9 +3630,8 @@ class KafkaApisTest {
       val action = new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
       val actions = Collections.singletonList(action)
 
-      EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(actions)))
-        .andReturn(Seq(result).asJava)
-        .once()
+      when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(actions)))
+        .thenReturn(Seq(result).asJava)
     }
 
     // Principal is authorized to one of the two topics. The second topic should be
@@ -3822,12 +3661,12 @@ class KafkaApisTest {
     describeTransactionsResponse.topics.add(mkTopicData(topic = "foo", Seq(1, 2)))
     describeTransactionsResponse.topics.add(mkTopicData(topic = "bar", Seq(3, 4)))
 
-    EasyMock.expect(txnCoordinator.handleDescribeTransactions("foo"))
-      .andReturn(describeTransactionsResponse)
+    when(txnCoordinator.handleDescribeTransactions("foo"))
+      .thenReturn(describeTransactionsResponse)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[DescribeTransactionsResponse]
     assertEquals(1, response.data.transactionStates.size)
 
@@ -3846,15 +3685,16 @@ class KafkaApisTest {
     val data = new ListTransactionsRequestData()
     val listTransactionsRequest = new ListTransactionsRequest.Builder(data).build()
     val request = buildRequest(listTransactionsRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    EasyMock.expect(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
-      .andReturn(new ListTransactionsResponseData()
+    when(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
+      .thenReturn(new ListTransactionsResponseData()
         .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code))
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
     createKafkaApis().handleListTransactionsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[ListTransactionsResponse]
     assertEquals(0, response.data.transactionStates.size)
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(response.data.errorCode))
@@ -3862,11 +3702,12 @@ class KafkaApisTest {
 
   @Test
   def testListTransactionsAuthorization(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
     val data = new ListTransactionsRequestData()
     val listTransactionsRequest = new ListTransactionsRequest.Builder(data).build()
     val request = buildRequest(listTransactionsRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
     val transactionStates = new util.ArrayList[ListTransactionsResponseData.TransactionState]()
     transactionStates.add(new ListTransactionsResponseData.TransactionState()
@@ -3878,8 +3719,8 @@ class KafkaApisTest {
       .setProducerId(98765)
       .setTransactionState("PrepareAbort"))
 
-    EasyMock.expect(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
-      .andReturn(new ListTransactionsResponseData()
+    when(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
+      .thenReturn(new ListTransactionsResponseData()
         .setErrorCode(Errors.NONE.code)
         .setTransactionStates(transactionStates))
 
@@ -3889,17 +3730,15 @@ class KafkaApisTest {
       Collections.singletonList(action)
     }
 
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("foo"))))
-      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("foo"))))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
 
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("bar"))))
-      .andReturn(Seq(AuthorizationResult.DENIED).asJava)
-      .once()
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("bar"))))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
     createKafkaApis(authorizer = Some(authorizer)).handleListTransactionsRequest(request)
 
+    val capturedResponse = verifyNoThrottling(request)
     val response = capturedResponse.getValue.asInstanceOf[ListTransactionsResponse]
     assertEquals(1, response.data.transactionStates.size())
     val transactionState = response.data.transactionStates.get(0)
@@ -3910,31 +3749,39 @@ class KafkaApisTest {
 
   @Test
   def testDeleteTopicsByIdAuthorization(): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
-    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
-
-    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
-      EasyMock.anyObject(classOf[RequestChannel.Request]),
-      EasyMock.anyShort()
-    )).andReturn(UnboundedControllerMutationQuota)
-    EasyMock.expect(controller.isActive).andReturn(true)
-    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val controllerContext: ControllerContext = mock(classOf[ControllerContext])
+
+    when(clientControllerQuotaManager.newQuotaFor(
+      any[RequestChannel.Request],
+      anyShort
+    )).thenReturn(UnboundedControllerMutationQuota)
+    when(controller.isActive).thenReturn(true)
+    when(controller.controllerContext).thenReturn(controllerContext)
+
+    val topicResults = Map(
+      AclOperation.DESCRIBE -> Map(
+        "foo" -> AuthorizationResult.DENIED,
+        "bar" -> AuthorizationResult.ALLOWED
+      ),
+      AclOperation.DELETE -> Map(
+        "foo" -> AuthorizationResult.DENIED,
+        "bar" -> AuthorizationResult.DENIED
+      )
+    )
+    when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => {
+      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
+      actions.asScala.map { action =>
+        val topic = action.resourcePattern.name
+        val ops = action.operation()
+        topicResults(ops)(topic)
+      }.asJava
+    })
 
     // Try to delete three topics:
     // 1. One without describe permission
     // 2. One without delete permission
     // 3. One which is authorized, but doesn't exist
-
-    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
-      "foo" -> AuthorizationResult.DENIED,
-      "bar" -> AuthorizationResult.ALLOWED
-    ))
-
-    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
-      "foo" -> AuthorizationResult.DENIED,
-      "bar" -> AuthorizationResult.DENIED
-    ))
-
     val topicIdsMap = Map(
       Uuid.randomUuid() -> Some("foo"),
       Uuid.randomUuid() -> Some("bar"),
@@ -3942,7 +3789,7 @@ class KafkaApisTest {
     )
 
     topicIdsMap.foreach { case (topicId, topicNameOpt) =>
-      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+      when(controllerContext.topicName(topicId)).thenReturn(topicNameOpt)
     }
 
     val topicDatas = topicIdsMap.keys.map { topicId =>
@@ -3953,12 +3800,13 @@ class KafkaApisTest {
       .build(ApiKeys.DELETE_TOPICS.latestVersion)
 
     val request = buildRequest(deleteRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
-      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
     createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+    verify(authorizer, times(2)).authorize(any(), any())
 
+    val capturedResponse = verifyNoThrottling(request)
     val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
 
     topicIdsMap.foreach { case (topicId, nameOpt) =>
@@ -3982,30 +3830,39 @@ class KafkaApisTest {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
-    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val authorizer: Authorizer = mock(classOf[Authorizer])
 
-    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
-      EasyMock.anyObject(classOf[RequestChannel.Request]),
-      EasyMock.anyShort()
-    )).andReturn(UnboundedControllerMutationQuota)
-    EasyMock.expect(controller.isActive).andReturn(true)
+    when(clientControllerQuotaManager.newQuotaFor(
+      any[RequestChannel.Request],
+      anyShort
+    )).thenReturn(UnboundedControllerMutationQuota)
+    when(controller.isActive).thenReturn(true)
 
     // Try to delete three topics:
     // 1. One without describe permission
     // 2. One without delete permission
     // 3. One which is authorized, but doesn't exist
 
-    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
-      "foo" -> AuthorizationResult.DENIED,
-      "bar" -> AuthorizationResult.ALLOWED,
-      "baz" -> AuthorizationResult.ALLOWED
-    ))
-
-    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
-      "foo" -> AuthorizationResult.DENIED,
-      "bar" -> AuthorizationResult.DENIED,
-      "baz" -> AuthorizationResult.ALLOWED
-    ))
+    val topicResults = Map(
+      AclOperation.DESCRIBE -> Map(
+        "foo" -> AuthorizationResult.DENIED,
+        "bar" -> AuthorizationResult.ALLOWED,
+        "baz" -> AuthorizationResult.ALLOWED
+      ),
+      AclOperation.DELETE -> Map(
+        "foo" -> AuthorizationResult.DENIED,
+        "bar" -> AuthorizationResult.DENIED,
+        "baz" -> AuthorizationResult.ALLOWED
+      )
+    )
+    when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => {
+      val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
+      actions.asScala.map { action =>
+        val topic = action.resourcePattern.name
+        val ops = action.operation()
+        topicResults(ops)(topic)
+      }.asJava
+    })
 
     val deleteRequest = if (usePrimitiveTopicNameArray) {
       new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
@@ -4023,12 +3880,13 @@ class KafkaApisTest {
     }
 
     val request = buildRequest(deleteRequest)
-    val capturedResponse = expectNoThrottling(request)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
-      requestChannel, txnCoordinator, controller, authorizer)
     createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+    verify(authorizer, times(2)).authorize(any(), any())
 
+    val capturedResponse = verifyNoThrottling(request)
     val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
 
     def lookupErrorCode(topic: String): Option[Errors] = {
@@ -4041,34 +3899,11 @@ class KafkaApisTest {
     assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
   }
 
-  def expectTopicAuthorization(
-    authorizer: Authorizer,
-    aclOperation: AclOperation,
-    topicResults: Map[String, AuthorizationResult]
-  ): Unit = {
-    val expectedActions = topicResults.keys.map { topic =>
-      val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
-      topic -> new Action(aclOperation, pattern, 1, true, true)
-    }.toMap
-
-    val actionsCapture: Capture[util.List[Action]] = EasyMock.newCapture()
-    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.capture(actionsCapture)))
-      .andAnswer(() => {
-        actionsCapture.getValue.asScala.map { action =>
-          val topic = action.resourcePattern.name
-          assertEquals(expectedActions(topic), action)
-          topicResults(topic)
-        }.asJava
-      })
-      .once()
-  }
-
   private def createMockRequest(): RequestChannel.Request = {
-    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
-    val requestHeader: RequestHeader = EasyMock.createNiceMock(classOf[RequestHeader])
-    expect(request.header).andReturn(requestHeader).anyTimes()
-    expect(requestHeader.apiKey()).andReturn(ApiKeys.values().head).anyTimes()
-    EasyMock.replay(request, requestHeader)
+    val request: RequestChannel.Request = mock(classOf[RequestChannel.Request])
+    val requestHeader: RequestHeader = mock(classOf[RequestHeader])
+    when(request.header).thenReturn(requestHeader)
+    when(requestHeader.apiKey()).thenReturn(ApiKeys.values().head)
     request
   }
 
@@ -4152,11 +3987,13 @@ class KafkaApisTest {
 
   @Test
   def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = {
-    val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort));
+    val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort))
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+
     createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
+    val capturedResponse = verifyNoThrottling(request)
     assertEquals(new AlterConfigsResponseData(),
       capturedResponse.getValue.asInstanceOf[AlterConfigsResponse].data())
   }
@@ -4165,18 +4002,19 @@ class KafkaApisTest {
   def testInvalidLegacyAlterConfigsRequestWithKRaft(): Unit = {
     val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData().
       setValidateOnly(true).
-      setResources(new LAlterConfigsResourceCollection(Arrays.asList(
+      setResources(new LAlterConfigsResourceCollection(asList(
         new LAlterConfigsResource().
           setResourceName(brokerId.toString).
           setResourceType(BROKER.id()).
-          setConfigs(new LAlterableConfigCollection(Arrays.asList(new LAlterableConfig().
+          setConfigs(new LAlterableConfigCollection(asList(new LAlterableConfig().
             setName("foo").
             setValue(null)).iterator()))).iterator())), 1.toShort))
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
     createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
-    assertEquals(new AlterConfigsResponseData().setResponses(Arrays.asList(
+    val capturedResponse = verifyNoThrottling(request)
+    assertEquals(new AlterConfigsResponseData().setResponses(asList(
       new LAlterConfigsResourceResponse().
         setErrorCode(Errors.INVALID_REQUEST.code()).
         setErrorMessage("Null value not supported for : foo").
@@ -4193,11 +4031,12 @@ class KafkaApisTest {
 
   @Test
   def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
-    val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort));
+    val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
     createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)
+    val capturedResponse = verifyNoThrottling(request)
     assertEquals(new IncrementalAlterConfigsResponseData(),
       capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse].data())
   }
@@ -4206,18 +4045,19 @@ class KafkaApisTest {
   def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
     val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
       setValidateOnly(true).
-      setResources(new IAlterConfigsResourceCollection(Arrays.asList(new IAlterConfigsResource().
+      setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource().
         setResourceName(brokerId.toString).
         setResourceType(BROKER_LOGGER.id()).
-        setConfigs(new IAlterableConfigCollection(Arrays.asList(new IAlterableConfig().
+        setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig().
           setName(Log4jController.ROOT_LOGGER).
           setValue("TRACE")).iterator()))).iterator())),
         1.toShort))
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    val capturedResponse = expectNoThrottling(request)
-    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
     createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)
-    assertEquals(new IncrementalAlterConfigsResponseData().setResponses(Arrays.asList(
+    val capturedResponse = verifyNoThrottling(request)
+    assertEquals(new IncrementalAlterConfigsResponseData().setResponses(asList(
       new IAlterConfigsResourceResponse().
         setErrorCode(0.toShort).
         setErrorMessage(null).