You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/03 17:33:57 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

hachikuji commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569606950



##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito
 
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
   private val time = new MockTime()
+  private val client = new MockClient(time)
+  private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
+  private val brokerToController = new MockBrokerToControllerChannelManager(
+    client, time, controllerNodeProvider, controllerApiVersions)
+  private val forwardingManager = new ForwardingManagerImpl(brokerToController)
   private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
 
+  private def controllerApiVersions: NodeApiVersions = {
+    // The Envelope API is not yet included in the standard set of APIs
+    val envelopeApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.ENVELOPE.id)
+      .setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
+      .setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
+    NodeApiVersions.create(List(envelopeApiVersion).asJava)
+  }
+
   @Test
   def testResponseCorrelationIdMismatch(): Unit = {
-    val forwardingManager = new ForwardingManagerImpl(brokerToController)
     val requestCorrelationId = 27
-    val envelopeCorrelationId = 39
     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
-    val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
-    val requestBody = new AlterConfigsRequest.Builder(Map(
-      configResource -> new AlterConfigsRequest.Config(configs)
-    ).asJava, false).build()
-    val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
     val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
     val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
       requestCorrelationId + 1)
 
-    Mockito.when(brokerToController.sendRequest(
-      any(classOf[EnvelopeRequest.Builder]),
-      any(classOf[ControllerRequestCompletionHandler])
-    )).thenAnswer(invocation => {
-      val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
-      val response = buildEnvelopeResponse(responseBuffer, envelopeCorrelationId, completionHandler)
-      response.onComplete()
-    })
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+    val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+    client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));
 
-    var response: AbstractResponse = null
-    forwardingManager.forwardRequest(request, result => response = result.orNull)
+    val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+    forwardingManager.forwardRequest(request, responseOpt.set)
+    brokerToController.poll()
+    assertTrue(Option(responseOpt.get).isDefined)
 
-    assertNotNull(response)
+    val response = responseOpt.get.get
     assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
   }
 
   @Test
   def testUnsupportedVersions(): Unit = {
-    val forwardingManager = new ForwardingManagerImpl(brokerToController)
     val requestCorrelationId = 27
     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
-    val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
-    val requestBody = new AlterConfigsRequest.Builder(Map(
-      configResource -> new AlterConfigsRequest.Config(configs)
-    ).asJava, false).build()
-    val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
     val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
-
     val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
       requestHeader.apiVersion, requestCorrelationId)
 
-    Mockito.when(brokerToController.sendRequest(
-      any(classOf[EnvelopeRequest.Builder]),
-      any(classOf[ControllerRequestCompletionHandler])
-    )).thenAnswer(invocation => {
-      val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
-      val response = buildEnvelopeResponse(responseBuffer, 30,
-        completionHandler, Errors.UNSUPPORTED_VERSION)
-      response.onComplete()
-    })
-
-    var response: AbstractResponse = null
-    val connectionClosed = new AtomicBoolean(false)
-    forwardingManager.forwardRequest(request, res => {
-      response = res.orNull
-      connectionClosed.set(true)
-    })
-
-    assertTrue(connectionClosed.get())
-    assertNull(response)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+    val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+    client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
+
+    val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+    forwardingManager.forwardRequest(request, responseOpt.set)
+    brokerToController.poll()
+    assertEquals(None, responseOpt.get)
   }
 
-  private def buildEnvelopeResponse(
-    responseBuffer: ByteBuffer,
-    correlationId: Int,
-    completionHandler: RequestCompletionHandler,
-    error: Errors = Errors.NONE
-  ): ClientResponse = {
-    val envelopeRequestHeader = new RequestHeader(
-      ApiKeys.ENVELOPE,
-      ApiKeys.ENVELOPE.latestVersion(),
-      "clientId",
-      correlationId
-    )
-    val envelopeResponse = new EnvelopeResponse(
-      responseBuffer,
-      error
-    )
+  @Test
+  def testForwardingTimeoutWaitingForControllerDiscovery(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    new ClientResponse(
-      envelopeRequestHeader,
-      completionHandler,
-      "1",
-      time.milliseconds(),
-      time.milliseconds(),
-      false,
-      null,
-      null,
-      envelopeResponse
-    )
+    Mockito.when(controllerNodeProvider.get()).thenReturn(None)
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNull(response.get)
+
+    // The controller is not discovered before reaching the retry timeout.
+    // The request should fail with a timeout error.
+    time.sleep(brokerToController.retryTimeoutMs)
+    brokerToController.poll()
+    assertNotNull(response.get)
+
+    val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+    assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
+  }
+
+  @Test
+  def testForwardingTimeoutAfterRetry(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNull(response.get)
+
+    // After reaching the retry timeout, we get a disconnect. Instead of retrying,
+    // we should fail the request with a timeout error.
+    time.sleep(brokerToController.retryTimeoutMs)
+    client.respond(testAlterConfigRequest.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception), true)
+    brokerToController.poll()
+    brokerToController.poll()

Review comment:
       Yeah, this is one thing that is kind of annoying with this model. We need one `poll` to receive the disconnect response and then one more to trigger the callback.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org