You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 22:01:45 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

hachikuji opened a new pull request #10026:
URL: https://github.com/apache/kafka/pull/10026


   Tests involving `BrokerToControllerChannelManager` are simplified by being able to leverage `MockClient`. This patch introduces a `BrokerToControllerChannelManager` implementation which makes that possible.
   
   The patch updates `ForwardingManagerTest` to use `BrokerToControllerChannelManager`. We also add a couple additional timeout cases, which exposed a minor bug. Previously we were using the wrong `TimeoutException`, which meant that expected timeout errors were in fact translated to `UNKNOWN_SERVER_ERROR`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569620772



##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito
 
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
   private val time = new MockTime()
+  private val client = new MockClient(time)
+  private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])

Review comment:
       `ControllerNodeProvider` also is used to expose `listenerName` and `securityProtocol`, so the mock might still be simpler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569606950



##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito
 
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
   private val time = new MockTime()
+  private val client = new MockClient(time)
+  private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
+  private val brokerToController = new MockBrokerToControllerChannelManager(
+    client, time, controllerNodeProvider, controllerApiVersions)
+  private val forwardingManager = new ForwardingManagerImpl(brokerToController)
   private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
 
+  private def controllerApiVersions: NodeApiVersions = {
+    // The Envelope API is not yet included in the standard set of APIs
+    val envelopeApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.ENVELOPE.id)
+      .setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
+      .setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
+    NodeApiVersions.create(List(envelopeApiVersion).asJava)
+  }
+
   @Test
   def testResponseCorrelationIdMismatch(): Unit = {
-    val forwardingManager = new ForwardingManagerImpl(brokerToController)
     val requestCorrelationId = 27
-    val envelopeCorrelationId = 39
     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
-    val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
-    val requestBody = new AlterConfigsRequest.Builder(Map(
-      configResource -> new AlterConfigsRequest.Config(configs)
-    ).asJava, false).build()
-    val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
     val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
     val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
       requestCorrelationId + 1)
 
-    Mockito.when(brokerToController.sendRequest(
-      any(classOf[EnvelopeRequest.Builder]),
-      any(classOf[ControllerRequestCompletionHandler])
-    )).thenAnswer(invocation => {
-      val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
-      val response = buildEnvelopeResponse(responseBuffer, envelopeCorrelationId, completionHandler)
-      response.onComplete()
-    })
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+    val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+    client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));
 
-    var response: AbstractResponse = null
-    forwardingManager.forwardRequest(request, result => response = result.orNull)
+    val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+    forwardingManager.forwardRequest(request, responseOpt.set)
+    brokerToController.poll()
+    assertTrue(Option(responseOpt.get).isDefined)
 
-    assertNotNull(response)
+    val response = responseOpt.get.get
     assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
   }
 
   @Test
   def testUnsupportedVersions(): Unit = {
-    val forwardingManager = new ForwardingManagerImpl(brokerToController)
     val requestCorrelationId = 27
     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
-    val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
-    val requestBody = new AlterConfigsRequest.Builder(Map(
-      configResource -> new AlterConfigsRequest.Config(configs)
-    ).asJava, false).build()
-    val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
     val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
-
     val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
       requestHeader.apiVersion, requestCorrelationId)
 
-    Mockito.when(brokerToController.sendRequest(
-      any(classOf[EnvelopeRequest.Builder]),
-      any(classOf[ControllerRequestCompletionHandler])
-    )).thenAnswer(invocation => {
-      val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
-      val response = buildEnvelopeResponse(responseBuffer, 30,
-        completionHandler, Errors.UNSUPPORTED_VERSION)
-      response.onComplete()
-    })
-
-    var response: AbstractResponse = null
-    val connectionClosed = new AtomicBoolean(false)
-    forwardingManager.forwardRequest(request, res => {
-      response = res.orNull
-      connectionClosed.set(true)
-    })
-
-    assertTrue(connectionClosed.get())
-    assertNull(response)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+    val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+    client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
+
+    val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+    forwardingManager.forwardRequest(request, responseOpt.set)
+    brokerToController.poll()
+    assertEquals(None, responseOpt.get)
   }
 
