Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/19 20:48:57 UTC

[GitHub] [kafka] hachikuji opened a new pull request, #12185: MINOR: Fix buildResponseSend test cases for envelope responses

hachikuji opened a new pull request, #12185:
URL: https://github.com/apache/kafka/pull/12185

   The test cases we have in `RequestChannelTest` for `buildResponseSend` construct the envelope request incorrectly. Basically they confuse the envelope context and the reference to the wrapped envelope request object. This patch fixes `TestUtils.buildEnvelopeRequest` so that the wrapped request is built properly. It also consolidates the tests in `RequestChannelTest` to avoid duplication.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji merged pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12185:
URL: https://github.com/apache/kafka/pull/12185




[GitHub] [kafka] hachikuji commented on a diff in pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12185:
URL: https://github.com/apache/kafka/pull/12185#discussion_r879657858


##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -191,84 +194,66 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
-  @Test
-  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
-    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response without error
-    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  @ParameterizedTest
+  @EnumSource(value=classOf[Errors], names=Array("NONE", "CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
+  def testBuildEnvelopeResponse(error: Errors): Unit = {
+    val topic = "foo"
+    val createTopicRequest = buildCreateTopicRequest(topic)

Review Comment:
   Yes, that's right. Because I'm using `EnvelopeUtils.handleEnvelopeRequest` in the test helper, the validation is stricter.
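
The stricter validation mentioned here can be pictured with a small, self-contained sketch. It assumes, as the reviewer's question elsewhere in this thread suggests, that a metadata request is not forwardable while a create-topics request is. `Api`, `ForwardingSketch`, and `validateForwarded` are hypothetical names for illustration, not Kafka's API, and the real checks inside `EnvelopeUtils` are not shown in this thread.

```scala
// Hypothetical model: the flags and the check are assumptions made for
// illustration, not a copy of what EnvelopeUtils does.
object ForwardingSketch {
  final case class Api(name: String, forwardable: Boolean)

  val Metadata: Api = Api("METADATA", forwardable = false)
  val CreateTopics: Api = Api("CREATE_TOPICS", forwardable = true)

  // Returns Right(api) if the inner request may travel inside an envelope,
  // and Left(reason) if it may not.
  def validateForwarded(api: Api): Either[String, Api] =
    if (api.forwardable) Right(api)
    else Left(s"${api.name} is not eligible for forwarding")
}
```

Under these assumptions, a test helper that routes through such a check would reject an envelope built around a metadata request, which is one plausible reason to build the test around a create-topics request instead.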





[GitHub] [kafka] dengziming commented on a diff in pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12185:
URL: https://github.com/apache/kafka/pull/12185#discussion_r879103834


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -2195,22 +2193,18 @@ object TestUtils extends Logging {
 
     RequestHeader.parse(envelopeBuffer)
 
-    var requestContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
+    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
       KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
       fromPrivilegedListener, Optional.of(principalSerde))
 
-    if (shouldSpyRequestContext) {
-      requestContext = Mockito.spy(requestContext)
-    }
-
     new RequestChannel.Request(
       processor = 1,
-      context = requestContext,
+      context = envelopeContext,
       startTimeNanos = startTimeNanos,
       memoryPool = MemoryPool.NONE,
       buffer = envelopeBuffer,
       metrics = requestChannelMetrics,
-      envelope = envelope

Review Comment:
   This is the source of the problem: the envelope of the wrapped request should be `None`, right?



##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -191,84 +194,66 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
-  @Test
-  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
-    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response without error
-    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  @ParameterizedTest
+  @EnumSource(value=classOf[Errors], names=Array("NONE", "CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
+  def testBuildEnvelopeResponse(error: Errors): Unit = {
+    val topic = "foo"
+    val createTopicRequest = buildCreateTopicRequest(topic)
+    val unwrapped = buildUnwrappedEnvelopeRequest(createTopicRequest)
+
+    val createTopicResponse = buildCreateTopicResponse(topic, error)
+    val envelopeResponse = buildEnvelopeResponse(unwrapped, createTopicResponse)
+
+    error match {
+      case Errors.NOT_CONTROLLER =>
+        assertEquals(Errors.NOT_CONTROLLER, envelopeResponse.error)
+        assertNull(envelopeResponse.responseData)
+      case _ =>
+        assertEquals(Errors.NONE, envelopeResponse.error)
+        val unwrappedResponse = AbstractResponse.parseResponse(envelopeResponse.responseData(), unwrapped.header)
+        assertEquals(createTopicResponse.data, unwrappedResponse.data)
+    }
   }
 
-  @Test
-  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError(): Unit = {
-    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response with REQUEST_TIMED_OUT error
-    val responseWithTimeoutError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
-      Collections.singletonMap("a", Errors.REQUEST_TIMED_OUT),
-      Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithTimeoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  private def buildCreateTopicRequest(topic: String): CreateTopicsRequest = {
+    val requestData = new CreateTopicsRequestData()
+    requestData.topics.add(new CreatableTopic()
+      .setName(topic)
+      .setReplicationFactor(-1)
+      .setNumPartitions(-1)
+    )
+    new CreateTopicsRequest.Builder(requestData).build()
   }
 
-  @Test
-  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {
-    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response with NOT_CONTROLLER error
-    val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
-      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
-      Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithNotControllerError)
-
-    // expect the envelopeResponse result has NOT_CONTROLLER error
-    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+  private def buildCreateTopicResponse(
+    topic: String,
+    error: Errors,
+  ): CreateTopicsResponse = {
+    val responseData = new CreateTopicsResponseData()
+    responseData.topics.add(new CreateTopicsResponseData.CreatableTopicResult()
+      .setName(topic)
+      .setErrorCode(error.code)
+    )
+    new CreateTopicsResponse(responseData)
   }
 
-  private def buildMetadataRequest(): AbstractRequest = {
-    val resourceName = "topic-1"
-    val header = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
-      clientId, 0)
+  private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): RequestChannel.Request = {
+    val wrappedRequest = TestUtils.buildEnvelopeRequest(

Review Comment:
   If I understand correctly, "wrapped" refers to the request sent from the broker to the controller, and "unwrapped" refers to the inner request parsed by the controller.
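
To make the wrapped/unwrapped terminology concrete, here is a minimal, self-contained sketch. It is a toy model rather than Kafka's classes: `Request`, `wrap`, and `unwrap` are hypothetical names, and the string payload stands in for the serialized inner request. The one property it is meant to show is that the envelope (wrapped) request carries no envelope of its own, while the unwrapped inner request points back at the envelope that carried it.

```scala
// Hypothetical model of envelope wrapping; these are not Kafka's real classes.
object EnvelopeSketch {
  // `envelope` is set only on an unwrapped (inner) request and refers to the
  // envelope request that carried it.
  final case class Request(api: String, payload: String, envelope: Option[Request])

  // Broker side: wrap a client request for forwarding to the controller.
  // The envelope is itself a top-level request, so its `envelope` is None.
  def wrap(inner: Request): Request =
    Request(api = "ENVELOPE", payload = s"${inner.api}:${inner.payload}", envelope = None)

  // Controller side: parse the inner request and attach the envelope to it.
  def unwrap(wrapped: Request): Request = {
    require(wrapped.api == "ENVELOPE", s"not an envelope: ${wrapped.api}")
    require(wrapped.envelope.isEmpty, "an envelope must not carry its own envelope")
    val sep = wrapped.payload.indexOf(':')
    require(sep >= 0, "malformed envelope payload")
    Request(
      api = wrapped.payload.substring(0, sep),
      payload = wrapped.payload.substring(sep + 1),
      envelope = Some(wrapped)
    )
  }
}
```

In this model, calling `unwrap(wrap(request))` yields a request with the original `api` and `payload` whose `envelope` field holds the wrapped request, whereas the wrapped request's own `envelope` field stays `None`.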



##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -191,84 +194,66 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
-  @Test
-  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
-    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response without error
-    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  @ParameterizedTest
+  @EnumSource(value=classOf[Errors], names=Array("NONE", "CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
+  def testBuildEnvelopeResponse(error: Errors): Unit = {
+    val topic = "foo"
+    val createTopicRequest = buildCreateTopicRequest(topic)

Review Comment:
   We changed `MetadataRequest` to `CreateTopicsRequest` because `MetadataRequest` is not forwardable?





[GitHub] [kafka] hachikuji commented on a diff in pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12185:
URL: https://github.com/apache/kafka/pull/12185#discussion_r879661710


##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -191,84 +194,66 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): RequestChannel.Request = {
+    val wrappedRequest = TestUtils.buildEnvelopeRequest(

Review Comment:
   Right. The envelope requests are "wrapped." I guess I could also just call it `envelopeRequest` if that is clearer?





[GitHub] [kafka] hachikuji commented on a diff in pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12185:
URL: https://github.com/apache/kafka/pull/12185#discussion_r879659761


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -2195,22 +2193,18 @@ object TestUtils extends Logging {
 
     RequestHeader.parse(envelopeBuffer)
 
-    var requestContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
+    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
       KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
       fromPrivilegedListener, Optional.of(principalSerde))
 
-    if (shouldSpyRequestContext) {
-      requestContext = Mockito.spy(requestContext)
-    }
-
     new RequestChannel.Request(
       processor = 1,
-      context = requestContext,
+      context = envelopeContext,
       startTimeNanos = startTimeNanos,
       memoryPool = MemoryPool.NONE,
       buffer = envelopeBuffer,
       metrics = requestChannelMetrics,
-      envelope = envelope

Review Comment:
   Yes, exactly. There should be no envelope if the request itself represents one. I think this code might be left over from the early days, when we did the envelope unwrapping in the network code before the request reached the request handler.





[GitHub] [kafka] dengziming commented on a diff in pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12185:
URL: https://github.com/apache/kafka/pull/12185#discussion_r879998889


##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -191,84 +194,66 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): RequestChannel.Request = {
+    val wrappedRequest = TestUtils.buildEnvelopeRequest(

Review Comment:
   Both are ok.





[GitHub] [kafka] dajac commented on a diff in pull request #12185: MINOR: Fix buildResponseSend test cases for envelope responses

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12185:
URL: https://github.com/apache/kafka/pull/12185#discussion_r884533015


##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -191,84 +194,66 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
-  @Test
-  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
-    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response without error
-    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  @ParameterizedTest
+  @EnumSource(value=classOf[Errors], names=Array("NONE", "CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
+  def testBuildEnvelopeResponse(error: Errors): Unit = {
+    val topic = "foo"
+    val createTopicRequest = buildCreateTopicRequest(topic)
+    val unwrapped = buildUnwrappedEnvelopeRequest(createTopicRequest)
+
+    val createTopicResponse = buildCreateTopicResponse(topic, error)
+    val envelopeResponse = buildEnvelopeResponse(unwrapped, createTopicResponse)
+
+    error match {
+      case Errors.NOT_CONTROLLER =>
+        assertEquals(Errors.NOT_CONTROLLER, envelopeResponse.error)
+        assertNull(envelopeResponse.responseData)
+      case _ =>
+        assertEquals(Errors.NONE, envelopeResponse.error)
+        val unwrappedResponse = AbstractResponse.parseResponse(envelopeResponse.responseData(), unwrapped.header)

Review Comment:
   nit: We could omit the parentheses when calling `responseData`.



##########
core/src/test/scala/unit/kafka/network/RequestChannelTest.scala:
##########
@@ -24,36 +24,39 @@ import java.nio.ByteBuffer
 import java.util.Collections
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
+import kafka.server.EnvelopeUtils
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.{CreateTopicsRequestData, CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
-import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, RequestTestUtils}
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest}
 import org.apache.kafka.common.requests.AlterConfigsRequest._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.test
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
 import org.mockito.Mockito.mock
-import org.mockito.{ArgumentCaptor, Mockito}
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 
+import java.util.concurrent.atomic.AtomicReference

Review Comment:
   nit: Could we put this one next to the other `java.util.*`?


