Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/03 04:04:40 UTC

[GitHub] [kafka] dengziming commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

dengziming commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r606613914



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
         .setTopics(topicsToCreate)
     )
 
-    channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+    val requestCompletionHandler = new ControllerRequestCompletionHandler {
       override def onTimeout(): Unit = {
         debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
 
       override def onComplete(response: ClientResponse): Unit = {
-        debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+        debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    val request = metadataRequestContext.map { context =>
+      val requestVersion =
+        channelManager.controllerApiVersions() match {
+          case None =>
+            // We will rely on the Metadata request to be retried in the case
+            // that the latest version is not usable by the controller.
+            ApiKeys.CREATE_TOPICS.latestVersion()
+          case Some(nodeApiVersions) =>
+            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+        }
+
+      // Borrow client information such as client id and correlation id from the original request,
+      // in order to correlate the create request with the original metadata request.
+      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+        requestVersion,
+        context.clientId,
+        context.correlationId)
+      ForwardingManager.buildEnvelopeRequest(context,
+        createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
   How about moving the header construction and the `serializeWithHeader` call into `RequestContext`, like this:
   RequestContext.java
   ```java
       public ByteBuffer buildRequestEnvelopePayload(ApiKeys apiKeys, short apiVersion, AbstractRequest body) {
           // Borrow client information such as client id and correlation id from the original request,
           // in order to correlate the create request with the original metadata request.
           RequestHeader requestHeader = new RequestHeader(apiKeys,
               apiVersion,
               clientId(),
               correlationId());
           return body.serializeWithHeader(requestHeader);
       }
   ```
   and we can call it like `context.buildRequestEnvelopePayload(ApiKeys.CREATE_TOPICS, requestVersion, createTopicsRequest.build(requestVersion))`.
   
   This small change has two benefits:
   1. It keeps `RequestContext` symmetric, since it already has a `buildResponseEnvelopePayload` method.
   2. `buildRequestEnvelopePayload` can be reused later if we want to forward other client requests to the controller.
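
To illustrate the reuse argument, here is a minimal, self-contained sketch of the shape of the proposed helper. It does not use the real Kafka classes: `SketchHeader`, `SketchRequest`, `SketchCreateTopics` and `SketchContext` are hypothetical stand-ins for `RequestHeader`, `AbstractRequest`, a request body and `RequestContext`, and the byte layout is invented for the example. The point is only that the caller supplies the API key, version and body, while the context supplies the borrowed client id and correlation id.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for RequestHeader: only the four fields used in the proposal.
final class SketchHeader {
    final short apiKey;
    final short apiVersion;
    final String clientId;
    final int correlationId;

    SketchHeader(short apiKey, short apiVersion, String clientId, int correlationId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.clientId = clientId;
        this.correlationId = correlationId;
    }
}

// Hypothetical stand-in for AbstractRequest: anything that can serialize itself behind a header.
interface SketchRequest {
    ByteBuffer serializeWithHeader(SketchHeader header);
}

// Toy request body carrying one topic name. The wire layout is invented for this sketch.
final class SketchCreateTopics implements SketchRequest {
    private final String topic;

    SketchCreateTopics(String topic) {
        this.topic = topic;
    }

    @Override
    public ByteBuffer serializeWithHeader(SketchHeader header) {
        byte[] clientId = header.clientId.getBytes(StandardCharsets.UTF_8);
        byte[] body = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + 4 + 2 + clientId.length + body.length);
        buffer.putShort(header.apiKey);
        buffer.putShort(header.apiVersion);
        buffer.putInt(header.correlationId);
        buffer.putShort((short) clientId.length);
        buffer.put(clientId);
        buffer.put(body);
        buffer.flip(); // switch the buffer from writing to reading
        return buffer;
    }
}

// Hypothetical stand-in for RequestContext, holding the original client's identity.
final class SketchContext {
    private final String clientId;
    private final int correlationId;

    SketchContext(String clientId, int correlationId) {
        this.clientId = clientId;
        this.correlationId = correlationId;
    }

    // Mirrors the proposed helper: the header borrows client id and correlation id
    // from the original request, so call sites never build the header themselves.
    ByteBuffer buildRequestEnvelopePayload(short apiKey, short apiVersion, SketchRequest body) {
        SketchHeader header = new SketchHeader(apiKey, apiVersion, clientId, correlationId);
        return body.serializeWithHeader(header);
    }
}
```

With this shape, forwarding another request type would only need a different API key and body at the call site; the header-borrowing logic stays in one place.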
   
   Please feel free to point out any mistakes I have made.