-  private def buildEnvelopeResponse(
-    responseBuffer: ByteBuffer,
-    correlationId: Int,
-    completionHandler: RequestCompletionHandler,
-    error: Errors = Errors.NONE
-  ): ClientResponse = {
-    val envelopeRequestHeader = new RequestHeader(
-      ApiKeys.ENVELOPE,
-      ApiKeys.ENVELOPE.latestVersion(),
-      "clientId",
-      correlationId
-    )
-    val envelopeResponse = new EnvelopeResponse(
-      responseBuffer,
-      error
-    )
+  @Test
+  def testForwardingTimeoutWaitingForControllerDiscovery(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    new ClientResponse(
-      envelopeRequestHeader,
-      completionHandler,
-      "1",
-      time.milliseconds(),
-      time.milliseconds(),
-      false,
-      null,
-      null,
-      envelopeResponse
-    )
+    Mockito.when(controllerNodeProvider.get()).thenReturn(None)
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNull(response.get)
+
+    // The controller is not discovered before reaching the retry timeout.
+    // The request should fail with a timeout error.
+    time.sleep(brokerToController.retryTimeoutMs)
+    brokerToController.poll()
+    assertNotNull(response.get)
+
+    val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+    assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
+  }
+
+  @Test
+  def testForwardingTimeoutAfterRetry(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNull(response.get)
+
+    // After reaching the retry timeout, we get a disconnect. Instead of retrying,
+    // we should fail the request with a timeout error.
+    time.sleep(brokerToController.retryTimeoutMs)
+    client.respond(testAlterConfigRequest.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception), true)
+    brokerToController.poll()
+    brokerToController.poll()

Review comment:
       Yeah, this is one thing that is kind of annoying with this model. We need one `poll` to receive the disconnect response and then one more to trigger the callback.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569550254



##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito
 
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
   private val time = new MockTime()
+  private val client = new MockClient(time)
+  private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])

Review comment:
       Since this is just a single method interface, can we use a lambda instead of a mock?

##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito
 
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
   private val time = new MockTime()
+  private val client = new MockClient(time)
+  private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
+  private val brokerToController = new MockBrokerToControllerChannelManager(
+    client, time, controllerNodeProvider, controllerApiVersions)
+  private val forwardingManager = new ForwardingManagerImpl(brokerToController)
   private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
 
+  private def controllerApiVersions: NodeApiVersions = {
+    // The Envelope API is not yet included in the standard set of APIs
+    val envelopeApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.ENVELOPE.id)
+      .setMinVersion(ApiKeys.ENVELOPE.oldestVersion)
+      .setMaxVersion(ApiKeys.ENVELOPE.latestVersion)
+    NodeApiVersions.create(List(envelopeApiVersion).asJava)
+  }
+
   @Test
   def testResponseCorrelationIdMismatch(): Unit = {
-    val forwardingManager = new ForwardingManagerImpl(brokerToController)
     val requestCorrelationId = 27
-    val envelopeCorrelationId = 39
     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
-    val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
-    val requestBody = new AlterConfigsRequest.Builder(Map(
-      configResource -> new AlterConfigsRequest.Config(configs)
-    ).asJava, false).build()
-    val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
     val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
     val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
       requestCorrelationId + 1)
 
-    Mockito.when(brokerToController.sendRequest(
-      any(classOf[EnvelopeRequest.Builder]),
-      any(classOf[ControllerRequestCompletionHandler])
-    )).thenAnswer(invocation => {
-      val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
-      val response = buildEnvelopeResponse(responseBuffer, envelopeCorrelationId, completionHandler)
-      response.onComplete()
-    })
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+    val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+    client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));
 
-    var response: AbstractResponse = null
-    forwardingManager.forwardRequest(request, result => response = result.orNull)
+    val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+    forwardingManager.forwardRequest(request, responseOpt.set)
+    brokerToController.poll()
+    assertTrue(Option(responseOpt.get).isDefined)
 
