Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/17 22:10:44 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r596413282



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -77,16 +79,24 @@ class DefaultAutoTopicCreationManager(
 
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
+  /**
+   * Initiate auto topic creation for the given topics.
+   *
+   * @param requestOpt defined when creating topics on behalf of the client. The goal here is to preserve

Review comment:
       nit: if we document one parameter, we may as well document the others

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -101,6 +101,13 @@ public final Send toSend(RequestHeader header) {
         return SendBuilder.buildRequestSend(header, data());
     }
 
+    /**
+     * Serializes header and body without prefixing with size (unlike `toSend`, which does include a size prefix).
+     */
+    public final ByteBuffer serializeWithHeader(RequestHeader header) {

Review comment:
       It seems useful to validate that the API key and API version in the request header match those of the request object.
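
       A minimal sketch of the kind of check suggested here, written against stand-in types rather than the real Kafka classes. `HeaderValidationSketch`, `Header`, `Request`, and the byte layout are all hypothetical and exist only to show where the validation would sit before serialization:

```java
import java.nio.ByteBuffer;

// Illustration only: stand-in types, NOT the real RequestHeader/AbstractRequest.
final class HeaderValidationSketch {

    // Hypothetical header carrying just the two fields we want to validate.
    static final class Header {
        final short apiKey;
        final short apiVersion;

        Header(short apiKey, short apiVersion) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
        }
    }

    // Hypothetical request that knows its own api key and version.
    static final class Request {
        final short apiKey;
        final short version;
        final byte[] body;

        Request(short apiKey, short version, byte[] body) {
            this.apiKey = apiKey;
            this.version = version;
            this.body = body;
        }

        // Rejects a header that disagrees with the request, then writes
        // header fields followed by the body, without a size prefix.
        ByteBuffer serializeWithHeader(Header header) {
            if (header.apiKey != apiKey) {
                throw new IllegalArgumentException("Header api key " + header.apiKey
                    + " does not match request api key " + apiKey);
            }
            if (header.apiVersion != version) {
                throw new IllegalArgumentException("Header version " + header.apiVersion
                    + " does not match request version " + version);
            }
            ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + body.length);
            buffer.putShort(header.apiKey);
            buffer.putShort(header.apiVersion);
            buffer.put(body);
            buffer.flip(); // make the written bytes readable by the caller
            return buffer;
        }
    }
}
```

       Failing fast with `IllegalArgumentException` is one possible choice; the point is only that a mismatched header never reaches the wire.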

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +177,28 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    requestOpt match {
+      case Some(request) =>
+        val requestVersion =
+          channelManager.get.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()
+            case Some(nodeApiVersions) =>
+              nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+          }
+
+        val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+          requestVersion,
+          request.context.clientId(),
+          request.context.correlationId())
+        val envelopeRequest = ForwardingManager.buildEnvelopeRequest(request,
+          createTopicsRequest.build().serializeWithHeader(requestHeader))
+        channelManager.get.sendRequest(envelopeRequest, requestCompletionHandler)

Review comment:
       nit: we can probably avoid all of these `get` calls if we add something like this above:
   ```scala
   val channelManager = this.channelManager.getOrElse {
     throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests")
   }
   ```

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -77,16 +79,24 @@ class DefaultAutoTopicCreationManager(
 
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
+  /**
+   * Initiate auto topic creation for the given topics.
+   *
+   * @param requestOpt defined when creating topics on behalf of the client. The goal here is to preserve
+   *                   original client principal for auditing, thus needing to wrap a plain CreateTopicsRequest
+   *                   inside Envelope to send to the controller when forwarding is enabled.
+   */
   override def createTopics(
     topics: Set[String],
-    controllerMutationQuota: ControllerMutationQuota
+    controllerMutationQuota: ControllerMutationQuota,
+    requestOpt: Option[RequestChannel.Request]

Review comment:
       I'm trying to come up with a better name. Maybe it's ok to be explicit about the fact that this should be a metadata request. If we change to `Option[RequestContext]`, then maybe we can just call it `metadataRequestContext` or something like that.

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -43,6 +43,21 @@ object ForwardingManager {
   ): ForwardingManager = {
     new ForwardingManagerImpl(channelManager)
   }
+
+  private[server] def buildEnvelopeRequest(request: RequestChannel.Request,

Review comment:
       Could we use `RequestContext` as the argument instead of `Request`? As far as I can tell, that is all we need. Similarly, we can change `AutoTopicCreationManager.createTopics`.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +177,28 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    requestOpt match {
+      case Some(request) =>
+        val requestVersion =
+          channelManager.get.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()
+            case Some(nodeApiVersions) =>
+              nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+          }
+
+        val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+          requestVersion,
+          request.context.clientId(),
+          request.context.correlationId())

Review comment:
       Hmm, it seems a little strange to borrow these fields from the original request, but maybe it's not a bad idea. It would potentially allow us to correlate the create request with the original metadata request. It may be worth adding a comment in the code to explain this.
   
   Also nit: unneeded parentheses
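
   A rough sketch of what such an explanatory comment could look like, using stand-in types instead of the real `RequestContext` and `RequestHeader`. `ForwardedHeaderSketch`, `Context`, `Header`, and `createTopicsHeaderFor` are hypothetical names chosen for illustration:

```java
// Illustration only: stand-in types, NOT the real Kafka classes.
final class ForwardedHeaderSketch {

    // CreateTopics API key in the Kafka protocol.
    static final short CREATE_TOPICS_KEY = 19;

    // Hypothetical slice of the original metadata request's context.
    static final class Context {
        final String clientId;
        final int correlationId;

        Context(String clientId, int correlationId) {
            this.clientId = clientId;
            this.correlationId = correlationId;
        }
    }

    // Hypothetical header for the forwarded request.
    static final class Header {
        final short apiKey;
        final short apiVersion;
        final String clientId;
        final int correlationId;

        Header(short apiKey, short apiVersion, String clientId, int correlationId) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
            this.clientId = clientId;
            this.correlationId = correlationId;
        }
    }

    static Header createTopicsHeaderFor(Context metadataRequestContext, short requestVersion) {
        // The client id and correlation id are deliberately borrowed from the
        // original metadata request so that the forwarded CreateTopics request
        // can be correlated with the request that triggered it.
        return new Header(CREATE_TOPICS_KEY,
            requestVersion,
            metadataRequestContext.clientId,
            metadataRequestContext.correlationId);
    }
}
```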



