You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/30 11:53:23 UTC

[GitHub] [kafka] showuon opened a new pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

showuon opened a new pull request #10794:
URL: https://github.com/apache/kafka/pull/10794


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667244195



##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val ResponseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))

Review comment:
       nit: use lower case `ResponseWithoutError`

##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()

Review comment:
       nit: it's a bit confusing that `buildRequestWithEnvelope` creates an alter config request, which we then respond to with a metadata response. Could we let `buildRequestWithEnvelope` provide the `AbstractRequest` so that we can use a `MetadataRequest`? For example, see `KafkaApis.buildRequestWithEnvelope`. Probably we could factor out a common method.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's a Not Controller error response, we need to make envelope response with Not Controller error

Review comment:
       nit: we're a little inconsistent in the comments when referring to the NOT_CONTROLLER error. Here we use "Not Controller" while in the tests we use "Not_Controller." I would suggest using "NOT_CONTROLLER" consistently.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +38,21 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncoded = t;
+        // Get the original cause from `CompletionException` and `ExecutionException`, otherwise, it will return unexpected UNKNOWN_SERVER_ERROR.

Review comment:
       nit: I think we can make this a little more concise. How about this?
   
   ```java
   // Get the underlying cause for common exception types from the concurrent library. 
   // This is useful to handle cases where exceptions may be raised from a future or a 
   // completion stage (as might be the case for requests sent to the controller in 
   // `ControllerApis`)
   ```