-    assertNotNull(response)
+    val response = responseOpt.get.get
     assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
   }
 
   @Test
   def testUnsupportedVersions(): Unit = {
-    val forwardingManager = new ForwardingManagerImpl(brokerToController)
     val requestCorrelationId = 27
     val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
-
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
-    val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
-    val requestBody = new AlterConfigsRequest.Builder(Map(
-      configResource -> new AlterConfigsRequest.Config(configs)
-    ).asJava, false).build()
-    val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
     val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
-
     val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
       requestHeader.apiVersion, requestCorrelationId)
 
-    Mockito.when(brokerToController.sendRequest(
-      any(classOf[EnvelopeRequest.Builder]),
-      any(classOf[ControllerRequestCompletionHandler])
-    )).thenAnswer(invocation => {
-      val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
-      val response = buildEnvelopeResponse(responseBuffer, 30,
-        completionHandler, Errors.UNSUPPORTED_VERSION)
-      response.onComplete()
-    })
-
-    var response: AbstractResponse = null
-    val connectionClosed = new AtomicBoolean(false)
-    forwardingManager.forwardRequest(request, res => {
-      response = res.orNull
-      connectionClosed.set(true)
-    })
-
-    assertTrue(connectionClosed.get())
-    assertNull(response)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+    val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
+    client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
+
+    val responseOpt = new AtomicReference[Option[AbstractResponse]]()
+    forwardingManager.forwardRequest(request, responseOpt.set)
+    brokerToController.poll()
+    assertEquals(None, responseOpt.get)
   }
 
-  private def buildEnvelopeResponse(
-    responseBuffer: ByteBuffer,
-    correlationId: Int,
-    completionHandler: RequestCompletionHandler,
-    error: Errors = Errors.NONE
-  ): ClientResponse = {
-    val envelopeRequestHeader = new RequestHeader(
-      ApiKeys.ENVELOPE,
-      ApiKeys.ENVELOPE.latestVersion(),
-      "clientId",
-      correlationId
-    )
-    val envelopeResponse = new EnvelopeResponse(
-      responseBuffer,
-      error
-    )
+  @Test
+  def testForwardingTimeoutWaitingForControllerDiscovery(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    new ClientResponse(
-      envelopeRequestHeader,
-      completionHandler,
-      "1",
-      time.milliseconds(),
-      time.milliseconds(),
-      false,
-      null,
-      null,
-      envelopeResponse
-    )
+    Mockito.when(controllerNodeProvider.get()).thenReturn(None)
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNull(response.get)
+
+    // The controller is not discovered before reaching the retry timeout.
+    // The request should fail with a timeout error.
+    time.sleep(brokerToController.retryTimeoutMs)
+    brokerToController.poll()
+    assertNotNull(response.get)
+
+    val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+    assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
+  }
+
+  @Test
+  def testForwardingTimeoutAfterRetry(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNull(response.get)
+
+    // After reaching the retry timeout, we get a disconnect. Instead of retrying,
+    // we should fail the request with a timeout error.
+    time.sleep(brokerToController.retryTimeoutMs)
+    client.respond(testAlterConfigRequest.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception), true)
+    brokerToController.poll()
+    brokerToController.poll()

Review comment:
       Why the second poll here? Is this to trigger the timeout error?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10026:
URL: https://github.com/apache/kafka/pull/10026


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10026: MINOR: Add mock implementation of `BrokerToControllerChannelManager`

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10026:
URL: https://github.com/apache/kafka/pull/10026#discussion_r569642348



##########
File path: core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
##########
@@ -19,135 +19,139 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Optional
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
-import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
+import org.apache.kafka.clients.{MockClient, NodeApiVersions}
+import org.apache.kafka.clients.MockClient.RequestMatcher
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.AlterConfigsResponseData
+import org.apache.kafka.common.message.{AlterConfigsResponseData, ApiVersionsResponseData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AlterConfigsRequest, AlterConfigsResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito
 
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
   private val time = new MockTime()
+  private val client = new MockClient(time)
+  private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])

Review comment:
       Gotcha, yea a mock is probably simpler in this case. If we end up doing a lot of ControllerNodeProvider mocking down the road, it might make sense for a full test implementation, but not for this PR




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org