You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/17 20:47:09 UTC

[GitHub] [kafka] abbccdda opened a new pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

abbccdda opened a new pull request #10142:
URL: https://github.com/apache/kafka/pull/10142


   For the metadata auto topic creation case, it is favorable to use `Envelope` to wrap the CreateTopicsRequest alongside the original client principal for auditing purpose.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r598998992



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +179,34 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    metadataRequestContext match {
+      case Some(context) =>
+        val requestVersion =
+          channelManager.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()

Review comment:
       This is interesting. I guess the situation is when we have not connected to the controller yet. What happens if the controller does not support this version? It should send back UNSUPPORTED_VERSION in the envelope response I think, right? Should we retry the request or do we just rely on the client to resend the Metadata request so that we can retry? 

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +179,34 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    metadataRequestContext match {
+      case Some(context) =>
+        val requestVersion =
+          channelManager.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()
+            case Some(nodeApiVersions) =>
+              nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+          }
+
+        // Borrow client information such as client id and correlation id from the original request,
+        // in order to correlate the create request with the original metadata request.
+        val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+          requestVersion,
+          context.clientId,
+          context.correlationId)
+        val envelopeRequest = ForwardingManager.buildEnvelopeRequest(context,
+          createTopicsRequest.build().serializeWithHeader(requestHeader))
+        channelManager.sendRequest(envelopeRequest, requestCompletionHandler)

Review comment:
       nit: this call is shared by both branches, maybe we could simplify a little bit. something like this:
   ```scala
   val request = metadataRequestContext.map { context =>
     ...
   }.getOrElse(createTopicRequest)
   
   channelManager.sendRequest(request, requestCompletionHandler)
   ```

##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -263,7 +263,7 @@ class AutoTopicCreationManagerTest {
                                          topicName: String,
                                          isInternal: Boolean): Unit = {
     val topicResponses = autoTopicCreationManager.createTopics(
-      Set(topicName), UnboundedControllerMutationQuota)
+      Set(topicName), UnboundedControllerMutationQuota, None)

Review comment:
       Do we have any test cases which provide a metadata context?

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +179,34 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    metadataRequestContext match {
+      case Some(context) =>
+        val requestVersion =
+          channelManager.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()
+            case Some(nodeApiVersions) =>
+              nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+          }
+
+        // Borrow client information such as client id and correlation id from the original request,
+        // in order to correlate the create request with the original metadata request.
+        val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+          requestVersion,
+          context.clientId,
+          context.correlationId)
+        val envelopeRequest = ForwardingManager.buildEnvelopeRequest(context,
+          createTopicsRequest.build().serializeWithHeader(requestHeader))

Review comment:
       Don't we need to pass `requestVersion` to `build`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10142:
URL: https://github.com/apache/kafka/pull/10142


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r606870392



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
         .setTopics(topicsToCreate)
     )
 
