You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 22:01:45 UTC
[GitHub] [kafka] hachikuji opened a new pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`
hachikuji opened a new pull request #10026:
URL: https://github.com/apache/kafka/pull/10026
Tests involving `BrokerToControllerChannelManager` are simplified by being able to leverage `MockClient`. This patch introduces a `BrokerToControllerChannelManager` implementation which makes that possible.
The patch updates `ForwardingManagerTest` to use `BrokerToControllerChannelManager`. We also add a couple additional timeout cases, which exposed a minor bug. Previously we were using the wrong `TimeoutException`, which meant that expected timeout errors were in fact translated to `UNKNOWN_SERVER_ERROR`.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569620772
##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
import kafka.network
import kafka.network.RequestChannel
import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
import org.mockito.Mockito
import scala.jdk.CollectionConverters._
class ForwardingManagerTest {
- private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
private val time = new MockTime()
+ private val client = new MockClient(time)
+ private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
Review comment:
`ControllerNodeProvider` also is used to expose `listenerName` and `securityProtocol`, so the mock might still be simpler.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569606950
##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
import kafka.network
import kafka.network.RequestChannel
import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
import org.mockito.Mockito
import scala.jdk.CollectionConverters._
class ForwardingManagerTest {
- private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
private val time = new MockTime()
+ private val client = new MockClient(time)
+ private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
+ private val brokerToController = new MockBrokerToControllerChannelManager(
+ client, time, controllerNodeProvider, controllerApiVersions)
+ private val forwardingManager = new ForwardingManagerImpl(brokerToController)
private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
+ private def controllerApiVersions: NodeApiVersions = {
+ // The Envelope API is not yet included in the standard set of APIs
+ val envelopeApiVersion = new ApiVersionsResponseData.ApiVersion()
+ .setApiKey(ApiKeys.ENVELOPE.id)
+ .setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
+ .setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
+ NodeApiVersions.create(List(envelopeApiVersion).asJava)
+ }
+
@Test
def testResponseCorrelationIdMismatch(): Unit = {
- val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
- val envelopeCorrelationId = 39
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
- val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
- val requestBody = new AlterConfigsRequest.Builder(Map(
- configResource -> new AlterConfigsRequest.Config(configs)
- ).asJava, false).build()
- val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
requestCorrelationId + 1)
- Mockito.when(brokerToController.sendRequest(
- any(classOf[EnvelopeRequest.Builder]),
- any(classOf[ControllerRequestCompletionHandler])
- )).thenAnswer(invocation => {
- val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
- val response = buildEnvelopeResponse(responseBuffer, envelopeCorrelationId, completionHandler)
- response.onComplete()
- })
+ Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+ val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+ client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));
- var response: AbstractResponse = null
- forwardingManager.forwardRequest(request, result => response = result.orNull)
+ val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+ forwardingManager.forwardRequest(request, responseOpt.set)
+ brokerToController.poll()
+ assertTrue(Option(responseOpt.get).isDefined)
- assertNotNull(response)
+ val response = responseOpt.get.get
assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
}
@Test
def testUnsupportedVersions(): Unit = {
- val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
- val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
- val requestBody = new AlterConfigsRequest.Builder(Map(
- configResource -> new AlterConfigsRequest.Config(configs)
- ).asJava, false).build()
- val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
-
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)
- Mockito.when(brokerToController.sendRequest(
- any(classOf[EnvelopeRequest.Builder]),
- any(classOf[ControllerRequestCompletionHandler])
- )).thenAnswer(invocation => {
- val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
- val response = buildEnvelopeResponse(responseBuffer, 30,
- completionHandler, Errors.UNSUPPORTED_VERSION)
- response.onComplete()
- })
-
- var response: AbstractResponse = null
- val connectionClosed = new AtomicBoolean(false)
- forwardingManager.forwardRequest(request, res => {
- response = res.orNull
- connectionClosed.set(true)
- })
-
- assertTrue(connectionClosed.get())
- assertNull(response)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+ val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+ client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
+
+ val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+ forwardingManager.forwardRequest(request, responseOpt.set)
+ brokerToController.poll()
+ assertEquals(None, responseOpt.get)
}
- private def buildEnvelopeResponse(
- responseBuffer: ByteBuffer,
- correlationId: Int,
- completionHandler: RequestCompletionHandler,
- error: Errors = Errors.NONE
- ): ClientResponse = {
- val envelopeRequestHeader = new RequestHeader(
- ApiKeys.ENVELOPE,
- ApiKeys.ENVELOPE.latestVersion(),
- "clientId",
- correlationId
- )
- val envelopeResponse = new EnvelopeResponse(
- responseBuffer,
- error
- )
+ @Test
+ def testForwardingTimeoutWaitingForControllerDiscovery(): Unit = {
+ val requestCorrelationId = 27
+ val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+ val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
- new ClientResponse(
- envelopeRequestHeader,
- completionHandler,
- "1",
- time.milliseconds(),
- time.milliseconds(),
- false,
- null,
- null,
- envelopeResponse
- )
+ Mockito.when(controllerNodeProvider.get()).thenReturn(None)
+
+ val response = new AtomicReference[AbstractResponse]()
+ forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+ brokerToController.poll()
+ assertNull(response.get)
+
+ // The controller is not discovered before reaching the retry timeout.
+ // The request should fail with a timeout error.
+ time.sleep(brokerToController.retryTimeoutMs)
+ brokerToController.poll()
+ assertNotNull(response.get)
+
+ val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+ assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
+ }
+
+ @Test
+ def testForwardingTimeoutAfterRetry(): Unit = {
+ val requestCorrelationId = 27
+ val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+ val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+ Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+
+ val response = new AtomicReference[AbstractResponse]()
+ forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+ brokerToController.poll()
+ assertNull(response.get)
+
+ // After reaching the retry timeout, we get a disconnect. Instead of retrying,
+ // we should fail the request with a timeout error.
+ time.sleep(brokerToController.retryTimeoutMs)
+ client.respond(testAlterConfigRequest.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception), true)
+ brokerToController.poll()
+ brokerToController.poll()
Review comment:
Yeah, this is one thing that is kind of annoying with this model. We need one `poll` to receive the disconnect response and then one more to trigger the callback.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569550254
##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
import kafka.network
import kafka.network.RequestChannel
import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
import org.mockito.Mockito
import scala.jdk.CollectionConverters._
class ForwardingManagerTest {
- private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
private val time = new MockTime()
+ private val client = new MockClient(time)
+ private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
Review comment:
Since this is just a single method interface, can we use a lambda instead of a mock?
##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
import kafka.network
import kafka.network.RequestChannel
import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
import org.mockito.Mockito
import scala.jdk.CollectionConverters._
class ForwardingManagerTest {
- private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
private val time = new MockTime()
+ private val client = new MockClient(time)
+ private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
+ private val brokerToController = new MockBrokerToControllerChannelManager(
+ client, time, controllerNodeProvider, controllerApiVersions)
+ private val forwardingManager = new ForwardingManagerImpl(brokerToController)
private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
+ private def controllerApiVersions: NodeApiVersions = {
+ // The Envelope API is not yet included in the standard set of APIs
+ val envelopeApiVersion = new ApiVersionsResponseData.ApiVersion()
+ .setApiKey(ApiKeys.ENVELOPE.id)
+ .setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
+ .setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
+ NodeApiVersions.create(List(envelopeApiVersion).asJava)
+ }
+
@Test
def testResponseCorrelationIdMismatch(): Unit = {
- val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
- val envelopeCorrelationId = 39
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
- val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
- val requestBody = new AlterConfigsRequest.Builder(Map(
- configResource -> new AlterConfigsRequest.Config(configs)
- ).asJava, false).build()
- val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
requestCorrelationId + 1)
- Mockito.when(brokerToController.sendRequest(
- any(classOf[EnvelopeRequest.Builder]),
- any(classOf[ControllerRequestCompletionHandler])
- )).thenAnswer(invocation => {
- val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
- val response = buildEnvelopeResponse(responseBuffer, envelopeCorrelationId, completionHandler)
- response.onComplete()
- })
+ Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+ val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+ client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));
- var response: AbstractResponse = null
- forwardingManager.forwardRequest(request, result => response = result.orNull)
+ val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+ forwardingManager.forwardRequest(request, responseOpt.set)
+ brokerToController.poll()
+ assertTrue(Option(responseOpt.get).isDefined)
- assertNotNull(response)
+ val response = responseOpt.get.get
assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
}
@Test
def testUnsupportedVersions(): Unit = {
- val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
- val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
- val requestBody = new AlterConfigsRequest.Builder(Map(
- configResource -> new AlterConfigsRequest.Config(configs)
- ).asJava, false).build()
- val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
-
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)
- Mockito.when(brokerToController.sendRequest(
- any(classOf[EnvelopeRequest.Builder]),
- any(classOf[ControllerRequestCompletionHandler])
- )).thenAnswer(invocation => {
- val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
- val response = buildEnvelopeResponse(responseBuffer, 30,
- completionHandler, Errors.UNSUPPORTED_VERSION)
- response.onComplete()
- })
-
- var response: AbstractResponse = null
- val connectionClosed = new AtomicBoolean(false)
- forwardingManager.forwardRequest(request, res => {
- response = res.orNull
- connectionClosed.set(true)
- })
-
- assertTrue(connectionClosed.get())
- assertNull(response)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+ val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+ client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
+
+ val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+ forwardingManager.forwardRequest(request, responseOpt.set)
+ brokerToController.poll()
+ assertEquals(None, responseOpt.get)
}
- private def buildEnvelopeResponse(
- responseBuffer: ByteBuffer,
- correlationId: Int,
- completionHandler: RequestCompletionHandler,
- error: Errors = Errors.NONE
- ): ClientResponse = {
- val envelopeRequestHeader = new RequestHeader(
- ApiKeys.ENVELOPE,
- ApiKeys.ENVELOPE.latestVersion(),
- "clientId",
- correlationId
- )
- val envelopeResponse = new EnvelopeResponse(
- responseBuffer,
- error
- )
+ @Test
+ def testForwardingTimeoutWaitingForControllerDiscovery(): Unit = {
+ val requestCorrelationId = 27
+ val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+ val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
- new ClientResponse(
- envelopeRequestHeader,
- completionHandler,
- "1",
- time.milliseconds(),
- time.milliseconds(),
- false,
- null,
- null,
- envelopeResponse
- )
+ Mockito.when(controllerNodeProvider.get()).thenReturn(None)
+
+ val response = new AtomicReference[AbstractResponse]()
+ forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+ brokerToController.poll()
+ assertNull(response.get)
+
+ // The controller is not discovered before reaching the retry timeout.
+ // The request should fail with a timeout error.
+ time.sleep(brokerToController.retryTimeoutMs)
+ brokerToController.poll()
+ assertNotNull(response.get)
+
+ val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+ assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
+ }
+
+ @Test
+ def testForwardingTimeoutAfterRetry(): Unit = {
+ val requestCorrelationId = 27
+ val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+ val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+ val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+ Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+
+ val response = new AtomicReference[AbstractResponse]()
+ forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+ brokerToController.poll()
+ assertNull(response.get)
+
+ // After reaching the retry timeout, we get a disconnect. Instead of retrying,
+ // we should fail the request with a timeout error.
+ time.sleep(brokerToController.retryTimeoutMs)
+ client.respond(testAlterConfigRequest.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception), true)
+ brokerToController.poll()
+ brokerToController.poll()
Review comment:
Why the second poll here? Is this to trigger the timeout error?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`
Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10026:
URL: https://github.com/apache/kafka/pull/10026
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569642348
##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
import kafka.network
import kafka.network.RequestChannel
import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
import org.mockito.Mockito
import scala.jdk.CollectionConverters._
class ForwardingManagerTest {
- private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
private val time = new MockTime()
+ private val client = new MockClient(time)
+ private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
Review comment:
Gotcha, yea a mock is probably simpler in this case. If we end up doing a lot of ControllerNodeProvider mocking down the road, it might make sense for a full test implementation, but not for this PR
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org