You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/02/11 15:20:07 UTC
[kafka] branch trunk updated: KAFKA-13577: Replace easymock with mockito in kafka:core - part 3 (#11674)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0269edf KAFKA-13577: Replace easymock with mockito in kafka:core - part 3 (#11674)
0269edf is described below
commit 0269edfc80e36d468516b943f0a9a08b7ee652fb
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri Feb 11 16:16:25 2022 +0100
KAFKA-13577: Replace easymock with mockito in kafka:core - part 3 (#11674)
Reviewers: Tom Bentley <tb...@redhat.com>
---
build.gradle | 1 -
checkstyle/import-control-core.xml | 1 -
.../scala/unit/kafka/server/AuthHelperTest.scala | 53 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 1726 +++++++++-----------
4 files changed, 799 insertions(+), 982 deletions(-)
diff --git a/build.gradle b/build.gradle
index acf0b19..fa0ed94 100644
--- a/build.gradle
+++ b/build.gradle
@@ -859,7 +859,6 @@ project(':core') {
testImplementation project(':raft').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
- testImplementation libs.easymock
testImplementation(libs.apacheda) {
exclude group: 'xml-apis', module: 'xml-apis'
// `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index f4c3b5b..e02a3f2 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -28,7 +28,6 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
- <allow pkg="org.easymock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
diff --git a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
index 194b2c6..7f229e9 100644
--- a/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
@@ -26,22 +26,22 @@ import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.easymock.EasyMock._
-import org.easymock.{EasyMock, IArgumentMatcher}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.argThat
+import org.mockito.ArgumentMatchers
+import org.mockito.Mockito.{mock, verify, when}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
class AuthHelperTest {
- import AuthHelperTest._
private val clientId = ""
@Test
def testAuthorize(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
@@ -56,23 +56,20 @@ class AuthHelperTest {
1, true, true)
)
- EasyMock.expect(authorizer.authorize(requestContext, expectedActions.asJava))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
-
- EasyMock.replay(authorizer)
+ when(authorizer.authorize(requestContext, expectedActions.asJava))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
val result = new AuthHelper(Some(authorizer)).authorize(
requestContext, operation, resourceType, resourceName)
- verify(authorizer)
+ verify(authorizer).authorize(requestContext, expectedActions.asJava)
assertEquals(true, result)
}
@Test
def testFilterByAuthorized(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
@@ -94,19 +91,17 @@ class AuthHelperTest {
1, true, true),
)
- EasyMock.expect(authorizer.authorize(
- EasyMock.eq(requestContext), matchSameElements(expectedActions.asJava)
- )).andAnswer { () =>
- val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+ when(authorizer.authorize(
+ ArgumentMatchers.eq(requestContext), argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
if (Set(resourceName1, resourceName3).contains(action.resourcePattern.name))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
- }.once()
-
- EasyMock.replay(authorizer)
+ }
val result = new AuthHelper(Some(authorizer)).filterByAuthorized(
requestContext,
@@ -116,27 +111,11 @@ class AuthHelperTest {
Seq(resourceName1, resourceName2, resourceName1, resourceName3)
)(identity)
- verify(authorizer)
+ verify(authorizer).authorize(
+ ArgumentMatchers.eq(requestContext), argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))
+ )
assertEquals(Set(resourceName1, resourceName3), result)
}
}
-
-object AuthHelperTest {
-
- /**
- * Similar to `EasyMock.eq`, but matches if both lists have the same elements irrespective of ordering.
- */
- def matchSameElements[T](list: java.util.List[T]): java.util.List[T] = {
- EasyMock.reportMatcher(new IArgumentMatcher {
- def matches(argument: Any): Boolean = argument match {
- case l: java.util.List[_] => list.asScala.toSet == l.asScala.toSet
- case _ => false
- }
- def appendTo(buffer: StringBuffer): Unit = buffer.append(s"list($list)")
- })
- null
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e617eab..c0058ce 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
import java.util.{Collections, Optional, Properties, Random}
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
-import kafka.cluster.{Broker, Partition}
+import kafka.cluster.Broker
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
import kafka.coordinator.group._
@@ -82,44 +82,43 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.easymock.EasyMock._
-import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import org.mockito.{ArgumentMatchers, Mockito}
+import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull}
+import org.mockito.Mockito.{mock, reset, times, verify, when}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
-import java.util.Arrays
class KafkaApisTest {
- private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
- private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
- private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
- private val adminManager: ZkAdminManager = EasyMock.createNiceMock(classOf[ZkAdminManager])
- private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
- private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
- private val forwardingManager: ForwardingManager = EasyMock.createNiceMock(classOf[ForwardingManager])
- private val autoTopicCreationManager: AutoTopicCreationManager = EasyMock.createNiceMock(classOf[AutoTopicCreationManager])
+ private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
+ private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
+ private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator])
+ private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
+ private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator])
+ private val controller: KafkaController = mock(classOf[KafkaController])
+ private val forwardingManager: ForwardingManager = mock(classOf[ForwardingManager])
+ private val autoTopicCreationManager: AutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
private val kafkaPrincipalSerde = new KafkaPrincipalSerde {
override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
- private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ private val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
private val metrics = new Metrics()
private val brokerId = 1
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId)
- private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
- private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
- private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager])
- private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+ private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
+ private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
+ private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
+ private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
- private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager])
+ private val fetchManager: FetchManager = mock(classOf[FetchManager])
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
private val time = new MockTime
@@ -143,7 +142,7 @@ class KafkaApisTest {
val properties = TestUtils.createBrokerConfig(brokerId, "")
properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
properties.put(KafkaConfig.ProcessRolesProp, "broker")
- val voterId = (brokerId + 1)
+ val voterId = brokerId + 1
properties.put(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")
properties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
properties
@@ -206,7 +205,7 @@ class KafkaApisTest {
@Test
def testDescribeConfigsWithAuthorizer(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val operation = AclOperation.DESCRIBE_CONFIGS
val resourceType = ResourceType.TOPIC
@@ -220,25 +219,18 @@ class KafkaApisTest {
)
// Verify that authorize is only called once
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(expectedActions.asJava)))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions.asJava)))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- val configRepository: ConfigRepository = EasyMock.strictMock(classOf[ConfigRepository])
+ val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
val topicConfigs = new Properties()
val propName = "min.insync.replicas"
val propValue = "3"
topicConfigs.put(propName, propValue)
- EasyMock.expect(configRepository.topicConfig(resourceName)).andReturn(topicConfigs)
+ when(configRepository.topicConfig(resourceName)).thenReturn(topicConfigs)
- metadataCache =
- EasyMock.partialMockBuilder(classOf[ZkMetadataCache])
- .withConstructor(classOf[Int])
- .withArgs(Int.box(brokerId)) // Need to box it for Scala 2.12 and before
- .addMockedMethod("contains", classOf[String])
- .createMock()
-
- expect(metadataCache.contains(resourceName)).andReturn(true)
+ metadataCache = mock(classOf[ZkMetadataCache])
+ when(metadataCache.contains(resourceName)).thenReturn(true)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
.setIncludeSynonyms(true)
@@ -248,15 +240,14 @@ class KafkaApisTest {
.build(requestHeader.apiVersion)
val request = buildRequest(describeConfigsRequest,
requestHeader = Option(requestHeader))
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- EasyMock.replay(metadataCache, replicaManager, clientRequestQuotaManager, requestChannel,
- authorizer, configRepository, adminManager)
createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository)
.handleDescribeConfigsRequest(request)
- verify(authorizer, replicaManager)
-
+ verify(authorizer).authorize(any(), ArgumentMatchers.eq(expectedActions.asJava))
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeConfigsResponse]
val results = response.data().results()
assertEquals(1, results.size())
@@ -290,7 +281,7 @@ class KafkaApisTest {
alterConfigHandler: () => ApiError,
expectedError: Errors
): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
@@ -299,13 +290,13 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
clientId, 0)
- EasyMock.expect(controller.isActive).andReturn(true)
+ when(controller.isActive).thenReturn(true)
authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, AuthorizationResult.ALLOWED)
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
- EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
- .andAnswer(() => {
+ when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
+ .thenAnswer(_ => {
Map(configResource -> alterConfigHandler.apply())
})
@@ -317,28 +308,26 @@ class KafkaApisTest {
val request = TestUtils.buildRequestWithEnvelope(
alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
- val capturedResponse = EasyMock.newCapture[AbstractResponse]()
- val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.capture(capturedRequest),
- EasyMock.capture(capturedResponse),
- EasyMock.anyObject()
- ))
+ val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
+ val capturedRequest: ArgumentCaptor[RequestChannel.Request] = ArgumentCaptor.forClass(classOf[RequestChannel.Request])
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- adminManager, controller)
createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
+ verify(requestChannel).sendResponse(
+ capturedRequest.capture(),
+ capturedResponse.capture(),
+ any()
+ )
assertEquals(Some(request), capturedRequest.getValue.envelope)
- val innerResponse = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
+ val innerResponse = capturedResponse.getValue
val responseMap = innerResponse.data.responses().asScala.map { resourceResponse =>
resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode)
}.toMap
assertEquals(Map(resourceName -> expectedError), responseMap)
- verify(authorizer, controller, adminManager)
+ verify(controller).isActive
+ verify(adminManager).alterConfigs(any(), ArgumentMatchers.eq(false))
}
@Test
@@ -348,16 +337,16 @@ class KafkaApisTest {
val leaveGroupRequest = new LeaveGroupRequest.Builder("group",
Collections.singletonList(new MemberIdentity())).build(requestHeader.apiVersion)
- EasyMock.expect(controller.isActive).andReturn(true)
+ when(controller.isActive).thenReturn(true)
val request = TestUtils.buildRequestWithEnvelope(
leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- val capturedResponse = expectNoThrottling(request)
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller)
createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
assertEquals(Errors.INVALID_REQUEST, response.error())
}
@@ -385,7 +374,7 @@ class KafkaApisTest {
performAuthorize: Boolean = false,
authorizeResult: AuthorizationResult = AuthorizationResult.ALLOWED,
isActiveController: Boolean = true): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
if (performAuthorize) {
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, authorizeResult)
@@ -395,7 +384,7 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
clientId, 0)
- EasyMock.expect(controller.isActive).andReturn(isActiveController)
+ when(controller.isActive).thenReturn(isActiveController)
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
@@ -408,35 +397,30 @@ class KafkaApisTest {
val request = TestUtils.buildRequestWithEnvelope(
alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener)
- val capturedResponse = EasyMock.newCapture[AbstractResponse]()
- if (shouldCloseConnection) {
- EasyMock.expect(requestChannel.closeConnection(
- EasyMock.eq(request),
- EasyMock.eq(java.util.Collections.emptyMap())
- ))
- } else {
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- }
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- adminManager, controller)
+ val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
- if (!shouldCloseConnection) {
+ if (shouldCloseConnection) {
+ verify(requestChannel).closeConnection(
+ ArgumentMatchers.eq(request),
+ ArgumentMatchers.eq(java.util.Collections.emptyMap())
+ )
+ } else {
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None))
val response = capturedResponse.getValue.asInstanceOf[EnvelopeResponse]
assertEquals(expectedError, response.error)
}
-
- verify(authorizer, adminManager, requestChannel)
+ if (performAuthorize) {
+ verify(authorizer).authorize(any(), any())
+ }
}
@Test
def testAlterConfigsWithAuthorizer(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizedTopic = "authorized-topic"
val unauthorizedTopic = "unauthorized-topic"
@@ -457,22 +441,19 @@ class KafkaApisTest {
.build(topicHeader.apiVersion)
val request = buildRequest(alterConfigsRequest)
- EasyMock.expect(controller.isActive).andReturn(false)
-
- val capturedResponse = expectNoThrottling(request)
-
- EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
- .andReturn(Map(authorizedResource -> ApiError.NONE))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- adminManager, controller)
+ when(controller.isActive).thenReturn(false)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
+ .thenReturn(Map(authorizedResource -> ApiError.NONE))
createKafkaApis(authorizer = Some(authorizer)).handleAlterConfigsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
verifyAlterConfigResult(capturedResponse, Map(authorizedTopic -> Errors.NONE,
unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED))
-
- verify(authorizer, adminManager)
+ verify(authorizer, times(2)).authorize(any(), any())
+ verify(adminManager).alterConfigs(any(), anyBoolean())
}
@Test
@@ -487,10 +468,11 @@ class KafkaApisTest {
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
val request = buildRequest(requestBuilder.build())
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, adminManager, controller)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis(enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeQuorumResponse]
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
}
@@ -543,31 +525,30 @@ class KafkaApisTest {
// The controller check only makes sense for ZK clusters. For KRaft,
// controller requests are handled on a separate listener, so there
// is no choice but to forward them.
- EasyMock.expect(controller.isActive).andReturn(false)
+ when(controller.isActive).thenReturn(false)
}
- val capturedResponse = expectNoThrottling(request)
- val forwardCallback: Capture[Option[AbstractResponse] => Unit] = EasyMock.newCapture()
-
- EasyMock.expect(forwardingManager.forwardRequest(
- EasyMock.eq(request),
- EasyMock.capture(forwardCallback)
- )).once()
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] = ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+ verify(forwardingManager).forwardRequest(
+ ArgumentMatchers.eq(request),
+ forwardCallback.capture()
+ )
assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " +
s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)")
val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception)
- assertTrue(forwardCallback.hasCaptured)
forwardCallback.getValue.apply(Some(expectedResponse))
- assertTrue(capturedResponse.hasCaptured)
+ val capturedResponse = verifyNoThrottling(request)
assertEquals(expectedResponse, capturedResponse.getValue)
- EasyMock.verify(controller, requestChannel, forwardingManager)
+ if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
+ verify(controller).isActive
+ }
}
private def authorizeResource(authorizer: Authorizer,
@@ -586,12 +567,11 @@ class KafkaApisTest {
new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
1, logIfAllowed, logIfDenied)
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(Seq(expectedAuthorizedAction).asJava)))
- .andReturn(Seq(result).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Seq(expectedAuthorizedAction).asJava)))
+ .thenReturn(Seq(result).asJava)
}
- private def verifyAlterConfigResult(capturedResponse: Capture[AbstractResponse],
+ private def verifyAlterConfigResult(capturedResponse: ArgumentCaptor[AbstractResponse],
expectedResults: Map[String, Errors]): Unit = {
val response = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
val responseMap = response.data.responses().asScala.map { resourceResponse =>
@@ -614,7 +594,7 @@ class KafkaApisTest {
@Test
def testIncrementalAlterConfigsWithAuthorizer(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizedTopic = "authorized-topic"
val unauthorizedTopic = "unauthorized-topic"
@@ -628,23 +608,22 @@ class KafkaApisTest {
val request = buildRequest(incrementalAlterConfigsRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))
- EasyMock.expect(controller.isActive).andReturn(true)
+ when(controller.isActive).thenReturn(true)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
+ .thenReturn(Map(authorizedResource -> ApiError.NONE))
- val capturedResponse = expectNoThrottling(request)
-
- EasyMock.expect(adminManager.incrementalAlterConfigs(anyObject(), EasyMock.eq(false)))
- .andReturn(Map(authorizedResource -> ApiError.NONE))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- adminManager, controller)
createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
verifyIncrementalAlterConfigResult(capturedResponse, Map(
authorizedTopic -> Errors.NONE,
unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED
))
- verify(authorizer, adminManager)
+ verify(authorizer, times(2)).authorize(any(), any())
+ verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
}
private def getIncrementalAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
@@ -657,7 +636,7 @@ class KafkaApisTest {
new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
}
- private def verifyIncrementalAlterConfigResult(capturedResponse: Capture[AbstractResponse],
+ private def verifyIncrementalAlterConfigResult(capturedResponse: ArgumentCaptor[AbstractResponse],
expectedResults: Map[String, Errors]): Unit = {
val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
val responseMap = response.data.responses().asScala.map { resourceResponse =>
@@ -668,7 +647,7 @@ class KafkaApisTest {
@Test
def testAlterClientQuotasWithAuthorizer(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
@@ -683,17 +662,17 @@ class KafkaApisTest {
val request = buildRequest(alterClientQuotasRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))
- EasyMock.expect(controller.isActive).andReturn(true)
-
- val capturedResponse = expectNoThrottling(request)
+ when(controller.isActive).thenReturn(true)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ anyLong)).thenReturn(0)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- adminManager, controller)
createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))
- verify(authorizer, adminManager)
+ verify(authorizer).authorize(any(), any())
+ verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
}
@Test
@@ -702,7 +681,7 @@ class KafkaApisTest {
testForwardableApi(ApiKeys.ALTER_CLIENT_QUOTAS, requestBuilder)
}
- private def verifyAlterClientQuotaResult(capturedResponse: Capture[AbstractResponse],
+ private def verifyAlterClientQuotaResult(capturedResponse: ArgumentCaptor[AbstractResponse],
expected: Map[ClientQuotaEntity, Errors]): Unit = {
val response = capturedResponse.getValue.asInstanceOf[AlterClientQuotasResponse]
val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
@@ -717,7 +696,7 @@ class KafkaApisTest {
@Test
def testCreateTopicsWithAuthorizer(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizedTopic = "authorized-topic"
val unauthorizedTopic = "unauthorized-topic"
@@ -733,7 +712,7 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion, clientId, 0)
- EasyMock.expect(controller.isActive).andReturn(true)
+ when(controller.isActive).thenReturn(true)
val topics = new CreateTopicsRequestData.CreatableTopicCollection(2)
val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
@@ -754,33 +733,27 @@ class KafkaApisTest {
val request = buildRequest(createTopicsRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))
- val capturedResponse = expectNoThrottling(request)
-
- EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
- EasyMock.eq(request), EasyMock.eq(6))).andReturn(UnboundedControllerMutationQuota)
-
- val capturedCallback = EasyMock.newCapture[Map[String, ApiError] => Unit]()
-
- EasyMock.expect(adminManager.createTopics(
- EasyMock.eq(timeout),
- EasyMock.eq(false),
- EasyMock.eq(Map(authorizedTopic -> topicToCreate)),
- anyObject(),
- EasyMock.eq(UnboundedControllerMutationQuota),
- EasyMock.capture(capturedCallback)))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
- requestChannel, authorizer, adminManager, controller)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(clientControllerQuotaManager.newQuotaFor(
+ ArgumentMatchers.eq(request), ArgumentMatchers.eq(6))).thenReturn(UnboundedControllerMutationQuota)
createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request)
+ val capturedCallback: ArgumentCaptor[Map[String, ApiError] => Unit] = ArgumentCaptor.forClass(classOf[Map[String, ApiError] => Unit])
+
+ verify(adminManager).createTopics(
+ ArgumentMatchers.eq(timeout),
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(Map(authorizedTopic -> topicToCreate)),
+ any(),
+ ArgumentMatchers.eq(UnboundedControllerMutationQuota),
+ capturedCallback.capture())
capturedCallback.getValue.apply(Map(authorizedTopic -> ApiError.NONE))
- verifyCreateTopicsResult(createTopicsRequest,
- capturedResponse, Map(authorizedTopic -> Errors.NONE,
+ val capturedResponse = verifyNoThrottling(request)
+ verifyCreateTopicsResult(capturedResponse, Map(authorizedTopic -> Errors.NONE,
unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED))
-
- verify(authorizer, adminManager, clientControllerQuotaManager)
}
@Test
@@ -819,21 +792,20 @@ class KafkaApisTest {
new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL),
1, logIfAllowed, logIfDenied))
- EasyMock.expect(authorizer.authorize(
- anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedAuthorizedActions.asJava)
- )).andAnswer { () =>
- val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
- actions.map { action =>
+ when(authorizer.authorize(
+ any[RequestContext], argThat((t: java.util.List[Action]) => t != null && t.containsAll(expectedAuthorizedActions.asJava))
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
+ actions.asScala.map { action =>
if (action.resourcePattern().name().equals(authorizedTopic))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
- }.once()
+ }
}
- private def verifyCreateTopicsResult(createTopicsRequest: CreateTopicsRequest,
- capturedResponse: Capture[AbstractResponse],
+ private def verifyCreateTopicsResult(capturedResponse: ArgumentCaptor[AbstractResponse],
expectedResults: Map[String, Errors]): Unit = {
val response = capturedResponse.getValue.asInstanceOf[CreateTopicsResponse]
val responseMap = response.data.topics().asScala.map { topicResponse =>
@@ -946,7 +918,7 @@ class KafkaApisTest {
private def testFindCoordinatorWithTopicCreation(coordinatorType: CoordinatorType,
hasEnoughLiveBrokers: Boolean = true,
version: Short = ApiKeys.FIND_COORDINATOR.latestVersion): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val requestHeader = new RequestHeader(ApiKeys.FIND_COORDINATOR, version, clientId, 0)
@@ -964,14 +936,14 @@ class KafkaApisTest {
case CoordinatorType.GROUP =>
topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, numBrokersNeeded.toString)
- EasyMock.expect(groupCoordinator.offsetsTopicConfigs).andReturn(new Properties)
+ when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.GROUP,
groupId, AuthorizationResult.ALLOWED)
Topic.GROUP_METADATA_TOPIC_NAME
case CoordinatorType.TRANSACTION =>
topicConfigOverride.put(KafkaConfig.TransactionsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.TransactionsTopicReplicationFactorProp, numBrokersNeeded.toString)
- EasyMock.expect(txnCoordinator.transactionTopicConfigs).andReturn(new Properties)
+ when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID,
groupId, AuthorizationResult.ALLOWED)
Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -983,7 +955,7 @@ class KafkaApisTest {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(coordinatorType.id())
- .setCoordinatorKeys(Arrays.asList(groupId)))
+ .setCoordinatorKeys(asList(groupId)))
} else {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
@@ -991,17 +963,15 @@ class KafkaApisTest {
.setKey(groupId))
}
val request = buildRequest(findCoordinatorRequestBuilder.build(requestHeader.apiVersion))
-
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
val capturedRequest = verifyTopicCreation(topicName, true, true, request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- autoTopicCreationManager, forwardingManager, controller, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
-
createKafkaApis(authorizer = Some(authorizer),
overrideProperties = topicConfigOverride).handleFindCoordinatorRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[FindCoordinatorResponse]
if (version >= 4) {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.coordinators.get(0).errorCode)
@@ -1009,10 +979,7 @@ class KafkaApisTest {
} else {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.errorCode)
}
-
assertTrue(capturedRequest.getValue.isEmpty)
-
- verify(authorizer, autoTopicCreationManager)
}
@Test
@@ -1060,7 +1027,7 @@ class KafkaApisTest {
private def testMetadataAutoTopicCreation(topicName: String,
enableAutoTopicCreation: Boolean,
expectedError: Errors): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
clientId, 0)
@@ -1081,13 +1048,13 @@ class KafkaApisTest {
case Topic.GROUP_METADATA_TOPIC_NAME =>
topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, numBrokersNeeded.toString)
- EasyMock.expect(groupCoordinator.offsetsTopicConfigs).andReturn(new Properties)
+ when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
true
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
topicConfigOverride.put(KafkaConfig.TransactionsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.TransactionsTopicReplicationFactorProp, numBrokersNeeded.toString)
- EasyMock.expect(txnCoordinator.transactionTopicConfigs).andReturn(new Properties)
+ when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
true
case _ =>
topicConfigOverride.put(KafkaConfig.NumPartitionsProp, numBrokersNeeded.toString)
@@ -1100,16 +1067,15 @@ class KafkaApisTest {
).build(requestHeader.apiVersion)
val request = buildRequest(metadataRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
val capturedRequest = verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
- autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
-
createKafkaApis(authorizer = Some(authorizer), enableForwarding = enableAutoTopicCreation,
overrideProperties = topicConfigOverride).handleTopicMetadataRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[MetadataResponse]
val expectedMetadataResponse = util.Collections.singletonList(new TopicMetadata(
expectedError,
@@ -1124,28 +1090,26 @@ class KafkaApisTest {
assertTrue(capturedRequest.getValue.isDefined)
assertEquals(request.context, capturedRequest.getValue.get)
}
-
- verify(authorizer, autoTopicCreationManager)
}
private def verifyTopicCreation(topicName: String,
enableAutoTopicCreation: Boolean,
isInternal: Boolean,
- request: RequestChannel.Request): Capture[Option[RequestContext]] = {
- val capturedRequest = EasyMock.newCapture[Option[RequestContext]]()
+ request: RequestChannel.Request): ArgumentCaptor[Option[RequestContext]] = {
+ val capturedRequest: ArgumentCaptor[Option[RequestContext]] = ArgumentCaptor.forClass(classOf[Option[RequestContext]])
if (enableAutoTopicCreation) {
- EasyMock.expect(clientControllerQuotaManager.newPermissiveQuotaFor(EasyMock.eq(request)))
- .andReturn(UnboundedControllerMutationQuota)
+ when(clientControllerQuotaManager.newPermissiveQuotaFor(ArgumentMatchers.eq(request)))
+ .thenReturn(UnboundedControllerMutationQuota)
- EasyMock.expect(autoTopicCreationManager.createTopics(
- EasyMock.eq(Set(topicName)),
- EasyMock.eq(UnboundedControllerMutationQuota),
- EasyMock.capture(capturedRequest))).andReturn(
+ when(autoTopicCreationManager.createTopics(
+ ArgumentMatchers.eq(Set(topicName)),
+ ArgumentMatchers.eq(UnboundedControllerMutationQuota),
+ capturedRequest.capture())).thenReturn(
Seq(new MetadataResponseTopic()
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setIsInternal(isInternal)
.setName(topicName))
- ).once()
+ )
}
capturedRequest
}
@@ -1166,9 +1130,6 @@ class KafkaApisTest {
new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid()))
- EasyMock.replay(replicaManager, clientRequestQuotaManager,
- autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
-
// if version is 10 or 11, the invalid topic metadata should return an error
val invalidVersions = Set(10, 11)
invalidVersions.foreach( version =>
@@ -1177,16 +1138,13 @@ class KafkaApisTest {
val request = buildRequest(new MetadataRequest(metadataRequestData, version.toShort))
val kafkaApis = createKafkaApis()
- val capturedResponse = EasyMock.newCapture[AbstractResponse]()
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.anyObject()
- ))
-
- EasyMock.replay(requestChannel)
+ val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
-
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ any()
+ )
val response = capturedResponse.getValue.asInstanceOf[MetadataResponse]
assertEquals(1, response.topicMetadata.size)
assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST))
@@ -1202,7 +1160,7 @@ class KafkaApisTest {
addTopicToMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel)
val offsetCommitRequest = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
@@ -1220,10 +1178,11 @@ class KafkaApisTest {
))).build()
val request = buildRequest(offsetCommitRequest)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis().handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[OffsetCommitResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
@@ -1239,7 +1198,7 @@ class KafkaApisTest {
addTopicToMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel)
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
@@ -1251,11 +1210,11 @@ class KafkaApisTest {
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava,
).build()
val request = buildRequest(offsetCommitRequest)
-
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis().handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
}
@@ -1270,11 +1229,11 @@ class KafkaApisTest {
addTopicToMetadataCache(topic, numPartitions = 2)
for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion) {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
val topicPartition = new TopicPartition(topic, 1)
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[Map[TopicPartition, Errors] => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse])
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
val groupId = "groupId"
@@ -1292,30 +1251,26 @@ class KafkaApisTest {
val request = buildRequest(offsetCommitRequest)
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(groupCoordinator.handleTxnCommitOffsets(
- EasyMock.eq(groupId),
- EasyMock.eq(producerId),
- EasyMock.eq(epoch),
- EasyMock.anyString(),
- EasyMock.eq(Option.empty),
- EasyMock.anyInt(),
- EasyMock.anyObject(),
- EasyMock.capture(responseCallback),
- EasyMock.eq(requestLocal)
- )).andAnswer(
- () => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
+ when(groupCoordinator.handleTxnCommitOffsets(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(producerId),
+ ArgumentMatchers.eq(epoch),
+ anyString,
+ ArgumentMatchers.eq(Option.empty),
+ anyInt,
+ any(),
+ responseCallback.capture(),
+ ArgumentMatchers.eq(requestLocal)
+ )).thenAnswer(_ => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal)
- val response = capturedResponse.getValue.asInstanceOf[TxnOffsetCommitResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
if (version < 2) {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition))
@@ -1332,10 +1287,10 @@ class KafkaApisTest {
for (version <- ApiKeys.INIT_PRODUCER_ID.oldestVersion to ApiKeys.INIT_PRODUCER_ID.latestVersion) {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[InitProducerIdResult => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[InitProducerIdResponse] = ArgumentCaptor.forClass(classOf[InitProducerIdResponse])
+ val responseCallback: ArgumentCaptor[InitProducerIdResult => Unit] = ArgumentCaptor.forClass(classOf[InitProducerIdResult => Unit])
val transactionalId = "txnId"
val producerId = if (version < 3)
@@ -1366,26 +1321,22 @@ class KafkaApisTest {
Option(new ProducerIdAndEpoch(producerId, epoch))
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(txnCoordinator.handleInitProducerId(
- EasyMock.eq(transactionalId),
- EasyMock.eq(txnTimeoutMs),
- EasyMock.eq(expectedProducerIdAndEpoch),
- EasyMock.capture(responseCallback),
- EasyMock.eq(requestLocal)
- )).andAnswer(
- () => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ when(txnCoordinator.handleInitProducerId(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(txnTimeoutMs),
+ ArgumentMatchers.eq(expectedProducerIdAndEpoch),
+ responseCallback.capture(),
+ ArgumentMatchers.eq(requestLocal)
+ )).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
- val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
if (version < 4) {
assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
@@ -1402,10 +1353,10 @@ class KafkaApisTest {
for (version <- ApiKeys.ADD_OFFSETS_TO_TXN.oldestVersion to ApiKeys.ADD_OFFSETS_TO_TXN.latestVersion) {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator, txnCoordinator)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator, txnCoordinator)
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[AddOffsetsToTxnResponse] = ArgumentCaptor.forClass(classOf[AddOffsetsToTxnResponse])
+ val responseCallback: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
val groupId = "groupId"
val transactionalId = "txnId"
@@ -1422,32 +1373,28 @@ class KafkaApisTest {
val request = buildRequest(addOffsetsToTxnRequest)
val partition = 1
- EasyMock.expect(groupCoordinator.partitionFor(
- EasyMock.eq(groupId)
- )).andReturn(partition)
+ when(groupCoordinator.partitionFor(
+ ArgumentMatchers.eq(groupId)
+ )).thenReturn(partition)
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
- EasyMock.eq(transactionalId),
- EasyMock.eq(producerId),
- EasyMock.eq(epoch),
- EasyMock.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
- EasyMock.capture(responseCallback),
- EasyMock.eq(requestLocal)
- )).andAnswer(
- () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, groupCoordinator)
+ when(txnCoordinator.handleAddPartitionsToTransaction(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(producerId),
+ ArgumentMatchers.eq(epoch),
+ ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
+ responseCallback.capture(),
+ ArgumentMatchers.eq(requestLocal)
+ )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
createKafkaApis().handleAddOffsetsToTxnRequest(request, requestLocal)
- val response = capturedResponse.getValue.asInstanceOf[AddOffsetsToTxnResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
if (version < 2) {
assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
@@ -1464,10 +1411,10 @@ class KafkaApisTest {
for (version <- ApiKeys.ADD_PARTITIONS_TO_TXN.oldestVersion to ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion) {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[AddPartitionsToTxnResponse] = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnResponse])
+ val responseCallback: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
val transactionalId = "txnId"
val producerId = 15L
@@ -1485,27 +1432,23 @@ class KafkaApisTest {
val request = buildRequest(addPartitionsToTxnRequest)
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
- EasyMock.eq(transactionalId),
- EasyMock.eq(producerId),
- EasyMock.eq(epoch),
- EasyMock.eq(Set(topicPartition)),
- EasyMock.capture(responseCallback),
- EasyMock.eq(requestLocal)
- )).andAnswer(
- () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
-
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ when(txnCoordinator.handleAddPartitionsToTransaction(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(producerId),
+ ArgumentMatchers.eq(epoch),
+ ArgumentMatchers.eq(Set(topicPartition)),
+ responseCallback.capture(),
+ ArgumentMatchers.eq(requestLocal)
+ )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
createKafkaApis().handleAddPartitionToTxnRequest(request, requestLocal)
- val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
if (version < 2) {
assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors())
@@ -1521,10 +1464,10 @@ class KafkaApisTest {
addTopicToMetadataCache(topic, numPartitions = 2)
for (version <- ApiKeys.END_TXN.oldestVersion to ApiKeys.END_TXN.latestVersion) {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[EndTxnResponse] = ArgumentCaptor.forClass(classOf[EndTxnResponse])
+ val responseCallback: ArgumentCaptor[Errors => Unit] = ArgumentCaptor.forClass(classOf[Errors => Unit])
val transactionalId = "txnId"
val producerId = 15L
@@ -1540,26 +1483,23 @@ class KafkaApisTest {
val request = buildRequest(endTxnRequest)
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(txnCoordinator.handleEndTransaction(
- EasyMock.eq(transactionalId),
- EasyMock.eq(producerId),
- EasyMock.eq(epoch),
- EasyMock.eq(TransactionResult.COMMIT),
- EasyMock.capture(responseCallback),
- EasyMock.eq(requestLocal)
- )).andAnswer(
- () => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
+ when(txnCoordinator.handleEndTransaction(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(producerId),
+ ArgumentMatchers.eq(epoch),
+ ArgumentMatchers.eq(TransactionResult.COMMIT),
+ responseCallback.capture(),
+ ArgumentMatchers.eq(requestLocal)
+ )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
createKafkaApis().handleEndTxnRequest(request, requestLocal)
- val response = capturedResponse.getValue.asInstanceOf[EndTxnResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val response = capturedResponse.getValue
if (version < 2) {
assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
@@ -1576,9 +1516,9 @@ class KafkaApisTest {
for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
- EasyMock.reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
- val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
val tp = new TopicPartition("topic", 0)
@@ -1595,25 +1535,25 @@ class KafkaApisTest {
.build(version.toShort)
val request = buildRequest(produceRequest)
- EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
- EasyMock.anyShort(),
- EasyMock.eq(false),
- EasyMock.eq(AppendOrigin.Client),
- EasyMock.anyObject(),
- EasyMock.capture(responseCallback),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject())
- ).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
-
- val capturedResponse = expectNoThrottling(request)
- EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
- anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
-
- EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+ when(replicaManager.appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(AppendOrigin.Client),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ any())
+ ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
+
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[ProduceResponse]
assertEquals(1, response.data.responses.size)
@@ -1630,7 +1570,7 @@ class KafkaApisTest {
addTopicToMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel)
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(
@@ -1638,10 +1578,11 @@ class KafkaApisTest {
).build()
val request = buildRequest(addPartitionsToTxnRequest)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis().handleAddPartitionToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[AddPartitionsToTxnResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
}
@@ -1683,44 +1624,41 @@ class KafkaApisTest {
@Test
def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
val topicPartition = new TopicPartition("t", 0)
- val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(topicPartition))
+ val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition))
val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-
- EasyMock.expect(replicaManager.getMagic(topicPartition))
- .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+ val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+
+ when(replicaManager.getMagic(topicPartition))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
- val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val markersResponse = capturedResponse.getValue
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
}
@Test
def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = {
val topicPartition = new TopicPartition("t", 0)
- val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(topicPartition))
+ val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition))
val expectedErrors = Map(topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
-
- EasyMock.expect(replicaManager.getMagic(topicPartition))
- .andReturn(None)
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+ val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+ when(replicaManager.getMagic(topicPartition))
+ .thenReturn(None)
createKafkaApis().handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
- val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val markersResponse = capturedResponse.getValue
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
}
@@ -1728,41 +1666,38 @@ class KafkaApisTest {
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
val tp2 = new TopicPartition("t1", 0)
- val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
+ val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- EasyMock.expect(replicaManager.getMagic(tp1))
- .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
- EasyMock.expect(replicaManager.getMagic(tp2))
- .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ when(replicaManager.getMagic(tp1))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
+ when(replicaManager.getMagic(tp2))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
- EasyMock.anyShort(),
- EasyMock.eq(true),
- EasyMock.eq(AppendOrigin.Coordinator),
- EasyMock.anyObject(),
- EasyMock.capture(responseCallback),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.eq(requestLocal))
- ).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+ when(replicaManager.appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.Coordinator),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ ArgumentMatchers.eq(requestLocal))
+ ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
- val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val markersResponse = capturedResponse.getValue
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
- EasyMock.verify(replicaManager)
}
@Test
@@ -1830,108 +1765,90 @@ class KafkaApisTest {
).build()
val request = buildRequest(stopReplicaRequest)
- EasyMock.expect(replicaManager.stopReplicas(
- EasyMock.eq(request.context.correlationId),
- EasyMock.eq(controllerId),
- EasyMock.eq(controllerEpoch),
- EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
- )).andReturn(
+ when(replicaManager.stopReplicas(
+ ArgumentMatchers.eq(request.context.correlationId),
+ ArgumentMatchers.eq(controllerId),
+ ArgumentMatchers.eq(controllerEpoch),
+ ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
+ )).thenReturn(
(mutable.Map(
groupMetadataPartition -> Errors.NONE,
txnStatePartition -> Errors.NONE,
fooPartition -> Errors.NONE
), Errors.NONE)
)
- EasyMock.expect(controller.brokerEpoch).andStubReturn(brokerEpoch)
+ when(controller.brokerEpoch).thenReturn(brokerEpoch)
- if (deletePartition) {
- if (leaderEpoch >= 0) {
- txnCoordinator.onResignation(txnStatePartition.partition, Some(leaderEpoch))
- } else {
- txnCoordinator.onResignation(txnStatePartition.partition, None)
- }
- EasyMock.expectLastCall()
- }
+ createKafkaApis().handleStopReplicaRequest(request)
if (deletePartition) {
if (leaderEpoch >= 0) {
- groupCoordinator.onResignation(groupMetadataPartition.partition, Some(leaderEpoch))
+ verify(txnCoordinator).onResignation(txnStatePartition.partition, Some(leaderEpoch))
+ verify(groupCoordinator).onResignation(groupMetadataPartition.partition, Some(leaderEpoch))
} else {
- groupCoordinator.onResignation(groupMetadataPartition.partition, None)
+ verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
+ verify(groupCoordinator).onResignation(groupMetadataPartition.partition, None)
}
- EasyMock.expectLastCall()
}
-
- EasyMock.replay(controller, replicaManager, txnCoordinator, groupCoordinator)
-
- createKafkaApis().handleStopReplicaRequest(request)
-
- EasyMock.verify(txnCoordinator, groupCoordinator)
}
@Test
def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
val tp2 = new TopicPartition("t1", 0)
- val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
+ val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
val expectedErrors = Map(tp1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2 -> Errors.NONE).asJava
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
- val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- EasyMock.expect(replicaManager.getMagic(tp1))
- .andReturn(None)
- EasyMock.expect(replicaManager.getMagic(tp2))
- .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ when(replicaManager.getMagic(tp1))
+ .thenReturn(None)
+ when(replicaManager.getMagic(tp2))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
- EasyMock.anyShort(),
- EasyMock.eq(true),
- EasyMock.eq(AppendOrigin.Coordinator),
- EasyMock.anyObject(),
- EasyMock.capture(responseCallback),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.eq(requestLocal))
- ).andAnswer(() => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
-
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+ when(replicaManager.appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.Coordinator),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ ArgumentMatchers.eq(requestLocal))
+ ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
- val markersResponse = capturedResponse.getValue.asInstanceOf[WriteTxnMarkersResponse]
+ val markersResponse = capturedResponse.getValue
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
- EasyMock.verify(replicaManager)
}
@Test
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
val topicPartition = new TopicPartition("t", 0)
val request = createWriteTxnMarkersRequest(asList(topicPartition))._2
- EasyMock.expect(replicaManager.getMagic(topicPartition))
- .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ when(replicaManager.getMagic(topicPartition))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
- EasyMock.anyShort(),
- EasyMock.eq(true),
- EasyMock.eq(AppendOrigin.Coordinator),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.eq(requestLocal)))
-
- EasyMock.replay(replicaManager)
createKafkaApis().handleWriteTxnMarkersRequest(request, requestLocal)
- EasyMock.verify(replicaManager)
+ verify(replicaManager).appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.Coordinator),
+ any(),
+ any(),
+ any(),
+ any(),
+ ArgumentMatchers.eq(requestLocal))
}
@Test
@@ -1966,20 +1883,21 @@ class KafkaApisTest {
val memberSummary = MemberSummary("memberid", Some("instanceid"), "clientid", "clienthost", metadata, assignment)
val groupSummary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))
- EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val describeGroupsRequest = new DescribeGroupsRequest.Builder(
new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
).build()
val request = buildRequest(describeGroupsRequest)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
- .andReturn((Errors.NONE, groupSummary))
- EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(groupCoordinator.handleDescribeGroup(ArgumentMatchers.eq(groupId)))
+ .thenReturn((Errors.NONE, groupSummary))
createKafkaApis().handleDescribeGroupRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeGroupsResponse]
val group = response.data.groups().get(0)
@@ -2005,7 +1923,7 @@ class KafkaApisTest {
addTopicToMetadataCache("topic-1", numPartitions = 2)
addTopicToMetadataCache("topic-2", numPartitions = 2)
- EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val topics = new OffsetDeleteRequestTopicCollection()
topics.add(new OffsetDeleteRequestTopic()
@@ -2027,27 +1945,27 @@ class KafkaApisTest {
val request = buildRequest(offsetDeleteRequest)
val requestLocal = RequestLocal.withThreadConfinedCaching
- val capturedResponse = expectNoThrottling(request)
- EasyMock.expect(groupCoordinator.handleDeleteOffsets(
- EasyMock.eq(group),
- EasyMock.eq(Seq(
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(groupCoordinator.handleDeleteOffsets(
+ ArgumentMatchers.eq(group),
+ ArgumentMatchers.eq(Seq(
new TopicPartition("topic-1", 0),
new TopicPartition("topic-1", 1),
new TopicPartition("topic-2", 0),
new TopicPartition("topic-2", 1)
)),
- EasyMock.eq(requestLocal)
- )).andReturn((Errors.NONE, Map(
+ ArgumentMatchers.eq(requestLocal)
+ )).thenReturn((Errors.NONE, Map(
new TopicPartition("topic-1", 0) -> Errors.NONE,
new TopicPartition("topic-1", 1) -> Errors.NONE,
new TopicPartition("topic-2", 0) -> Errors.NONE,
new TopicPartition("topic-2", 1) -> Errors.NONE,
)))
- EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
-
createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
def errorForPartition(topic: String, partition: Int): Errors = {
@@ -2068,7 +1986,7 @@ class KafkaApisTest {
addTopicToMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
- EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val topics = new OffsetDeleteRequestTopicCollection()
topics.add(new OffsetDeleteRequestTopic()
@@ -2081,15 +1999,16 @@ class KafkaApisTest {
.setTopics(topics)
).build()
val request = buildRequest(offsetDeleteRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
- EasyMock.eq(requestLocal))).andReturn((Errors.NONE, Map.empty))
- EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
+ ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.NONE, Map.empty[TopicPartition, Errors]))
createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
@@ -2104,7 +2023,7 @@ class KafkaApisTest {
def testOffsetDeleteWithInvalidGroup(): Unit = {
val group = "groupId"
- EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val offsetDeleteRequest = new OffsetDeleteRequest.Builder(
new OffsetDeleteRequestData()
@@ -2112,14 +2031,15 @@ class KafkaApisTest {
).build()
val request = buildRequest(offsetDeleteRequest)
- val capturedResponse = expectNoThrottling(request)
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty),
- EasyMock.eq(requestLocal))).andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
- EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
+ ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty[TopicPartition, Errors]))
createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[OffsetDeleteResponse]
assertEquals(Errors.GROUP_ID_NOT_FOUND, Errors.forCode(response.data.errorCode))
@@ -2130,13 +2050,13 @@ class KafkaApisTest {
val isolationLevel = IsolationLevel.READ_UNCOMMITTED
val currentLeaderEpoch = Optional.of[Integer](15)
- EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
- EasyMock.eq(tp),
- EasyMock.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
- EasyMock.eq(Some(isolationLevel)),
- EasyMock.eq(currentLeaderEpoch),
- fetchOnlyFromLeader = EasyMock.eq(true))
- ).andThrow(error.exception)
+ when(replicaManager.fetchOffsetForTimestamp(
+ ArgumentMatchers.eq(tp),
+ ArgumentMatchers.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+ ArgumentMatchers.eq(Some(isolationLevel)),
+ ArgumentMatchers.eq(currentLeaderEpoch),
+ fetchOnlyFromLeader = ArgumentMatchers.eq(true))
+ ).thenThrow(error.exception)
val targetTimes = List(new ListOffsetsTopic()
.setName(tp.topic)
@@ -2147,11 +2067,12 @@ class KafkaApisTest {
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleListOffsetRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[ListOffsetsResponse]
val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get
.partitions.asScala.find(_.partitionIndex == tp.partition)
@@ -2205,52 +2126,46 @@ class KafkaApisTest {
* authorization but before checking in MetadataCache.
*/
@Test
- def getAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
+ def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
// Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache
- metadataCache =
- EasyMock.partialMockBuilder(classOf[ZkMetadataCache])
- .withConstructor(classOf[Int])
- .withArgs(Int.box(brokerId)) // Need to box it for Scala 2.12 and before
- .addMockedMethod("getAllTopics")
- .addMockedMethod("getTopicMetadata")
- .createMock()
+ metadataCache = mock(classOf[ZkMetadataCache])
+ when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0)))
+ when(metadataCache.getControllerId).thenReturn(None)
// 2 topics returned for authorization during handle
val topicsReturnedFromMetadataCacheForAuthorization = Set("remaining-topic", "later-deleted-topic")
- expect(metadataCache.getAllTopics()).andReturn(topicsReturnedFromMetadataCacheForAuthorization).once()
+ when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
// 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
- expect(metadataCache.getTopicMetadata(
- EasyMock.eq(topicsReturnedFromMetadataCacheForAuthorization),
- anyObject[ListenerName],
+ when(metadataCache.getTopicMetadata(
+ ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
+ any[ListenerName],
anyBoolean,
anyBoolean
- )).andStubReturn(Seq(
+ )).thenReturn(Seq(
new MetadataResponseTopic()
.setErrorCode(Errors.NONE.code)
.setName("remaining-topic")
.setIsInternal(false)
))
- EasyMock.replay(metadataCache)
- var createTopicIsCalled: Boolean = false;
+ var createTopicIsCalled: Boolean = false
// Specific mock on zkClient for this use case
// Expect it's never called to do auto topic creation
- expect(zkClient.setOrCreateEntityConfigs(
- EasyMock.eq(ConfigType.Topic),
- EasyMock.anyString,
- EasyMock.anyObject[Properties]
- )).andStubAnswer(() => {
+ when(zkClient.setOrCreateEntityConfigs(
+ ArgumentMatchers.eq(ConfigType.Topic),
+ anyString,
+ any[Properties]
+ )).thenAnswer(_ => {
createTopicIsCalled = true
})
// No need to use
- expect(zkClient.getAllBrokersInCluster)
- .andStubReturn(Seq(new Broker(
+ when(zkClient.getAllBrokersInCluster)
+ .thenReturn(Seq(new Broker(
brokerId, "localhost", 9902,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT
)))
- EasyMock.replay(zkClient)
val (requestListener, _) = updateMetadataCacheWithInconsistentListeners()
val response = sendMetadataRequestWithInconsistentListeners(requestListener)
@@ -2277,7 +2192,7 @@ class KafkaApisTest {
).asJava)
// 2. Set up authorizer
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val unauthorizedTopic = "unauthorized-topic"
val authorizedTopic = "authorized-topic"
@@ -2287,20 +2202,20 @@ class KafkaApisTest {
)
// Here we need an order-insensitive matcher (argThat with containsAll) instead of ArgumentMatchers.eq since the order of the request is unknown
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedActions.asJava)))
- .andAnswer { () =>
- val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+ when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
+ .thenAnswer { invocation =>
+ val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
if (action.resourcePattern().name().equals(authorizedTopic))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
- }.times(2)
+ }
// 3. Set up MetadataCache
val authorizedTopicId = Uuid.randomUuid()
- val unauthorizedTopicId = Uuid.randomUuid();
+ val unauthorizedTopicId = Uuid.randomUuid()
val topicIds = new util.HashMap[String, Uuid]()
topicIds.put(authorizedTopic, authorizedTopicId)
@@ -2328,10 +2243,11 @@ class KafkaApisTest {
// 4. Send TopicMetadataReq using topicId
val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
- val capturedMetadataByTopicIdResp = expectNoThrottling(repByTopicId)
- EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicId)
+ val capturedMetadataByTopicIdResp = verifyNoThrottling(repByTopicId)
val metadataByTopicIdResp = capturedMetadataByTopicIdResp.getValue.asInstanceOf[MetadataResponse]
val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))
@@ -2349,13 +2265,12 @@ class KafkaApisTest {
}
// 5. Send TopicMetadataReq using topic name
- EasyMock.reset(clientRequestQuotaManager, requestChannel)
+ reset(clientRequestQuotaManager, requestChannel)
val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
- val capturedMetadataByTopicNameResp = expectNoThrottling(repByTopicName)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicName)
+ val capturedMetadataByTopicNameResp = verifyNoThrottling(repByTopicName)
val metadataByTopicNameResp = capturedMetadataByTopicNameResp.getValue.asInstanceOf[MetadataResponse]
val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))
@@ -2384,21 +2299,18 @@ class KafkaApisTest {
val hw = 3
val timestamp = 1000
- expect(replicaManager.getLogConfig(EasyMock.eq(tp))).andReturn(None)
-
- replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
- anyObject[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
- anyObject[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
- anyObject[Option[ClientMetadata]])
- expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
- def answer: Unit = {
- val callback = getCurrentArguments.apply(7)
- .asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
- val records = MemoryRecords.withRecords(CompressionType.NONE,
- new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
- callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, records,
- None, None, None, Option.empty, isReassignmentFetch = false)))
- }
+ when(replicaManager.getLogConfig(ArgumentMatchers.eq(tp))).thenReturn(None)
+
+ when(replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
+ any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota],
+ any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), any[IsolationLevel],
+ any[Option[ClientMetadata]])
+ ).thenAnswer(invocation => {
+ val callback = invocation.getArgument(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
+ val records = MemoryRecords.withRecords(CompressionType.NONE,
+ new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
+ callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, records,
+ None, None, None, Option.empty, isReassignmentFetch = false)))
})
val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
@@ -2408,25 +2320,24 @@ class KafkaApisTest {
val fetchMetadata = new JFetchMetadata(0, 0)
val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
fetchMetadata, fetchData, false, false)
- expect(fetchManager.newContext(
- anyObject[Short],
- anyObject[JFetchMetadata],
- anyObject[Boolean],
- anyObject[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
- anyObject[util.List[TopicIdPartition]],
- anyObject[util.Map[Uuid, String]])).andReturn(fetchContext)
+ when(fetchManager.newContext(
+ any[Short],
+ any[JFetchMetadata],
+ any[Boolean],
+ any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+ any[util.List[TopicIdPartition]],
+ any[util.Map[Uuid, String]])).thenReturn(fetchContext)
- EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
- anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchDataBuilder)
.build()
val request = buildRequest(fetchRequest)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager)
createKafkaApis().handleFetchRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[FetchResponse]
val responseData = response.responseData(metadataCache.topicIdsToNames(), 9)
assertTrue(responseData.containsKey(tp))
@@ -2452,7 +2363,7 @@ class KafkaApisTest {
addTopicToMetadataCache(foo.topic, 1, topicId = foo.topicId)
// We will never return a logConfig when the topic name is null. This is ok since we won't have any records to convert.
- expect(replicaManager.getLogConfig(EasyMock.eq(unresolvedFoo.topicPartition))).andReturn(None)
+ when(replicaManager.getLogConfig(ArgumentMatchers.eq(unresolvedFoo.topicPartition))).thenReturn(None)
// Simulate unknown topic ID in the context
val fetchData = Map(new TopicIdPartition(foo.topicId, new TopicPartition(null, foo.partition)) ->
@@ -2463,27 +2374,26 @@ class KafkaApisTest {
val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
fetchMetadata, fetchData, true, replicaId >= 0)
// We expect to have the resolved partition, but we will simulate an unknown one with the fetchContext we return.
- expect(fetchManager.newContext(
+ when(fetchManager.newContext(
ApiKeys.FETCH.latestVersion,
fetchMetadata,
replicaId >= 0,
Collections.singletonMap(foo, new FetchRequest.PartitionData(foo.topicId, 0, 0, 1000, Optional.empty())),
Collections.emptyList[TopicIdPartition],
metadataCache.topicIdsToNames())
- ).andReturn(fetchContext)
+ ).thenReturn(fetchContext)
- EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
- anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
// If replicaId is -1 we will build a consumer request. Any non-negative replicaId will build a follower request.
val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
val request = buildRequest(fetchRequest)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager)
createKafkaApis().handleFetchRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[FetchResponse]
val responseData = response.responseData(metadataCache.topicIdsToNames(), ApiKeys.FETCH.latestVersion)
assertTrue(responseData.containsKey(foo.topicPartition))
@@ -2508,25 +2418,8 @@ class KafkaApisTest {
val protocolType = "consumer"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
- val capturedProtocols = EasyMock.newCapture[List[(String, Array[Byte])]]()
-
- EasyMock.expect(groupCoordinator.handleJoinGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(memberId),
- EasyMock.eq(None),
- EasyMock.eq(true),
- EasyMock.eq(clientId),
- EasyMock.eq(InetAddress.getLocalHost.toString),
- EasyMock.eq(rebalanceTimeoutMs),
- EasyMock.eq(sessionTimeoutMs),
- EasyMock.eq(protocolType),
- EasyMock.capture(capturedProtocols),
- anyObject(),
- anyObject(),
- anyObject()
- ))
- EasyMock.replay(groupCoordinator)
+ val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
createKafkaApis().handleJoinGroupRequest(
buildRequest(
@@ -2545,8 +2438,21 @@ class KafkaApisTest {
),
RequestLocal.withThreadConfinedCaching)
- EasyMock.verify(groupCoordinator)
-
+ verify(groupCoordinator).handleJoinGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(memberId),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(clientId),
+ ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+ ArgumentMatchers.eq(rebalanceTimeoutMs),
+ ArgumentMatchers.eq(sessionTimeoutMs),
+ ArgumentMatchers.eq(protocolType),
+ capturedProtocols.capture(),
+ any(),
+ any(),
+ any()
+ )
val capturedProtocolsList = capturedProtocols.getValue
assertEquals(protocols.size, capturedProtocolsList.size)
protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
@@ -2563,7 +2469,7 @@ class KafkaApisTest {
}
def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
- EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+ reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val groupId = "group"
val memberId = "member1"
@@ -2571,23 +2477,7 @@ class KafkaApisTest {
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
- val capturedCallback = EasyMock.newCapture[JoinGroupCallback]()
-
- EasyMock.expect(groupCoordinator.handleJoinGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(memberId),
- EasyMock.eq(None),
- EasyMock.eq(if (version >= 4) true else false),
- EasyMock.eq(clientId),
- EasyMock.eq(InetAddress.getLocalHost.toString),
- EasyMock.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
- EasyMock.eq(sessionTimeoutMs),
- EasyMock.eq(protocolType),
- EasyMock.eq(List.empty),
- EasyMock.capture(capturedCallback),
- EasyMock.anyObject(),
- EasyMock.anyObject()
- ))
+ val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
@@ -2599,15 +2489,27 @@ class KafkaApisTest {
).build(version)
val requestChannelRequest = buildRequest(joinGroupRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
- EasyMock.verify(groupCoordinator)
-
+ verify(groupCoordinator).handleJoinGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(memberId),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(if (version >= 4) true else false),
+ ArgumentMatchers.eq(clientId),
+ ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+ ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
+ ArgumentMatchers.eq(sessionTimeoutMs),
+ ArgumentMatchers.eq(protocolType),
+ ArgumentMatchers.eq(List.empty),
+ capturedCallback.capture(),
+ any(),
+ any()
+ )
capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
@@ -2622,8 +2524,6 @@ class KafkaApisTest {
} else {
assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
}
-
- EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
@@ -2634,7 +2534,7 @@ class KafkaApisTest {
}
def testJoinGroupProtocolType(version: Short): Unit = {
- EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+ reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val groupId = "group"
val memberId = "member1"
@@ -2643,23 +2543,7 @@ class KafkaApisTest {
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
- val capturedCallback = EasyMock.newCapture[JoinGroupCallback]()
-
- EasyMock.expect(groupCoordinator.handleJoinGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(memberId),
- EasyMock.eq(None),
- EasyMock.eq(if (version >= 4) true else false),
- EasyMock.eq(clientId),
- EasyMock.eq(InetAddress.getLocalHost.toString),
- EasyMock.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
- EasyMock.eq(sessionTimeoutMs),
- EasyMock.eq(protocolType),
- EasyMock.eq(List.empty),
- EasyMock.capture(capturedCallback),
- EasyMock.anyObject(),
- EasyMock.anyObject()
- ))
+ val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
@@ -2671,13 +2555,24 @@ class KafkaApisTest {
).build(version)
val requestChannelRequest = buildRequest(joinGroupRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
- EasyMock.verify(groupCoordinator)
-
+ verify(groupCoordinator).handleJoinGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(memberId),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(if (version >= 4) true else false),
+ ArgumentMatchers.eq(clientId),
+ ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+ ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
+ ArgumentMatchers.eq(sessionTimeoutMs),
+ ArgumentMatchers.eq(protocolType),
+ ArgumentMatchers.eq(List.empty),
+ capturedCallback.capture(),
+ any(),
+ any()
+ )
capturedCallback.getValue.apply(JoinGroupResult(
members = List.empty,
memberId = memberId,
@@ -2687,7 +2582,7 @@ class KafkaApisTest {
leaderId = memberId,
error = Errors.NONE
))
-
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.NONE, response.error)
@@ -2697,8 +2592,6 @@ class KafkaApisTest {
assertEquals(memberId, response.data.leader)
assertEquals(protocolName, response.data.protocolName)
assertEquals(protocolType, response.data.protocolType)
-
- EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
@@ -2709,28 +2602,16 @@ class KafkaApisTest {
}
def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
- EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+ reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val protocolName = "range"
- val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
+ val capturedCallback: ArgumentCaptor[SyncGroupCallback] = ArgumentCaptor.forClass(classOf[SyncGroupCallback])
val requestLocal = RequestLocal.withThreadConfinedCaching
- EasyMock.expect(groupCoordinator.handleSyncGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(0),
- EasyMock.eq(memberId),
- EasyMock.eq(if (version >= 5) Some(protocolType) else None),
- EasyMock.eq(if (version >= 5) Some(protocolName) else None),
- EasyMock.eq(None),
- EasyMock.eq(Map.empty),
- EasyMock.capture(capturedCallback),
- EasyMock.eq(requestLocal)
- ))
-
val syncGroupRequest = new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(groupId)
@@ -2741,13 +2622,20 @@ class KafkaApisTest {
).build(version)
val requestChannelRequest = buildRequest(syncGroupRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
- EasyMock.verify(groupCoordinator)
-
+ verify(groupCoordinator).handleSyncGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(0),
+ ArgumentMatchers.eq(memberId),
+ ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None),
+ ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(Map.empty),
+ capturedCallback.capture(),
+ ArgumentMatchers.eq(requestLocal)
+ )
capturedCallback.getValue.apply(SyncGroupResult(
protocolType = Some(protocolType),
protocolName = Some(protocolName),
@@ -2755,13 +2643,12 @@ class KafkaApisTest {
error = Errors.NONE
))
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
assertEquals(Errors.NONE, response.error)
assertArrayEquals(Array.empty[Byte], response.data.assignment)
assertEquals(protocolType, response.data.protocolType)
-
- EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
@@ -2772,29 +2659,16 @@ class KafkaApisTest {
}
def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): Unit = {
- EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+ reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val protocolName = "range"
- val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
+ val capturedCallback: ArgumentCaptor[SyncGroupCallback] = ArgumentCaptor.forClass(classOf[SyncGroupCallback])
val requestLocal = RequestLocal.withThreadConfinedCaching
- if (version < 5) {
- EasyMock.expect(groupCoordinator.handleSyncGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(0),
- EasyMock.eq(memberId),
- EasyMock.eq(None),
- EasyMock.eq(None),
- EasyMock.eq(None),
- EasyMock.eq(Map.empty),
- EasyMock.capture(capturedCallback),
- EasyMock.eq(requestLocal)
- ))
- }
val syncGroupRequest = new SyncGroupRequest.Builder(
new SyncGroupRequestData()
@@ -2804,14 +2678,20 @@ class KafkaApisTest {
).build(version)
val requestChannelRequest = buildRequest(syncGroupRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal)
- EasyMock.verify(groupCoordinator)
-
if (version < 5) {
+ verify(groupCoordinator).handleSyncGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(0),
+ ArgumentMatchers.eq(memberId),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(Map.empty),
+ capturedCallback.capture(),
+ ArgumentMatchers.eq(requestLocal))
capturedCallback.getValue.apply(SyncGroupResult(
protocolType = Some(protocolType),
protocolName = Some(protocolName),
@@ -2820,6 +2700,7 @@ class KafkaApisTest {
))
}
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
if (version < 5) {
@@ -2827,8 +2708,6 @@ class KafkaApisTest {
} else {
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
}
-
- EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
@@ -2843,14 +2722,12 @@ class KafkaApisTest {
).build()
val requestChannelRequest = buildRequest(joinGroupRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
- EasyMock.replay(groupCoordinator)
}
@Test
@@ -2864,14 +2741,12 @@ class KafkaApisTest {
).build()
val requestChannelRequest = buildRequest(syncGroupRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
assertEquals(Errors.UNSUPPORTED_VERSION, response.error)
- EasyMock.replay(groupCoordinator)
}
@Test
@@ -2884,14 +2759,12 @@ class KafkaApisTest {
.setGenerationId(1)
).build()
val requestChannelRequest = buildRequest(heartbeatRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(KAFKA_2_2_IV1).handleHeartbeatRequest(requestChannelRequest)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[HeartbeatResponse]
assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
- EasyMock.replay(groupCoordinator)
}
@Test
@@ -2916,9 +2789,7 @@ class KafkaApisTest {
).build()
val requestChannelRequest = buildRequest(offsetCommitRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
val expectedTopicErrors = Collections.singletonList(
@@ -2930,9 +2801,9 @@ class KafkaApisTest {
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
))
)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[OffsetCommitResponse]
assertEquals(expectedTopicErrors, response.data.topics())
- EasyMock.replay(groupCoordinator)
}
@Test
@@ -2948,12 +2819,6 @@ class KafkaApisTest {
.setGroupInstanceId("instance-2")
)
- EasyMock.expect(groupCoordinator.handleLeaveGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(leaveMemberList),
- anyObject()
- ))
-
val leaveRequest = buildRequest(
new LeaveGroupRequest.Builder(
groupId,
@@ -2962,8 +2827,11 @@ class KafkaApisTest {
)
createKafkaApis().handleLeaveGroupRequest(leaveRequest)
-
- EasyMock.replay(groupCoordinator)
+ verify(groupCoordinator).handleLeaveGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(leaveMemberList),
+ any()
+ )
}
@Test
@@ -2976,12 +2844,6 @@ class KafkaApisTest {
.setMemberId(memberId)
)
- EasyMock.expect(groupCoordinator.handleLeaveGroup(
- EasyMock.eq(groupId),
- EasyMock.eq(singleLeaveMember),
- anyObject()
- ))
-
val leaveRequest = buildRequest(
new LeaveGroupRequest.Builder(
groupId,
@@ -2990,8 +2852,11 @@ class KafkaApisTest {
)
createKafkaApis().handleLeaveGroupRequest(leaveRequest)
-
- EasyMock.replay(groupCoordinator)
+ verify(groupCoordinator).handleLeaveGroup(
+ ArgumentMatchers.eq(groupId),
+ ArgumentMatchers.eq(singleLeaveMember),
+ any()
+ )
}
@Test
@@ -3020,38 +2885,32 @@ class KafkaApisTest {
val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
- replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
- anyObject[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
- anyObject[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
- anyObject[Option[ClientMetadata]])
- expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
- def answer: Unit = {
- val callback = getCurrentArguments.apply(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
- callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records,
- None, None, None, Option.empty, isReassignmentFetch = isReassigning)))
- }
+ when(replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
+ any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota],
+ any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), any[IsolationLevel],
+ any[Option[ClientMetadata]])
+ ).thenAnswer(invocation => {
+ val callback = invocation.getArgument(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
+ callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records,
+ None, None, None, Option.empty, isReassignmentFetch = isReassigning)))
})
val fetchMetadata = new JFetchMetadata(0, 0)
val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
fetchMetadata, fetchData, true, true)
- expect(fetchManager.newContext(
- anyObject[Short],
- anyObject[JFetchMetadata],
- anyObject[Boolean],
- anyObject[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
- anyObject[util.List[TopicIdPartition]],
- anyObject[util.Map[Uuid, String]])).andReturn(fetchContext)
-
- expect(replicaQuotaManager.record(anyLong()))
- expect(replicaManager.getLogConfig(EasyMock.eq(tp0))).andReturn(None)
-
- val partition: Partition = createNiceMock(classOf[Partition])
- expect(replicaManager.isAddingReplica(anyObject(), anyInt())).andReturn(isReassigning)
+ when(fetchManager.newContext(
+ any[Short],
+ any[JFetchMetadata],
+ any[Boolean],
+ any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+ any[util.List[TopicIdPartition]],
+ any[util.Map[Uuid, String]])).thenReturn(fetchContext)
- replay(replicaManager, fetchManager, clientQuotaManager, requestChannel, replicaQuotaManager, partition)
+ when(replicaManager.getLogConfig(ArgumentMatchers.eq(tp0))).thenReturn(None)
+ when(replicaManager.isAddingReplica(any(), anyInt)).thenReturn(isReassigning)
createKafkaApis().handle(fetchFromFollower, RequestLocal.withThreadConfinedCaching)
+ verify(replicaQuotaManager).record(anyLong)
if (isReassigning)
assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.reassignmentBytesOutPerSec.get.count())
@@ -3072,11 +2931,10 @@ class KafkaApisTest {
).build()
val requestChannelRequest = buildRequest(initProducerIdRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
assertEquals(Errors.INVALID_REQUEST, response.error)
}
@@ -3091,11 +2949,10 @@ class KafkaApisTest {
.setProducerEpoch(2)
).build()
val requestChannelRequest = buildRequest(initProducerIdRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[InitProducerIdResponse]
assertEquals(Errors.INVALID_REQUEST, response.error)
}
@@ -3122,27 +2979,30 @@ class KafkaApisTest {
val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest, 1)
val request = buildRequest(updateMetadataRequest)
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[UpdateMetadataResponse] = ArgumentCaptor.forClass(classOf[UpdateMetadataResponse])
- EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
- EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
- EasyMock.eq(request.context.correlationId),
- EasyMock.anyObject()
- )).andStubReturn(
+ when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
+ when(replicaManager.maybeUpdateMetadataCache(
+ ArgumentMatchers.eq(request.context.correlationId),
+ any()
+ )).thenReturn(
Seq()
)
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- EasyMock.replay(replicaManager, controller, requestChannel)
-
createKafkaApis().handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)
- val updateMetadataResponse = capturedResponse.getValue.asInstanceOf[UpdateMetadataResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val updateMetadataResponse = capturedResponse.getValue
assertEquals(expectedError, updateMetadataResponse.error())
- EasyMock.verify(replicaManager)
+ if (expectedError == Errors.NONE) {
+ verify(replicaManager).maybeUpdateMetadataCache(
+ ArgumentMatchers.eq(request.context.correlationId),
+ any()
+ )
+ }
}
@Test
@@ -3166,7 +3026,7 @@ class KafkaApisTest {
def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val controllerId = 2
val controllerEpoch = 6
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[LeaderAndIsrResponse] = ArgumentCaptor.forClass(classOf[LeaderAndIsrResponse])
val partitionStates = Seq(
new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setTopicName("topicW")
@@ -3193,26 +3053,23 @@ class KafkaApisTest {
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(asList()), leaderAndIsrRequest.version())
- EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
- EasyMock.expect(replicaManager.becomeLeaderOrFollower(
- EasyMock.eq(request.context.correlationId),
- EasyMock.anyObject(),
- EasyMock.anyObject()
- )).andStubReturn(
+ when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
+ when(replicaManager.becomeLeaderOrFollower(
+ ArgumentMatchers.eq(request.context.correlationId),
+ any(),
+ any()
+ )).thenReturn(
response
)
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
- EasyMock.replay(replicaManager, controller, requestChannel)
-
createKafkaApis().handleLeaderAndIsrRequest(request)
- val leaderAndIsrResponse = capturedResponse.getValue.asInstanceOf[LeaderAndIsrResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val leaderAndIsrResponse = capturedResponse.getValue
assertEquals(expectedError, leaderAndIsrResponse.error())
- EasyMock.verify(replicaManager)
}
@Test
@@ -3236,7 +3093,7 @@ class KafkaApisTest {
def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val controllerId = 0
val controllerEpoch = 5
- val capturedResponse: Capture[AbstractResponse] = EasyMock.newCapture()
+ val capturedResponse: ArgumentCaptor[StopReplicaResponse] = ArgumentCaptor.forClass(classOf[StopReplicaResponse])
val fooPartition = new TopicPartition("foo", 0)
val topicStates = Seq(
new StopReplicaTopicState()
@@ -3256,29 +3113,26 @@ class KafkaApisTest {
).build()
val request = buildRequest(stopReplicaRequest)
- EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
- EasyMock.expect(replicaManager.stopReplicas(
- EasyMock.eq(request.context.correlationId),
- EasyMock.eq(controllerId),
- EasyMock.eq(controllerEpoch),
- EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
- )).andStubReturn(
+ when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
+ when(replicaManager.stopReplicas(
+ ArgumentMatchers.eq(request.context.correlationId),
+ ArgumentMatchers.eq(controllerId),
+ ArgumentMatchers.eq(controllerEpoch),
+ ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
+ )).thenReturn(
(mutable.Map(
fooPartition -> Errors.NONE
), Errors.NONE)
)
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.eq(None)
- ))
-
- EasyMock.replay(controller, replicaManager, requestChannel)
createKafkaApis().handleStopReplicaRequest(request)
- val stopReplicaResponse = capturedResponse.getValue.asInstanceOf[StopReplicaResponse]
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+ val stopReplicaResponse = capturedResponse.getValue
assertEquals(expectedError, stopReplicaResponse.error())
- EasyMock.verify(replicaManager)
}
@Test
@@ -3304,7 +3158,7 @@ class KafkaApisTest {
}
private def listGroupRequest(state: Option[String], overviews: List[GroupOverview]): ListGroupsResponse = {
- EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+ reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
val data = new ListGroupsRequestData()
if (state.isDefined)
@@ -3312,14 +3166,13 @@ class KafkaApisTest {
val listGroupsRequest = new ListGroupsRequest.Builder(data).build()
val requestChannelRequest = buildRequest(listGroupsRequest)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
val expectedStates: Set[String] = if (state.isDefined) Set(state.get) else Set()
- EasyMock.expect(groupCoordinator.handleListGroups(expectedStates))
- .andReturn((Errors.NONE, overviews))
- EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
+ when(groupCoordinator.handleListGroups(expectedStates))
+ .thenReturn((Errors.NONE, overviews))
createKafkaApis().handleListGroupsRequest(requestChannelRequest)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[ListGroupsResponse]
assertEquals(Errors.NONE.code, response.data.errorCode)
response
@@ -3357,11 +3210,10 @@ class KafkaApisTest {
.setIncludeClusterAuthorizedOperations(true)).build()
val request = buildRequest(describeClusterRequest, plaintextListener)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis().handleDescribeCluster(request)
+ val capturedResponse = verifyNoThrottling(request)
val describeClusterResponse = capturedResponse.getValue.asInstanceOf[DescribeClusterResponse]
assertEquals(metadataCache.getControllerId.get, describeClusterResponse.data.controllerId)
@@ -3412,11 +3264,10 @@ class KafkaApisTest {
private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
val metadataRequest = MetadataRequest.Builder.allTopics.build()
val requestChannelRequest = buildRequest(metadataRequest, requestListener)
- val capturedResponse = expectNoThrottling(requestChannelRequest)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
createKafkaApis().handleTopicMetadataRequest(requestChannelRequest)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
capturedResponse.getValue.asInstanceOf[MetadataResponse]
}
@@ -3425,13 +3276,13 @@ class KafkaApisTest {
val latestOffset = 15L
val currentLeaderEpoch = Optional.empty[Integer]()
- EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
- EasyMock.eq(tp),
- EasyMock.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
- EasyMock.eq(Some(isolationLevel)),
- EasyMock.eq(currentLeaderEpoch),
- fetchOnlyFromLeader = EasyMock.eq(true))
- ).andReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
+ when(replicaManager.fetchOffsetForTimestamp(
+ ArgumentMatchers.eq(tp),
+ ArgumentMatchers.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+ ArgumentMatchers.eq(Some(isolationLevel)),
+ ArgumentMatchers.eq(currentLeaderEpoch),
+ fetchOnlyFromLeader = ArgumentMatchers.eq(true))
+ ).thenReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
val targetTimes = List(new ListOffsetsTopic()
.setName(tp.topic)
@@ -3441,11 +3292,10 @@ class KafkaApisTest {
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleListOffsetRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[ListOffsetsResponse]
val partitionDataOptional = response.topics.asScala.find(_.name == tp.topic).get
.partitions.asScala.find(_.partitionIndex == tp.partition)
@@ -3479,22 +3329,13 @@ class KafkaApisTest {
requestChannelMetrics, envelope = None)
}
- private def expectNoThrottling(request: RequestChannel.Request): Capture[AbstractResponse] = {
- EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request](),
- EasyMock.anyObject[Long])).andReturn(0)
-
- EasyMock.expect(clientRequestQuotaManager.throttle(
- EasyMock.eq(request),
- EasyMock.anyObject[ThrottleCallback](),
- EasyMock.eq(0)))
-
- val capturedResponse = EasyMock.newCapture[AbstractResponse]()
- EasyMock.expect(requestChannel.sendResponse(
- EasyMock.eq(request),
- EasyMock.capture(capturedResponse),
- EasyMock.anyObject()
- ))
-
+ private def verifyNoThrottling(request: RequestChannel.Request): ArgumentCaptor[AbstractResponse] = {
+ val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ any()
+ )
capturedResponse
}
@@ -3557,9 +3398,10 @@ class KafkaApisTest {
).build()
val request = buildRequest(alterReplicaLogDirsRequest)
- EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+ reset(replicaManager, clientRequestQuotaManager, requestChannel)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
val t0p0 = new TopicPartition("t0", 0)
val t0p1 = new TopicPartition("t0", 1)
val t0p2 = new TopicPartition("t0", 2)
@@ -3567,15 +3409,15 @@ class KafkaApisTest {
t0p0 -> Errors.NONE,
t0p1 -> Errors.LOG_DIR_NOT_FOUND,
t0p2 -> Errors.INVALID_TOPIC_EXCEPTION)
- EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map(
+ when(replicaManager.alterReplicaLogDirs(ArgumentMatchers.eq(Map(
t0p0 -> "/foo",
t0p1 -> "/foo",
t0p2 -> "/foo"))))
- .andReturn(partitionResults)
- EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel)
+ .thenReturn(partitionResults)
createKafkaApis().handleAlterReplicaLogDirsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[AlterReplicaLogDirsResponse]
assertEquals(partitionResults, response.data.results.asScala.flatMap { tr =>
tr.partitions().asScala.map { pr =>
@@ -3631,7 +3473,7 @@ class KafkaApisTest {
val tp3 = new TopicPartition("baz", 1)
val tp4 = new TopicPartition("invalid;topic", 1)
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val data = new DescribeProducersRequestData().setTopics(List(
new DescribeProducersRequestData.TopicRequest()
.setName(tp1.topic)
@@ -3655,22 +3497,19 @@ class KafkaApisTest {
// Topic `foo` is authorized and present in the metadata
addTopicToMetadataCache(tp1.topic, 4) // We will only access the first topic
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions(tp1.topic))))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(tp1.topic))))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
// Topic `bar` is not authorized
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions(tp2.topic))))
- .andReturn(Seq(AuthorizationResult.DENIED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(tp2.topic))))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
// Topic `baz` is authorized, but not present in the metadata
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions(tp3.topic))))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(tp3.topic))))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- EasyMock.expect(replicaManager.activeProducerState(tp1))
- .andReturn(new DescribeProducersResponseData.PartitionResponse()
+ when(replicaManager.activeProducerState(tp1))
+ .thenReturn(new DescribeProducersResponseData.PartitionResponse()
.setErrorCode(Errors.NONE.code)
.setPartitionIndex(tp1.partition)
.setActiveProducers(List(
@@ -3685,11 +3524,12 @@ class KafkaApisTest {
val describeProducersRequest = new DescribeProducersRequest.Builder(data).build()
val request = buildRequest(describeProducersRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDescribeProducersRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeProducersResponse]
assertEquals(Set("foo", "bar", "baz", "invalid;topic"), response.data.topics.asScala.map(_.name).toSet)
@@ -3721,12 +3561,13 @@ class KafkaApisTest {
@Test
def testDescribeTransactions(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val data = new DescribeTransactionsRequestData()
.setTransactionalIds(List("foo", "bar").asJava)
val describeTransactionsRequest = new DescribeTransactionsRequest.Builder(data).build()
val request = buildRequest(describeTransactionsRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
def buildExpectedActions(transactionalId: String): util.List[Action] = {
val pattern = new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL)
@@ -3734,8 +3575,8 @@ class KafkaApisTest {
Collections.singletonList(action)
}
- EasyMock.expect(txnCoordinator.handleDescribeTransactions("foo"))
- .andReturn(new DescribeTransactionsResponseData.TransactionState()
+ when(txnCoordinator.handleDescribeTransactions("foo"))
+ .thenReturn(new DescribeTransactionsResponseData.TransactionState()
.setErrorCode(Errors.NONE.code)
.setTransactionalId("foo")
.setProducerId(12345L)
@@ -3744,17 +3585,15 @@ class KafkaApisTest {
.setTransactionState("CompleteCommit")
.setTransactionTimeoutMs(10000))
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("foo"))))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("foo"))))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("bar"))))
- .andReturn(Seq(AuthorizationResult.DENIED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("bar"))))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeTransactionsResponse]
assertEquals(2, response.data.transactionStates.size)
@@ -3773,13 +3612,14 @@ class KafkaApisTest {
@Test
def testDescribeTransactionsFiltersUnauthorizedTopics(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val transactionalId = "foo"
val data = new DescribeTransactionsRequestData()
.setTransactionalIds(List(transactionalId).asJava)
val describeTransactionsRequest = new DescribeTransactionsRequest.Builder(data).build()
val request = buildRequest(describeTransactionsRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
def expectDescribe(
resourceType: ResourceType,
@@ -3790,9 +3630,8 @@ class KafkaApisTest {
val action = new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
val actions = Collections.singletonList(action)
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(actions)))
- .andReturn(Seq(result).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(actions)))
+ .thenReturn(Seq(result).asJava)
}
// Principal is authorized to one of the two topics. The second topic should be
@@ -3822,12 +3661,12 @@ class KafkaApisTest {
describeTransactionsResponse.topics.add(mkTopicData(topic = "foo", Seq(1, 2)))
describeTransactionsResponse.topics.add(mkTopicData(topic = "bar", Seq(3, 4)))
- EasyMock.expect(txnCoordinator.handleDescribeTransactions("foo"))
- .andReturn(describeTransactionsResponse)
+ when(txnCoordinator.handleDescribeTransactions("foo"))
+ .thenReturn(describeTransactionsResponse)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeTransactionsResponse]
assertEquals(1, response.data.transactionStates.size)
@@ -3846,15 +3685,16 @@ class KafkaApisTest {
val data = new ListTransactionsRequestData()
val listTransactionsRequest = new ListTransactionsRequest.Builder(data).build()
val request = buildRequest(listTransactionsRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- EasyMock.expect(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
- .andReturn(new ListTransactionsResponseData()
+ when(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
+ .thenReturn(new ListTransactionsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code))
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
createKafkaApis().handleListTransactionsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[ListTransactionsResponse]
assertEquals(0, response.data.transactionStates.size)
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(response.data.errorCode))
@@ -3862,11 +3702,12 @@ class KafkaApisTest {
@Test
def testListTransactionsAuthorization(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
val data = new ListTransactionsRequestData()
val listTransactionsRequest = new ListTransactionsRequest.Builder(data).build()
val request = buildRequest(listTransactionsRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
val transactionStates = new util.ArrayList[ListTransactionsResponseData.TransactionState]()
transactionStates.add(new ListTransactionsResponseData.TransactionState()
@@ -3878,8 +3719,8 @@ class KafkaApisTest {
.setProducerId(98765)
.setTransactionState("PrepareAbort"))
- EasyMock.expect(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
- .andReturn(new ListTransactionsResponseData()
+ when(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String]))
+ .thenReturn(new ListTransactionsResponseData()
.setErrorCode(Errors.NONE.code)
.setTransactionStates(transactionStates))
@@ -3889,17 +3730,15 @@ class KafkaApisTest {
Collections.singletonList(action)
}
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("foo"))))
- .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("foo"))))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("bar"))))
- .andReturn(Seq(AuthorizationResult.DENIED).asJava)
- .once()
+ when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions("bar"))))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleListTransactionsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[ListTransactionsResponse]
assertEquals(1, response.data.transactionStates.size())
val transactionState = response.data.transactionStates.get(0)
@@ -3910,31 +3749,39 @@ class KafkaApisTest {
@Test
def testDeleteTopicsByIdAuthorization(): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
- val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
-
- EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
- EasyMock.anyObject(classOf[RequestChannel.Request]),
- EasyMock.anyShort()
- )).andReturn(UnboundedControllerMutationQuota)
- EasyMock.expect(controller.isActive).andReturn(true)
- EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val controllerContext: ControllerContext = mock(classOf[ControllerContext])
+
+ when(clientControllerQuotaManager.newQuotaFor(
+ any[RequestChannel.Request],
+ anyShort
+ )).thenReturn(UnboundedControllerMutationQuota)
+ when(controller.isActive).thenReturn(true)
+ when(controller.controllerContext).thenReturn(controllerContext)
+
+ val topicResults = Map(
+ AclOperation.DESCRIBE -> Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED
+ ),
+ AclOperation.DELETE -> Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.DENIED
+ )
+ )
+ when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => {
+ val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
+ actions.asScala.map { action =>
+ val topic = action.resourcePattern.name
+ val ops = action.operation()
+ topicResults(ops)(topic)
+ }.asJava
+ })
// Try to delete three topics:
// 1. One without describe permission
// 2. One without delete permission
// 3. One which is authorized, but doesn't exist
-
- expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
- "foo" -> AuthorizationResult.DENIED,
- "bar" -> AuthorizationResult.ALLOWED
- ))
-
- expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
- "foo" -> AuthorizationResult.DENIED,
- "bar" -> AuthorizationResult.DENIED
- ))
-
val topicIdsMap = Map(
Uuid.randomUuid() -> Some("foo"),
Uuid.randomUuid() -> Some("bar"),
@@ -3942,7 +3789,7 @@ class KafkaApisTest {
)
topicIdsMap.foreach { case (topicId, topicNameOpt) =>
- EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+ when(controllerContext.topicName(topicId)).thenReturn(topicNameOpt)
}
val topicDatas = topicIdsMap.keys.map { topicId =>
@@ -3953,12 +3800,13 @@ class KafkaApisTest {
.build(ApiKeys.DELETE_TOPICS.latestVersion)
val request = buildRequest(deleteRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
- requestChannel, txnCoordinator, controller, controllerContext, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+ verify(authorizer, times(2)).authorize(any(), any())
+ val capturedResponse = verifyNoThrottling(request)
val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
topicIdsMap.foreach { case (topicId, nameOpt) =>
@@ -3982,30 +3830,39 @@ class KafkaApisTest {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
- val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val authorizer: Authorizer = mock(classOf[Authorizer])
- EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
- EasyMock.anyObject(classOf[RequestChannel.Request]),
- EasyMock.anyShort()
- )).andReturn(UnboundedControllerMutationQuota)
- EasyMock.expect(controller.isActive).andReturn(true)
+ when(clientControllerQuotaManager.newQuotaFor(
+ any[RequestChannel.Request],
+ anyShort
+ )).thenReturn(UnboundedControllerMutationQuota)
+ when(controller.isActive).thenReturn(true)
// Try to delete three topics:
// 1. One without describe permission
// 2. One without delete permission
// 3. One which is authorized, but doesn't exist
- expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
- "foo" -> AuthorizationResult.DENIED,
- "bar" -> AuthorizationResult.ALLOWED,
- "baz" -> AuthorizationResult.ALLOWED
- ))
-
- expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
- "foo" -> AuthorizationResult.DENIED,
- "bar" -> AuthorizationResult.DENIED,
- "baz" -> AuthorizationResult.ALLOWED
- ))
+ val topicResults = Map(
+ AclOperation.DESCRIBE -> Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED,
+ "baz" -> AuthorizationResult.ALLOWED
+ ),
+ AclOperation.DELETE -> Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.DENIED,
+ "baz" -> AuthorizationResult.ALLOWED
+ )
+ )
+ when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => {
+ val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
+ actions.asScala.map { action =>
+ val topic = action.resourcePattern.name
+ val ops = action.operation()
+ topicResults(ops)(topic)
+ }.asJava
+ })
val deleteRequest = if (usePrimitiveTopicNameArray) {
new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
@@ -4023,12 +3880,13 @@ class KafkaApisTest {
}
val request = buildRequest(deleteRequest)
- val capturedResponse = expectNoThrottling(request)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
- EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
- requestChannel, txnCoordinator, controller, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+ verify(authorizer, times(2)).authorize(any(), any())
+ val capturedResponse = verifyNoThrottling(request)
val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
def lookupErrorCode(topic: String): Option[Errors] = {
@@ -4041,34 +3899,11 @@ class KafkaApisTest {
assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
}
- def expectTopicAuthorization(
- authorizer: Authorizer,
- aclOperation: AclOperation,
- topicResults: Map[String, AuthorizationResult]
- ): Unit = {
- val expectedActions = topicResults.keys.map { topic =>
- val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
- topic -> new Action(aclOperation, pattern, 1, true, true)
- }.toMap
-
- val actionsCapture: Capture[util.List[Action]] = EasyMock.newCapture()
- EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.capture(actionsCapture)))
- .andAnswer(() => {
- actionsCapture.getValue.asScala.map { action =>
- val topic = action.resourcePattern.name
- assertEquals(expectedActions(topic), action)
- topicResults(topic)
- }.asJava
- })
- .once()
- }
-
private def createMockRequest(): RequestChannel.Request = {
- val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
- val requestHeader: RequestHeader = EasyMock.createNiceMock(classOf[RequestHeader])
- expect(request.header).andReturn(requestHeader).anyTimes()
- expect(requestHeader.apiKey()).andReturn(ApiKeys.values().head).anyTimes()
- EasyMock.replay(request, requestHeader)
+ val request: RequestChannel.Request = mock(classOf[RequestChannel.Request])
+ val requestHeader: RequestHeader = mock(classOf[RequestHeader])
+ when(request.header).thenReturn(requestHeader)
+ when(requestHeader.apiKey()).thenReturn(ApiKeys.values().head)
request
}
@@ -4152,11 +3987,13 @@ class KafkaApisTest {
@Test
def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = {
- val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort));
+ val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+
createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
assertEquals(new AlterConfigsResponseData(),
capturedResponse.getValue.asInstanceOf[AlterConfigsResponse].data())
}
@@ -4165,18 +4002,19 @@ class KafkaApisTest {
def testInvalidLegacyAlterConfigsRequestWithKRaft(): Unit = {
val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData().
setValidateOnly(true).
- setResources(new LAlterConfigsResourceCollection(Arrays.asList(
+ setResources(new LAlterConfigsResourceCollection(asList(
new LAlterConfigsResource().
setResourceName(brokerId.toString).
setResourceType(BROKER.id()).
- setConfigs(new LAlterableConfigCollection(Arrays.asList(new LAlterableConfig().
+ setConfigs(new LAlterableConfigCollection(asList(new LAlterableConfig().
setName("foo").
setValue(null)).iterator()))).iterator())), 1.toShort))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
- assertEquals(new AlterConfigsResponseData().setResponses(Arrays.asList(
+ val capturedResponse = verifyNoThrottling(request)
+ assertEquals(new AlterConfigsResponseData().setResponses(asList(
new LAlterConfigsResourceResponse().
setErrorCode(Errors.INVALID_REQUEST.code()).
setErrorMessage("Null value not supported for : foo").
@@ -4193,11 +4031,12 @@ class KafkaApisTest {
@Test
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
- val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort));
+ val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)
+ val capturedResponse = verifyNoThrottling(request)
assertEquals(new IncrementalAlterConfigsResponseData(),
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse].data())
}
@@ -4206,18 +4045,19 @@ class KafkaApisTest {
def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
setValidateOnly(true).
- setResources(new IAlterConfigsResourceCollection(Arrays.asList(new IAlterConfigsResource().
+ setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource().
setResourceName(brokerId.toString).
setResourceType(BROKER_LOGGER.id()).
- setConfigs(new IAlterableConfigCollection(Arrays.asList(new IAlterableConfig().
+ setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig().
setName(Log4jController.ROOT_LOGGER).
setValue("TRACE")).iterator()))).iterator())),
1.toShort))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- val capturedResponse = expectNoThrottling(request)
- EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest(request)
- assertEquals(new IncrementalAlterConfigsResponseData().setResponses(Arrays.asList(
+ val capturedResponse = verifyNoThrottling(request)
+ assertEquals(new IncrementalAlterConfigsResponseData().setResponses(asList(
new IAlterConfigsResourceResponse().
setErrorCode(0.toShort).
setErrorMessage(null).