-    channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+    val requestCompletionHandler = new ControllerRequestCompletionHandler {
       override def onTimeout(): Unit = {
         debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
 
       override def onComplete(response: ClientResponse): Unit = {
-        debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+        debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    val request = metadataRequestContext.map { context =>
+      val requestVersion =
+        channelManager.controllerApiVersions() match {
+          case None =>
+            // We will rely on the Metadata request to be retried in the case
+            // that the latest version is not usable by the controller.
+            ApiKeys.CREATE_TOPICS.latestVersion()
+          case Some(nodeApiVersions) =>
+            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+        }
+
+      // Borrow client information such as client id and correlation id from the original request,
+      // in order to correlate the create request with the original metadata request.
+      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+        requestVersion,
+        context.clientId,
+        context.correlationId)
+      ForwardingManager.buildEnvelopeRequest(context,
+        createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
       Thanks for the suggestion, I feel this is a bit overkill as we don't have any specific use case to forward other client requests right now, and feel free to propose a refactoring PR after this one is merged to discuss whether the new approach is better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r599787253



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +179,34 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    metadataRequestContext match {
+      case Some(context) =>
+        val requestVersion =
+          channelManager.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()

Review comment:
       I guess we could rely on client to retry Metadata request for simplicity.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r596413282



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -77,16 +79,24 @@ class DefaultAutoTopicCreationManager(
 
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
+  /**
+   * Initiate auto topic creation for the given topics.
+   *
+   * @param requestOpt defined when creating topics on behalf of the client. The goal here is to preserve

Review comment:
       nit: if we document one parameter, we may as well document the others

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -101,6 +101,13 @@ public final Send toSend(RequestHeader header) {
         return SendBuilder.buildRequestSend(header, data());
     }
 
+    /**
+     * Serializes header and body without prefixing with size (unlike `toSend`, which does include a size prefix).
+     */
+    public final ByteBuffer serializeWithHeader(RequestHeader header) {

Review comment:
       It seems useful to validate that the api key and api version from the request header match that in the request object.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +177,28 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    requestOpt match {
+      case Some(request) =>
+        val requestVersion =
+          channelManager.get.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()
+            case Some(nodeApiVersions) =>
+              nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+          }
+
+        val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+          requestVersion,
+          request.context.clientId(),
+          request.context.correlationId())
+        val envelopeRequest = ForwardingManager.buildEnvelopeRequest(request,
+          createTopicsRequest.build().serializeWithHeader(requestHeader))
+        channelManager.get.sendRequest(envelopeRequest, requestCompletionHandler)

Review comment:
       nit: we can probably avoid all of these `get` calls if we add something like this above:
   ```scala
   val channelManager = this.channelManager.getOrElse {
     throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests")
   }
   ```

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -77,16 +79,24 @@ class DefaultAutoTopicCreationManager(
 
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
+  /**
+   * Initiate auto topic creation for the given topics.
+   *
+   * @param requestOpt defined when creating topics on behalf of the client. The goal here is to preserve
+   *                   original client principal for auditing, thus needing to wrap a plain CreateTopicsRequest
+   *                   inside Envelope to send to the controller when forwarding is enabled.
+   */
   override def createTopics(
     topics: Set[String],
-    controllerMutationQuota: ControllerMutationQuota
+    controllerMutationQuota: ControllerMutationQuota,
+    requestOpt: Option[RequestChannel.Request]

Review comment:
       I'm trying to come up with a better name. Maybe it's ok to be explicit about the fact that this should be a metadata request. If we change to `Option[RequestContext]`, then maybe we can just call it `metadataRequestContext` or something like that.

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -43,6 +43,21 @@ object ForwardingManager {
   ): ForwardingManager = {
     new ForwardingManagerImpl(channelManager)
   }
+
+  private[server] def buildEnvelopeRequest(request: RequestChannel.Request,

Review comment:
       Could we use `RequestContext` as the argument instead of `Request`? As far as I can tell, that is all we need. Similarly, we can change `AutoTopicCreationManager.createTopics`.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +177,28 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    requestOpt match {
+      case Some(request) =>
+        val requestVersion =
+          channelManager.get.controllerApiVersions() match {
+            case None =>
+              ApiKeys.CREATE_TOPICS.latestVersion()
+            case Some(nodeApiVersions) =>
+              nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+          }
+
+        val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+          requestVersion,
+          request.context.clientId(),
+          request.context.correlationId())

Review comment:
       Hmm.. It seems a little bit strange to borrow these fields from the original request, but maybe it's not a bad idea. It would potentially allow us to correlate the create request with the original metadata request. It may be worth a comment in the code of explanation.
   
   also nit: unneeded parenthesis




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r606613914



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
         .setTopics(topicsToCreate)
     )
 
-    channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+    val requestCompletionHandler = new ControllerRequestCompletionHandler {
       override def onTimeout(): Unit = {
         debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
 
       override def onComplete(response: ClientResponse): Unit = {
-        debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+        debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    val request = metadataRequestContext.map { context =>
+      val requestVersion =
+        channelManager.controllerApiVersions() match {
+          case None =>
+            // We will rely on the Metadata request to be retried in the case
+            // that the latest version is not usable by the controller.
+            ApiKeys.CREATE_TOPICS.latestVersion()
+          case Some(nodeApiVersions) =>
+            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+        }
+
+      // Borrow client information such as client id and correlation id from the original request,
+      // in order to correlate the create request with the original metadata request.
+      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+        requestVersion,
+        context.clientId,
+        context.correlationId)
+      ForwardingManager.buildEnvelopeRequest(context,
+        createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
       How about moving `serializeWithHeader` to `RequestContext`, like this:
   RequestContext.java
   ```java
       public ByteBuffer buildRequestEnvelopePayload(ApiKeys apiKeys, short apiVersion, AbstractRequest body) {
           // Borrow client information such as client id and correlation id from the original request,
           // in order to correlate the create request with the original metadata request.
           RequestHeader requestHeader = new RequestHeader(apiKeys,
               apiVersion,
               clientId(),
               correlationId());
           return body.serializeWithHeader(requestHeader);
       }
   ```
   and we can call it like `context.buildRequestEnvelopePayload(ApiKeys.CREATE_TOPICS, requestVersion, createTopicsRequest.build(requestVersion))`.
   
   This trivial change has 2 benefits:
   1. Keep the symmetry of `RequestContext` since it already has a `buildResponseEnvelopePayload` method
   2. `buildRequestEnvelopePayload` can be reused in the future if we want to forward other client requests to the controller
   
   Please feel free to point my fault if I made any mistakes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r606089230



##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        assertEquals(principal, userPrincipal)
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      userPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, Optional.of(principalSerde))
+
+    autoTopicCreationManager.createTopics(

Review comment:
       I was validating the same principal in principal serde call, see 
   ```
   assertEquals(principal, userPrincipal)
   
   ```
   will also add a boolean flag to indicate it's being called.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r599962199



##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -263,7 +263,7 @@ class AutoTopicCreationManagerTest {
                                          topicName: String,
                                          isInternal: Boolean): Unit = {
     val topicResponses = autoTopicCreationManager.createTopics(
-      Set(topicName), UnboundedControllerMutationQuota)
+      Set(topicName), UnboundedControllerMutationQuota, None)

Review comment:
       Added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r605099066



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +179,32 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")

Review comment:
       This message will probably cause confusion since `ClientResponse` may still indicate an error. How about we add the response to the log message?

##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +223,39 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContext(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+
+    // Throw upon undefined principal serde when building the forward request
+    assertThrows(classOf[IllegalArgumentException], () => autoTopicCreationManager.createTopics(

Review comment:
       I think there are few interesting cases that should be covered. This is one of them. We should also have assertions in the successful path that the principal is carried over to the envelope correctly. Another interesting case is when the envelope response indicates an UNSUPPORTED_VERSION error.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -166,7 +179,32 @@ class DefaultAutoTopicCreationManager(
         debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    val request = metadataRequestContext.map { context =>
+      val requestVersion =
+        channelManager.controllerApiVersions() match {
+          case None =>
+            ApiKeys.CREATE_TOPICS.latestVersion()

Review comment:
       Let's add a comment that we rely on the Metadata request to be retried in the case that the latest version is not usable by the controller.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607224516



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
         .setTopics(topicsToCreate)
     )
 
-    channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+    val requestCompletionHandler = new ControllerRequestCompletionHandler {
       override def onTimeout(): Unit = {
         debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
 
       override def onComplete(response: ClientResponse): Unit = {
-        debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+        debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    val request = metadataRequestContext.map { context =>
+      val requestVersion =
+        channelManager.controllerApiVersions() match {
+          case None =>
+            // We will rely on the Metadata request to be retried in the case
+            // that the latest version is not usable by the controller.
+            ApiKeys.CREATE_TOPICS.latestVersion()
+          case Some(nodeApiVersions) =>
+            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+        }
+
+      // Borrow client information such as client id and correlation id from the original request,
+      // in order to correlate the create request with the original metadata request.
+      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+        requestVersion,
+        context.clientId,
+        context.correlationId)
+      ForwardingManager.buildEnvelopeRequest(context,
+        createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
       @dengziming It's not a bad idea. We could even simplify it a little since the api key and version can be obtained from the request. I tend to agree that this is kind of a niche usage though, so I'm not sure it calls for the generality. Perhaps you could submit a follow-up once this is merged and we can see what it looks like.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607216022



##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        assertEquals(principal, userPrincipal)
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      userPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, Optional.of(principalSerde))
+
+    autoTopicCreationManager.createTopics(

Review comment:
       This ensures that an attempt was made to serialize the principal, but how do we know that it made it to the envelope? I think it would be better to intercept the request that is sent to the channel and verify it directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r606008081



##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        assertEquals(principal, userPrincipal)
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      userPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, Optional.of(principalSerde))
+
+    autoTopicCreationManager.createTopics(

Review comment:
       Maybe I am missing it, but where is the validation? How do we know that the envelope includes the serialized principal? Maybe we could verify the call to `brokerToController.sendRequest`?

##########
File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        assertEquals(principal, userPrincipal)
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      userPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, Optional.of(principalSerde))
+
+    autoTopicCreationManager.createTopics(
+      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
+  }
+
+  @Test
+  def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+
+    // Throw upon undefined principal serde when building the forward request
+    assertThrows(classOf[IllegalArgumentException], () => autoTopicCreationManager.createTopics(
+      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)))
+  }
+
+  @Test
+  def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection

Review comment:
       nit: some helpers would improve readability of these tests. It looks like the 3 new tests share a lot of this initialization logic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org