##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val ResponseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(ResponseWithoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response with Not_Controller error
+    val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
+      Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithNotControllerError)
+
+    // expect the envelopeResponse result has Not_Controller error
+    val capturedValue: EnvelopeResponse = envelopResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+  }
+
+  private def buildRequestWithEnvelope(): RequestChannel.Request = {
+    val resourceName = "topic-1"
+    val clientId = "id"
+    val header = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
+      clientId, 0)
+
+    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
+
+    val configs = Map(
+      configResource -> new AlterConfigsRequest.Config(
+        Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
+    val request = new AlterConfigsRequest.Builder(configs.asJava, false).build(header.apiVersion)
+
+    val principalSerde = new KafkaPrincipalSerde() {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+
+    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
+    val requestBuffer = request.serializeWithHeader(requestHeader)
+
+    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+    val envelopeBuffer = new EnvelopeRequest.Builder(
+      requestBuffer,
+      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
+      InetAddress.getLocalHost.getAddress
+    ).build().serializeWithHeader(envelopeHeader)
+
+    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
+      false, Optional.of(principalSerde))
+
+    val requestContextSpy: RequestContext = Mockito.spy(envelopeContext)
+
+    RequestHeader.parse(envelopeBuffer)
+    val EnvelopeBuffer2 = envelopeBuffer.duplicate()

Review comment:
       nit: lower case `envelopeBuffer`

##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val ResponseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(ResponseWithoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {

Review comment:
       Another useful test case is when the inner response has an error which is different from NOT_CONTROLLER. In this case, we expect no envelope error.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r665698647



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.

Review comment:
       Hmm, a little strange to handle this case explicitly and not others. Would we want to handle `ExecutionException` as well for example? 
   
   Also, the comment seems to be missing some context. Which future does it refer to? Maybe it would be helpful to point to the specific usage that requires this.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's Not Controller error response, we need to make envelope response with Not Controller error
+            // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+            new EnvelopeResponse(new EnvelopeResponseData()
+              .setErrorCode(Errors.NOT_CONTROLLER.code()))

Review comment:
       Ok, I think this makes sense. Basically we were missing the same logic we have in `KafkaApis.handleEnvelope` to verify the controller status. Instead, we just send the request into the controller thread. Do I have that right? I guess another way we could do it is to send the full envelope into the controller and let it set the NOT_CONTROLLER error at the right level, but this solution also seems reasonable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager(
 
       // Borrow client information such as client id and correlation id from the original request,
       // in order to correlate the create request with the original metadata request.
-      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+      requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
         requestVersion,
         context.clientId,
         context.correlationId)
       ForwardingManager.buildEnvelopeRequest(context,
         createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
     }.getOrElse(createTopicsRequest)
 
-    channelManager.sendRequest(request, requestCompletionHandler)
+    channelManager.sendRequest(request, requestCompletionHandler, requestHeader)

Review comment:
       This is 1 of 2 places we use `EnvelopeRequest`, so add `requestHeader`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wenbingshen commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645948173



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -350,7 +372,8 @@ class BrokerToControllerRequestThread(
     } else if (response.wasDisconnected()) {
       updateControllerAddress(null)
       requestQueue.putFirst(queueItem)
-    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER) ||
+      maybeCheckNotControllerErrorInsideEnvelopeResponse(queueItem.requestHeader, response.responseBody())) {

Review comment:
       @showuon Because it's all checking for Not Controller error, can we put the logic for check NotControllerError into the same method? So the else if argument is a little bit cleaner. But this is only a small tip, and it's up to you to decide if you need to do it. This is a great discovery. You have solved my confusion when I met UNKNOWN_SERVER_ERROR. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645321936



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Yes, I need this to mock a stack method. This library only added in test, not in production, and also used in other tests, I think it should be fine. Any concern about this? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager(
 
       // Borrow client information such as client id and correlation id from the original request,
       // in order to correlate the create request with the original metadata request.
-      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+      requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
         requestVersion,
         context.clientId,
         context.correlationId)
       ForwardingManager.buildEnvelopeRequest(context,
         createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
     }.getOrElse(createTopicsRequest)
 
-    channelManager.sendRequest(request, requestCompletionHandler)
+    channelManager.sendRequest(request, requestCompletionHandler, requestHeader)

Review comment:
       This is 1st of 2 places we use `EnvelopeRequest`, so add `requestHeader`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642195443



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeDecode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeDecode = t.getCause();

Review comment:
       fix 1: unwrap the `CompletionException` to get the original exception inside.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon edited a comment on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-853761494


   @hachikuji @mumrah @abbccdda @cmccabe , call for review since the tests keep failing. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667245949



##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val ResponseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(ResponseWithoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response with Not_Controller error
+    val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
+      Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithNotControllerError)
+
+    // expect the envelopeResponse result has Not_Controller error
+    val capturedValue: EnvelopeResponse = envelopResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+  }
+
+  private def buildRequestWithEnvelope(): RequestChannel.Request = {
+    val resourceName = "topic-1"
+    val clientId = "id"
+    val header = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion,
+      clientId, 0)
+
+    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
+
+    val configs = Map(
+      configResource -> new AlterConfigsRequest.Config(
+        Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
+    val request = new AlterConfigsRequest.Builder(configs.asJava, false).build(header.apiVersion)
+
+    val principalSerde = new KafkaPrincipalSerde() {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+
+    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
+    val requestBuffer = request.serializeWithHeader(requestHeader)
+
+    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+    val envelopeBuffer = new EnvelopeRequest.Builder(
+      requestBuffer,
+      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
+      InetAddress.getLocalHost.getAddress
+    ).build().serializeWithHeader(envelopeHeader)
+
+    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
+      false, Optional.of(principalSerde))
+
+    val requestContextSpy: RequestContext = Mockito.spy(envelopeContext)
+
+    RequestHeader.parse(envelopeBuffer)
+    val EnvelopeBuffer2 = envelopeBuffer.duplicate()

Review comment:
       nit: lower case `envelopeBuffer2`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642743186



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
     }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = {
+    if (responseBody.isInstanceOf[EnvelopeResponse] && requestHeader != null) {
+      info(s"Trying to find NOT_CONTROLLER exception inside envelope response")
+      val envelopeResponse = responseBody.asInstanceOf[EnvelopeResponse]
+      val envelopeError = envelopeResponse.error()
+
+      if (envelopeError == Errors.NONE) {
+        val response = AbstractResponse.parseResponse(envelopeResponse.responseData, requestHeader)
+        envelopeResponse.responseData().rewind()
+        return response.errorCounts().containsKey(Errors.NOT_CONTROLLER)
+      }
+    }
+
+    false;

Review comment:
       Nice catch! Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851153567


   @hachikuji @mumrah @abbccdda @cmccabe , could you help review this PR? I'll add tests later. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r665850985



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.

Review comment:
       `ExecutionException` -> good point! Updated. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667293072



##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()
+
+    val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
+    val envelopResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val ResponseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(ResponseWithoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {

Review comment:
       Good suggestion! Added `testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645318499



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Do we really need this?

##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Mocking static methods is not ideal, so we should only do it as a last resort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon edited a comment on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851845194


   Jenkins PR build results proved the `RaftClusterTest` tests doesn't fail anymore:
   `#4`: 1 failed test:
   ```
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop
   ```
   
   `#5`: all tests passed
   `#6`: all tests passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642195443



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeDecode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeDecode = t.getCause();

Review comment:
       fix 1: unwrap the `CompletionException` to get the original exception inside.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeEncode = t.getCause();

Review comment:
       fix 1: unwrap the CompletionException to get the original exception inside.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645321936



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Yes, I need this to mock a stack method. This library only added in test, not in production, and also used in other tests, I think it should be fine. Any concern about this? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645522757



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Ok, I'll see if I can test without it. Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642556841



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
     }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = {
+    if (responseBody.isInstanceOf[EnvelopeResponse] && requestHeader != null) {
+      info(s"Trying to find NOT_CONTROLLER exception inside envelope response")
+      val envelopeResponse = responseBody.asInstanceOf[EnvelopeResponse]
+      val envelopeError = envelopeResponse.error()
+
+      if (envelopeError == Errors.NONE) {
+        val response = AbstractResponse.parseResponse(envelopeResponse.responseData, requestHeader)
+        envelopeResponse.responseData().rewind()
+        return response.errorCounts().containsKey(Errors.NOT_CONTROLLER)
+      }
+    }
+
+    false;

Review comment:
       nit: unnecessary `;`

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
     }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = {

Review comment:
       How about move this to `ForwardingManager` since the envelopResponse and envelopRequest are all in `ForwardingManager`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851446115


   Tests added. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-864268173


   @showuon Thanks for the patch. One detail in the description does not seem right to me.
   
   > 1. "NOT_CONTROLLER" exception won't be correctly send back, instead, `UNKNOWN_SERVER_ERROR` will be returned. The reason is the `NotControllerException` is wrapped by a `CompletionException` when the `Future` completeExceptionally. And the `CompletionException` will not match any Errors we defined, so the `UNKNOWN_SERVER_ERROR` will be returned.
   
   The client is not necessarily trying to reach the controller. It is sending a request to a broker and the fact that we forward it to a controller is an implementation detail. So the NOT_CONTROLLER error does not make sense for the client. That is why we do not return it. We decided instead that if the controller could not be reached before expiration of the timeout, then we would return REQUEST_TIMED_OUT instead. 
   
   That aside, it is not very clear to me why we need to parse the envelope response. The intent was to return NOT_CONTROLLER as the error code in the EnvelopeResponse itself. Perhaps there is a race condition in the controller handling where this is not working? Probably that is what we should fix.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-876252977


   Failed tests are unrelated. Thanks.
   ```
       Build / ARM / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
       Build / ARM / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorInitialize
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector
       Build / JDK 8 and Scala 2.12 / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
       Build / JDK 8 and Scala 2.12 / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2]
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]
       Build / JDK 16 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
       Build / JDK 16 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorClientPolicyIntegrationTest.testCreateWithAllowedOverridesForPrincipalPolicy
       Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 16 and Scala 2.13 / kafka.api.TransactionsTest.testAbortTransactionTimeout()
       Build / JDK 16 and Scala 2.13 / kafka.api.TransactionsTest.testAbortTransactionTimeout()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642196090



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -239,18 +244,34 @@ class BrokerToControllerChannelManagerImpl(
    *
    * @param request         The request to be sent.
    * @param callback        Request completion callback.
+   * @param requestHeader   The request header to be sent, used for parsing the envelop response
    */
   def sendRequest(
     request: AbstractRequest.Builder[_ <: AbstractRequest],
-    callback: ControllerRequestCompletionHandler
+    callback: ControllerRequestCompletionHandler,
+    requestHeader: RequestHeader
   ): Unit = {
     requestThread.enqueue(BrokerToControllerQueueItem(
       time.milliseconds(),
       request,
-      callback
+      callback,
+      requestHeader
     ))
   }
 
+  /**
+   * Send request to the controller.
+   *
+   * @param request         The request to be sent.
+   * @param callback        Request completion callback.
+   */
+  def sendRequest(
+   request: AbstractRequest.Builder[_ <: AbstractRequest],
+   callback: ControllerRequestCompletionHandler,

Review comment:
       Use the method overloading because the scala default parameter value doesn't work well in Mockito. 
   ref: https://stackoverflow.com/questions/32975379/why-mockito-doesnt-handle-default-scala-parameters-properly
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-867662039


   @hachikuji , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10794:
URL: https://github.com/apache/kafka/pull/10794


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645318499



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Do we really need this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667171204



##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -29,7 +29,7 @@ import java.util
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@Timeout(120000)
+@Timeout(120)

Review comment:
       I think the default unit is seconds, so this is 120s.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-876993241


   @hachikuji , thanks for the reminder. Yes, I forgot to add tests for that area. Added. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667072949



##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -29,7 +29,7 @@ import java.util
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@Timeout(120000)
+@Timeout(120)

Review comment:
       Hmm, I don't think 0.12 seconds is a good timeout for these integration tests. I would be OK cutting the time down to 60 seconds per test, though, rather than 120.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851845194


   Jenkins PR build results proved the flaky tests doesn't fail anymore:
   `#4`: 1 failed test:
   ```
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop
   ```
   
   `#5`: all tests passed
   `#6`: all tests passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-876103088


   @hachikuji , I've updated addressed your comments. Please take a look again. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642195788



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -350,7 +372,8 @@ class BrokerToControllerRequestThread(
     } else if (response.wasDisconnected()) {
       updateControllerAddress(null)
       requestQueue.putFirst(queueItem)
-    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER) ||
+      maybeCheckNotControllerErrorInsideEnvelopeResponse(queueItem.requestHeader, response.responseBody())) {

Review comment:
       fix 2: parse the envelope response to check if `NotControllerError` existed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642803795



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -239,18 +244,34 @@ class BrokerToControllerChannelManagerImpl(
    *
    * @param request         The request to be sent.
    * @param callback        Request completion callback.
+   * @param requestHeader   The request header to be sent, used for parsing the envelop response
    */
   def sendRequest(
     request: AbstractRequest.Builder[_ <: AbstractRequest],
-    callback: ControllerRequestCompletionHandler
+    callback: ControllerRequestCompletionHandler,
+    requestHeader: RequestHeader

Review comment:
       add a `requestHeader` parameter to do envelope response parsing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-877715816


   @hachikuji , I've addressed your comments. Please take a look again.
   
   Failed tests are unrelated. Thanks.
   ```
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout()
       Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testCommitTransactionTimeout()
       Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterSnapshotTest.testContorllerSnapshotGenerated()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r647946747



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       @ijuma , I've removed the `mockitoInline` library and mock with original Mockito lib. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642462291



##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -125,7 +125,7 @@ class ForwardingManagerImpl(
       }
     }
 
-    channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
+    channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler, request.header)

Review comment:
       This is 2nd of 2 places we use `EnvelopeRequest`, add `requestHeader`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-853761494


   @hachikuji @mumrah @abbccdda @cmccabe , call for review. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-853761494


   @hachikuji @mumrah @abbccdda @cmccabe , call for review. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-850993572


   This is awesome, I also notice that there is no logic about `NOT_CONTROLLER` and active controller, I will take some time to test this PR. 👍


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon edited a comment on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-853761494


   @hachikuji @mumrah @abbccdda @cmccabe , call for review since the tests keep failing. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-864374754


   @hachikuji , I checked and there's no race condition there. The reason why we didn't put `NOT_CONTROLLER` in EnvelopResponse itself is because we will build envelopeResponse **with no error** as long as we got response from handler (createTopics handler in this case) [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L124). So, in `ControllerApi#handleCreateTopics`, we'll sendResponse when future completed, and then `RequestHandlerHelper#sendResponseMaybeThrottle`, we'll `buildResponseSend` and then go to the above link location. 
   
   So, to fix this issue, I need to build EnvelopeResponse with `NotControllerError` when the response from handler is having `NotControllerError` (fix 2). But if we don't have (fix 1), we can only get `Unknown_Server_Error` in the response from handler because the `NotControllerError` is wrapped with `CompletionException`.
   
   That's my solution. Please help review. Thank you very much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r666364712



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +38,21 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;

Review comment:
       nit: `throwableToBeEncoded`?

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's a Not Controller error response, we need to make envelope response with Not Controller error

Review comment:
       Do we have a test case which fails without this fix?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642748185



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
     }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = {

Review comment:
       Yes, I tried, but it can't. The `activeController` is stored in `BrokerToControllerRequestThread`, which owned by `BrokerToControllerChannelManager`. So, only the `BrokerToControllerRequestThread#handleResponse` can update the activeController and put the reqeust into top of the queue again (retry).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r665696909



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's Not Controller error response, we need to make envelope response with Not Controller error
+            // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+            new EnvelopeResponse(new EnvelopeResponseData()
+              .setErrorCode(Errors.NOT_CONTROLLER.code()))

Review comment:
       Ok, I think this makes sense. Basically we were missing the same logic we have in `KafkaApis.handleEnvelope` to verify the controller status. Instead, we just send the request into the controller thread. Do I have that right? I guess another way we could do it is to send the full envelope into the controller and let it set the NOT_CONTROLLER error at the right level, but that might take some refactoring and this solution seems reasonable as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r666750287



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +38,21 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;

Review comment:
       Updated.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's a Not Controller error response, we need to make envelope response with Not Controller error

Review comment:
       Added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r654866143



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeEncode = t.getCause();

Review comment:
       fix 1: unwrap the `CompletionException` to get the original exception inside. Even if we don't want the `NotControllerException` return back to client, we need to know it to do some check.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeEncode = t.getCause();

Review comment:
       fix 1: unwrap the CompletionException to get the original exception inside.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-864343641


   @hachikuji , thanks for your explanation. 
   > The client is not necessarily trying to reach the controller. It is sending a request to a broker and the fact that we forward it to a controller is an implementation detail. So the NOT_CONTROLLER error does not make sense for the client. That is why we do not return it. 
   
   >   The intent was to return NOT_CONTROLLER as the error code in the EnvelopeResponse itself. Perhaps there is a race condition in the controller handling where this is not working? Probably that is what we should fix.
   
   That makes sense! I'll look into it and let you know. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667293695



##########
File path: core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
##########
@@ -179,6 +184,108 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildRequestWithEnvelope()

Review comment:
       Put the `buildRequestWithEnvelope` method in TestUtils class to share between `KafkaApisTest` and `RequestChannelTest`. And also use `MetadataRequest` as input. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r654763984



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's Not Controller error response, we need to make envelope response with Not Controller error
+            // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+            new EnvelopeResponse(new EnvelopeResponseData()
+              .setErrorCode(Errors.NOT_CONTROLLER.code()))

Review comment:
       fix 2: Make the envelope response return `NotControllerException` if the controller response has `NotControllerException`. So that we can catch the `NotControllerException` on envelopeResponse to update the active controller.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r667293513



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##########
@@ -36,10 +38,21 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncoded = t;
+        // Get the original cause from `CompletionException` and `ExecutionException`, otherwise, it will return unexpected UNKNOWN_SERVER_ERROR.

Review comment:
       Thanks for the suggestion! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851846366


   @dengziming , thanks for the comments. I've updated. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r647946840



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -350,7 +372,8 @@ class BrokerToControllerRequestThread(
     } else if (response.wasDisconnected()) {
       updateControllerAddress(null)
       requestQueue.putFirst(queueItem)
-    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER) ||
+      maybeCheckNotControllerErrorInsideEnvelopeResponse(queueItem.requestHeader, response.responseBody())) {

Review comment:
       @wenbingshen , thanks for the comment. I've addressed your comment. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642462291



##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -125,7 +125,7 @@ class ForwardingManagerImpl(
       }
     }
 
-    channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
+    channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler, request.header)

Review comment:
       This is the 2nd of 2 places we use `EnvelopeRequest`, add `requestHeader`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-863801431


   @hachikuji @mumrah @abbccdda @cmccabe , please help review when available. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager(
 
       // Borrow client information such as client id and correlation id from the original request,
       // in order to correlate the create request with the original metadata request.
-      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+      requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
         requestVersion,
         context.clientId,
         context.correlationId)
       ForwardingManager.buildEnvelopeRequest(context,
         createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
     }.getOrElse(createTopicsRequest)
 
-    channelManager.sendRequest(request, requestCompletionHandler)
+    channelManager.sendRequest(request, requestCompletionHandler, requestHeader)

Review comment:
       This is the 1st of 2 places we use `EnvelopeRequest`, so add `requestHeader`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r645343056



##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.

Review comment:
       Mocking static methods is not ideal, so we should only do it as a last resort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org