Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/09 17:53:03 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9579: KAFKA-9751: Forward FindCoordinator request when topic creation is needed

abbccdda opened a new pull request #9579:
URL: https://github.com/apache/kafka/pull/9579


   This PR forwards the entire FindCoordinator request to the active controller when the internal topic being queried is not yet ready to be served.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#issuecomment-774499237


   I triggered a new build. Is there a reason why you aborted the previous one, @abbccdda?





[GitHub] [kafka] hachikuji commented on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#issuecomment-774326674


   @abbccdda Thanks for the updates. I opened a PR with a few fixes to speed this along since we're trying to get it checked in today: https://github.com/abbccdda/kafka/pull/6. The tests that were previously failing now seem to be passing (at least when testing locally).





[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       In the case of forwarding, maybe we can let the controller decide if there are enough alive brokers.
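
    A rough sketch of that idea (not part of the PR), reusing names from the diff above: only the local path keeps the broker count check, while the forwarded CreateTopicsRequest lets the controller enforce the replication factor.

    ```scala
    // Hypothetical restructuring: skip the local alive-broker check when forwarding
    // and let the controller reject the creation if it cannot satisfy the
    // replication factor.
    if (topicCreationNeeded) {
      if (shouldForwardRequest(request)) {
        forwardingManager.sendInterBrokerRequest(
          getCreateTopicsRequest(Seq(internalTopicName)),
          _ => ())
      } else if (hasEnoughAliveBrokers(internalTopicName)) {
        // existing local creation path via adminManager.createTopics(...)
      }
    }
    ```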

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       The controller should have these configurations as well. Perhaps it would be better to use -1 for this and for the replication factor, and let the controller fill them in?
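
    For illustration, the suggestion could look like the following sketch (how the controller would default these sentinel values is an assumption, not something this PR implements):

    ```scala
    // Sketch only: send -1 for both fields and have the controller substitute its
    // own configured partition count and replication factor for the internal topic.
    new CreatableTopic()
      .setName(GROUP_METADATA_TOPIC_NAME)
      .setNumPartitions(-1)
      .setReplicationFactor((-1).toShort)
    ```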

##########
File path: core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, MetadataRequestTest}
+import org.junit.jupiter.api.Test
+
+class MetadataRequestWithForwardingTest extends MetadataRequestTest {
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    super.brokerPropertyOverrides(properties)
+    properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
+  }
+
+  @Test
+  override def testAutoTopicCreation(): Unit = {
+    super.testAutoTopicCreation()
+  }
+
+  @Test
+  override def testAutoCreateOfCollidingTopics(): Unit = {
+    super.testAutoCreateOfCollidingTopics()
+  }
+
+  @Test
+  override def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+    super.testAutoCreateTopicWithInvalidReplicationFactor()
+  }
+
+  /* the rest of tests are not enabled */

Review comment:
       An alternative that we have done elsewhere would be to introduce an `AbstractMetadataRequestTest` which we can pull the common cases up to. A more elegant option might be to figure out how to use `@ParameterizedTest` so that we can provide config overrides. This would be a little difficult at the moment because we initialize brokers in a `@Before` method. Probably means we need to move away from this approach long term. For now, the abstract class seems preferable. Similar for `CreateTopicsRequestWithForwardingTest`.
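
    A minimal sketch of the abstract-class option, with illustrative class names (not the refactor actually done in this PR):

    ```scala
    import java.util.Properties

    import kafka.server.{BaseRequestTest, KafkaConfig}
    import org.junit.jupiter.api.Test

    // Shared cases live in the abstract base; each concrete class only supplies
    // its broker config overrides.
    abstract class AbstractMetadataRequestTest extends BaseRequestTest {
      @Test
      def testAutoTopicCreation(): Unit = {
        // assertions exercised under both the forwarding and non-forwarding configs
      }
    }

    class MetadataRequestWithForwardingTest extends AbstractMetadataRequestTest {
      override def brokerPropertyOverrides(properties: Properties): Unit = {
        super.brokerPropertyOverrides(properties)
        properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
      }
    }
    ```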

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(

Review comment:
       nit: seems misaligned

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(
+          if (!hasEnoughAliveBrokers(topic))
+            Errors.INVALID_REPLICATION_FACTOR
+          else if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
+            Errors.LEADER_NOT_AVAILABLE

Review comment:
       Hmm.. In the old logic, we would attempt topic creation through zookeeper first. Then, if the topic was created successfully, we would return LEADER_NOT_AVAILABLE to give time for the controller to elect a leader. Now we return LEADER_NOT_AVAILABLE immediately and we send the CreateTopic request asynchronously. We don't know if the CreateTopic request will ultimately succeed or not. Perhaps it would be better to keep returning `UNKNOWN_TOPIC_OR_PARTITION` until we see that the topic exists.
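
    Expressed against the branch above, the suggestion amounts to something like this sketch:

    ```scala
    // Keep reporting UNKNOWN_TOPIC_OR_PARTITION while the asynchronous CreateTopics
    // request is in flight; clients retry and only see a real leader (or
    // LEADER_NOT_AVAILABLE) once the topic shows up in the metadata cache.
    Some(metadataResponseTopic(
      if (!hasEnoughAliveBrokers(topic))
        Errors.INVALID_REPLICATION_FACTOR
      else
        Errors.UNKNOWN_TOPIC_OR_PARTITION, // instead of LEADER_NOT_AVAILABLE
      topic, isInternal(topic), util.Collections.emptyList()))
    ```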

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       We seem to have lost this handling or am I missing something?
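
    For reference, the guard being asked about looked like this in the removed code; any replacement creation path would presumably need an equivalent:

    ```scala
    // The catch protects against a race: another broker may auto-create the same
    // topic between this broker's metadata-cache miss and the createTopic call.
    try {
      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
    } catch {
      case _: TopicExistsException => // benign: another broker created the topic first
    }
    ```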







[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569060455



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}

Review comment:
       nit: do we need default implementations?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {

Review comment:
       Hmm, I agree the ordering of these checks was weird. So if `isFetchAllMetadata` is set, then `responsesForNonExistentTopics` will be empty and we will return `(topicResponses, Seq.empty[MetadataResponseTopic])`. Does that mean we can add this check to the first clause?
   ```scala 
     if (isFetchAllMetadata || topics.isEmpty || topicResponses.size == topics.size) {
          (topicResponses, Seq.empty[MetadataResponseTopic])
   ```

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap

Review comment:
       nit: unnecessary braces
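
    i.e. the line could simply read:

    ```scala
    .map(topic => (topic.name(), topic)).toMap
    ```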

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())
+      } catch {
+        case e: Exception =>
+          metadata.setErrorCode(Errors.forException(e).code)
       }
+    )
+
+    if (nonExistTopicMetadata.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+      val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+      autoTopicCreationManager.createTopics(
+        nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name())).toSet, controllerMutationQuota)

Review comment:
       Not sure if I am missing something, but doesn't `nonExistTopicMetadata` include topics which failed the `Topic.validate` check above?
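
    One way to address that, sketched against the code above: filter on the error code set during validation before handing anything to the creation manager.

    ```scala
    // Topics whose names failed Topic.validate no longer carry Errors.NONE, so
    // exclude them from the auto-creation request.
    val creatableTopics = nonExistTopicMetadata
      .filter(_.errorCode == Errors.NONE.code)
      .map(metadata => getTopicConfigs(metadata.name()))
      .toSet

    if (creatableTopics.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
      val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
      autoTopicCreationManager.createTopics(creatableTopics, controllerMutationQuota)
    }
    ```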

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       Ok. My thought is to reduce the reliance on the broker metadata. We will need the replication factor check on the controller anyway, so I'm not sure it is worth optimizing for the case when the cluster is initializing.
   
   Perhaps at least we can move this validation into `AutoTopicCreationManager` to better encapsulate the logic.
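
    Roughly, that encapsulation might look like the following sketch (assumes a hasEnoughAliveBrokers helper, backed by the metadata cache, moves into the manager as well):

    ```scala
    // Hypothetical: the manager performs the alive-broker check itself, so
    // KafkaApis no longer consults broker metadata before calling createTopics.
    override def createTopics(topics: Set[CreatableTopic],
                              controllerMutationQuota: ControllerMutationQuota): Unit = {
      val (creatable, skipped) = topics.partition(topic => hasEnoughAliveBrokers(topic.name()))
      skipped.foreach(topic =>
        warn(s"Not enough alive brokers to auto-create topic ${topic.name()}"))
      // ... continue with `creatable` as in the existing implementation
    }
    ```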

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(

Review comment:
       Let's leave this for a follow-up, but just want to mention that it is probably better if we can reuse the same `BrokerToControllerChannelManager` as `ForwardingManager`. Can you file a JIRA for a follow-up?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {

Review comment:
       Maybe I'm missing it, but where is this argument used?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {

Review comment:
       Pre-existing issue. `CoordinatorType.forId` throws `IllegalArgumentException` if the key type is unknown, which gets translated to UNKNOWN_SERVER_ERROR. It would be better to return INVALID_REQUEST.
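
    A possible shape for that fix, as a sketch:

    ```scala
    // Translate the IllegalArgumentException thrown by CoordinatorType.forId into
    // an InvalidRequestException so the client sees INVALID_REQUEST rather than
    // UNKNOWN_SERVER_ERROR.
    val coordinatorType =
      try CoordinatorType.forId(findCoordinatorRequest.data.keyType)
      catch {
        case _: IllegalArgumentException =>
          throw new InvalidRequestException(
            s"Unknown coordinator key type ${findCoordinatorRequest.data.keyType} in FindCoordinator request")
      }
    ```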

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      if (topicMetadata.headOption.isEmpty) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+          autoTopicCreationManager.createTopics(
+            Seq(getTopicConfigs(internalTopicName)).toSet, controllerMutationQuota)
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
       }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)
+          .setReplicationFactor(config.offsetsTopicReplicationFactor)
+          .setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
+      case TRANSACTION_STATE_TOPIC_NAME =>
+        new CreatableTopic()

Review comment:
       Feels to me like we would benefit by moving this logic into `AutoTopicCreationManager`. The configuration for auto-created topics can always be derived from the broker configuration. Hence we could simplify the interface by letting it take only the topic names. The advantage is that we can move all of this logic out of `KafkaApis` (which is now up to 3500 LOC).
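
    The simplified interface might look something like this (names illustrative):

    ```scala
    // Callers pass only topic names; the manager derives partition counts,
    // replication factors and topic configs from the broker configuration and
    // coordinator settings it is constructed with.
    trait AutoTopicCreationManager {
      def createTopics(
        topicNames: Set[String],
        controllerMutationQuota: ControllerMutationQuota
      ): Unit
    }
    ```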

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      if (topicMetadata.headOption.isEmpty) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+          autoTopicCreationManager.createTopics(
+            Seq(getTopicConfigs(internalTopicName)).toSet, controllerMutationQuota)
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
       }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)
+          .setReplicationFactor(config.offsetsTopicReplicationFactor)
+          .setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
+      case TRANSACTION_STATE_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.transactionTopicPartitions)
+          .setReplicationFactor(config.transactionTopicReplicationFactor)
+          .setConfigs(convertToTopicConfigCollections(
+            txnCoordinator.transactionTopicConfigs))
+      case topicName =>
+        new CreatableTopic()
+          .setName(topicName)
+          .setNumPartitions(config.numPartitions)
+          .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+    }
+  }
+
+  private def convertToTopicConfigCollections(config: Properties): CreateableTopicConfigCollection = {
+    val topicConfigs = new CreateableTopicConfigCollection()
+    config.forEach {
+      case (name, value) =>
+        topicConfigs.add(new CreateableTopicConfig()
+          .setName(name.toString)
+          .setValue(value.toString))
+    }
+    topicConfigs
+  }
+
+  private def hasEnoughAliveBrokers(topic: String): Boolean = {
+    if (topic == null)
+      throw new IllegalArgumentException("topic must not be null")
+
+    val aliveBrokers = metadataCache.getAliveBrokers
+
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
+          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +

Review comment:
       The differences between these log lines are minor. Can we factor out a helper?
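
       One possible shape for such a helper (a sketch, not from the PR; the wording mirrors the log lines above and the parameter names are mine):
    ```scala
    private def insufficientBrokersMessage(
      topicDescription: String,
      aliveBrokers: Int,
      replicationFactor: Int,
      configProp: String
    ): String =
      s"Number of alive brokers '$aliveBrokers' does not meet the required replication factor " +
        s"'$replicationFactor' for the $topicDescription (configured via '$configProp'). " +
        "This error can be ignored if the cluster is starting up and not all brokers are up yet."

    // e.g. for the offsets topic:
    // error(insufficientBrokersMessage("offsets topic", aliveBrokers.size,
    //   config.offsetsTopicReplicationFactor.toInt, KafkaConfig.OffsetsTopicReplicationFactorProp))
    ```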

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))

Review comment:
       nit: is `error.message` actually useful to send back? It doesn't provide any information beyond the error code. Could we just use `errorMessage.orNull`?
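
       i.e. roughly (same builder chain as in the snippet above, only the error-message line changes):
    ```scala
    new FindCoordinatorResponseData()
      .setErrorCode(error.code)
      .setErrorMessage(errorMessage.orNull) // null instead of the redundant default message
      .setNodeId(node.id)
      .setHost(node.host)
      .setPort(node.port)
      .setThrottleTimeMs(requestThrottleMs)
    ```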

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())

Review comment:
       Is the idea to validate before sending the CreateTopic? Could we move this to `AutoTopicManager`?
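
       If the validation does move into the manager, it could look roughly like this (a sketch under that assumption; `warn` comes from the `Logging` trait the manager already mixes in, and invalid names are simply skipped here rather than propagated back into the Metadata response):
    ```scala
    import org.apache.kafka.common.errors.InvalidTopicException
    import org.apache.kafka.common.internals.Topic
    import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic

    private def validTopics(topics: Set[CreatableTopic]): Set[CreatableTopic] =
      topics.filter { topic =>
        try {
          Topic.validate(topic.name)
          true
        } catch {
          case e: InvalidTopicException =>
            warn(s"Skipping auto-creation of topic '${topic.name}': ${e.getMessage}")
            false
        }
      }
    ```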

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>

Review comment:
       nit: usually we write as
   ```scala
   nonExistTopicMetadata.foreach { metadata =>
   ```

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
       nit: there are a few of these throughout, but the parentheses are unnecessary
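
       For example (the same chain without the extra parentheses and braces; note this uses `containsKey`, since `ConcurrentHashMap.contains` checks values rather than keys):
    ```scala
    val topicConfigs = topics
      .filter(topic => !inflightTopics.containsKey(topic.name))
      .map(topic => topic.name -> topic)
      .toMap
    ```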

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap
+
+    if (topicConfigs.nonEmpty) {
+      if (!controller.isActive && channelManager.isDefined) {
+        // Mark the topics as inflight during auto creation through forwarding.
+        topicConfigs.foreach(config => inflightTopics.put(config._1, config._2))
+
+        val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection
+        topicConfigs.foreach(config => topicsToCreate.add(config._2))
+        val createTopicsRequest = new CreateTopicsRequest.Builder(
+          new CreateTopicsRequestData()
+            .setTimeoutMs(requestTimeout)
+            .setTopics(topicsToCreate)
+        )
+
+        channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+          override def onTimeout(): Unit = {
+            clearInflightRequests(topicConfigs)
+          }
+
+          override def onComplete(response: ClientResponse): Unit = {
+            clearInflightRequests(topicConfigs)
+          }
+        })
+      } else {
+        adminManager.createTopics(
+          requestTimeout,
+          validateOnly = false,
+          topicConfigs,
+          Map.empty,
+          controllerMutationQuota,
+          _ => ())
+      }
+    } else {
+      debug(s"Topics $topics are under creation already, skip sending additional " +

Review comment:
       We only get this message if _all_ of the topics are already inflight. Perhaps it is still useful if only some of them are inflight?
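
       A sketch of logging the skipped subset as well (assumes the `inflightTopics` map in this class; the naming is mine):
    ```scala
    val alreadyInflight = topics.map(_.name).filter(name => inflightTopics.containsKey(name))
    if (alreadyInflight.nonEmpty)
      debug(s"Topics $alreadyInflight are already under creation, not sending another CreateTopics request for them")
    ```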

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]

Review comment:
       Could this be a set? As far as I can tell, we do not rely on the value.
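
       If only membership matters, a concurrent set would do (a minimal sketch; `ConcurrentHashMap.newKeySet` is available on Java 8+):
    ```scala
    import java.util.concurrent.ConcurrentHashMap

    private val inflightTopics: java.util.Set[String] = ConcurrentHashMap.newKeySet[String]()

    // then in createTopics / clearInflightRequests:
    // inflightTopics.add(topic.name)
    // inflightTopics.contains(topic.name)
    // inflightTopics.remove(topic.name)
    ```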

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap
+
+    if (topicConfigs.nonEmpty) {
+      if (!controller.isActive && channelManager.isDefined) {
+        // Mark the topics as inflight during auto creation through forwarding.
+        topicConfigs.foreach(config => inflightTopics.put(config._1, config._2))
+
+        val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection
+        topicConfigs.foreach(config => topicsToCreate.add(config._2))
+        val createTopicsRequest = new CreateTopicsRequest.Builder(
+          new CreateTopicsRequestData()
+            .setTimeoutMs(requestTimeout)
+            .setTopics(topicsToCreate)
+        )
+
+        channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {

Review comment:
       Probably useful to have some logging when we send the CreateTopic request and in the callbacks.
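
       For example (a sketch only; the log levels are a judgment call and `info`/`warn`/`debug` come from the `Logging` trait already mixed in):
    ```scala
    info(s"Sending CreateTopics request to the controller for auto-created topics ${topicConfigs.keys}")
    channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
      override def onTimeout(): Unit = {
        warn(s"CreateTopics request for auto-created topics ${topicConfigs.keys} timed out")
        clearInflightRequests(topicConfigs)
      }

      override def onComplete(response: ClientResponse): Unit = {
        debug(s"Completed CreateTopics request for auto-created topics ${topicConfigs.keys}, " +
          s"response: ${response.responseBody}")
        clearInflightRequests(topicConfigs)
      }
    })
    ```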

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {

Review comment:
       Eventually we need to figure out quota behavior for forwarded requests. I am wondering whether it makes sense to apply the quota on each broker separately before sending the `CreateTopic` to the controller, or whether we should rely on the controller exclusively.
   
   cc @dajac 

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int

Review comment:
       nit: requestTimeoutMs?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       I don't think it is equivalent, at least not completely. My thought was to reduce the reliance on the broker's configuration, which is more likely to be stale than the controller. This actually raises an interesting question about the `CreateTopic` API which I had not thought of before. If we receive a `CreateTopic` request for an internal topic, which configuration should we use? Currently it looks like we will apply the standard topic defaults, but that does not seem right. I filed https://issues.apache.org/jira/browse/KAFKA-12280, so we can consider this later.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       Inside `ZkAdminManager.createTopics`, I see that we catch `TopicExistsException`. However, I do not see any logic to translate this to `LEADER_NOT_AVAILABLE`. Can you show me where this happens?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji edited a comment on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#issuecomment-772283780


   @abbccdda I had one additional thought here. When we receive a normal `CreateTopic` request, we forward it to the controller through the `Envelope` request. This allows us to preserve the original client principal, which is useful for auditing. We are losing that here for auto-created topics, which is unfortunate. One idea I was thinking about is whether we can wrap the `CreateTopics` request for the auto-created topic in an `Envelope` in order to preserve the client principal. So basically we take the implicit topic creation request from the client and turn it into an explicit forwarded request on behalf of the client.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569912931



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))

Review comment:
       Makes sense




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r563003389



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(
+          if (!hasEnoughAliveBrokers(topic))
+            Errors.INVALID_REPLICATION_FACTOR
+          else if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
+            Errors.LEADER_NOT_AVAILABLE

Review comment:
       That makes sense




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       In the case of forwarding, maybe we can let the controller decide if there are enough alive brokers.
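
       A sketch of that restructuring, reusing only pieces of the snippet above, so the local alive-broker check only gates the non-forwarding path:
    ```scala
    if (topicCreationNeeded) {
      if (shouldForwardRequest(request)) {
        // The active controller validates the replication factor against its own live-broker view.
        forwardingManager.sendInterBrokerRequest(
          getCreateTopicsRequest(Seq(internalTopicName)),
          _ => ())
      } else if (hasEnoughAliveBrokers(internalTopicName)) {
        val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
        adminManager.createTopics(
          config.requestTimeoutMs,
          validateOnly = false,
          Map(internalTopicName -> getTopicConfigs(internalTopicName)),
          Map.empty,
          controllerMutationQuota,
          _ => ())
      }

      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
        createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
    }
    ```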

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       The controller should have these configurations as well. Perhaps it would be better to use -1 for this and for the replication factor, and let the controller fill them in?
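       Sketch of the suggestion (assuming the receiving controller resolves the sentinel values to its own internal-topic defaults):

           new CreatableTopic()
             .setName(topic)
             .setNumPartitions(-1)               // -1: let the controller pick the partition count
             .setReplicationFactor((-1).toShort) // -1: let the controller pick the replication factor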

##########
File path: core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, MetadataRequestTest}
+import org.junit.jupiter.api.Test
+
+class MetadataRequestWithForwardingTest extends MetadataRequestTest {
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    super.brokerPropertyOverrides(properties)
+    properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
+  }
+
+  @Test
+  override def testAutoTopicCreation(): Unit = {
+    super.testAutoTopicCreation()
+  }
+
+  @Test
+  override def testAutoCreateOfCollidingTopics(): Unit = {
+    super.testAutoCreateOfCollidingTopics()
+  }
+
+  @Test
+  override def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+    super.testAutoCreateTopicWithInvalidReplicationFactor()
+  }
+
+  /* the rest of tests are not enabled */

Review comment:
       An alternative we have used elsewhere would be to introduce an `AbstractMetadataRequestTest` into which we can pull the common cases up. A more elegant option might be to figure out how to use `@ParameterizedTest` so that we can provide config overrides. That would be a little difficult at the moment because we initialize brokers in a `@Before` method, which probably means we need to move away from this approach long term. For now, the abstract class seems preferable. Same for `CreateTopicsRequestWithForwardingTest`.
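       Rough shape of the abstract-class option (class names hypothetical, just to convey the idea):

           abstract class AbstractMetadataRequestTest extends BaseRequestTest {
             // shared helpers plus the test cases that should run in both modes
           }

           class MetadataRequestTest extends AbstractMetadataRequestTest

           class MetadataRequestWithForwardingTest extends AbstractMetadataRequestTest {
             override def brokerPropertyOverrides(properties: Properties): Unit = {
               super.brokerPropertyOverrides(properties)
               properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
             }
           }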

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(

Review comment:
       nit: seems misaligned

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(
+          if (!hasEnoughAliveBrokers(topic))
+            Errors.INVALID_REPLICATION_FACTOR
+          else if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
+            Errors.LEADER_NOT_AVAILABLE

Review comment:
       Hmm. In the old logic, we would attempt topic creation through ZooKeeper first; if the topic was created successfully, we would return LEADER_NOT_AVAILABLE to give the controller time to elect a leader. Now we return LEADER_NOT_AVAILABLE immediately and send the CreateTopics request asynchronously, without knowing whether it will ultimately succeed. Perhaps it would be better to keep returning `UNKNOWN_TOPIC_OR_PARTITION` until we see that the topic exists.
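       Sketch of that alternative, reusing the names from this PR (not a concrete patch; assumes the mutation quota is in scope here):

           } else {
             // Creation is now fire-and-forget, so don't promise a leader; keep answering
             // UNKNOWN_TOPIC_OR_PARTITION until the topic shows up in the metadata cache.
             if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
               autoTopicCreationManager.createTopics(Set(getTopicConfigs(topic)), controllerMutationQuota)
             Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, isInternal(topic), util.Collections.emptyList()))
           }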

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       We seem to have lost this handling, or am I missing something?
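       For reference, the old path swallowed it roughly like this; the new create path (or its completion callback) could keep the same tolerance (sketch only):

           try {
             adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
           } catch {
             case _: TopicExistsException =>
               // another broker may have created the topic concurrently; treat as success
           }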




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570492221



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
       You mean omit () for `topic.name()`?
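       i.e. (zero-arg Java accessors can be called without parentheses in Scala):

           .filter(topic => !inflightTopics.contains(topic.name))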




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       I agree it's equivalent, but I think we could be conservative here and keep the logic on the broker side for now, to reduce the logical change in this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569706514



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]

Review comment:
       Seems I could omit the value and just use a concurrent key set here: https://stackoverflow.com/questions/40993683/scala-thread-safe-hashset
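       e.g. a key set would do since we only ever check membership (sketch):

           private val inflightTopics: java.util.Set[String] =
             java.util.concurrent.ConcurrentHashMap.newKeySet[String]()

           // usage stays the same shape:
           //   .filter(topic => !inflightTopics.contains(topic.name))
           //   inflightTopics.add(...) / inflightTopics.remove(...) to mark and clear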




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569926897



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(

Review comment:
       Sure, we do have https://issues.apache.org/jira/browse/KAFKA-10348 to track.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#issuecomment-774542939


   Only one unrelated test failure, in Connect. Verified locally; will merge the PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570492431



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())
+      } catch {
+        case e: Exception =>
+          metadata.setErrorCode(Errors.forException(e).code)
       }
+    )
+
+    if (nonExistTopicMetadata.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+      val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+      autoTopicCreationManager.createTopics(
+        nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name())).toSet, controllerMutationQuota)

Review comment:
       I guess we could rely on the admin manager to do the validation for us.
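       i.e. (sketch) drop the manual `Topic.validate` loop above and let the create path surface the error:

           autoTopicCreationManager.createTopics(
             nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name)).toSet,
             controllerMutationQuota)
           // as noted above, the admin manager already validates the topic name during
           // creation, so an invalid name would come back through that path instead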




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r571004759



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {

Review comment:
       @hachikuji Sorry for my late reply; I missed the notification. We have to enforce the quota on the controller exclusively. It is a global quota and we can't really distribute it fairly across the cluster. In this case, it would be great if we could propagate the principal and clientId to the controller so it can enforce the quota. However, I wonder how we could propagate the error and the delay to the client if the topic creation is throttled. Perhaps we could reply with `UNKNOWN_TOPIC_OR_PARTITION` until the topic can be created.
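       One possible shape on the broker side (sketch, reusing the completion handler already in this PR; whether the controller throttled or accepted, the client just keeps retrying):

           channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
             override def onComplete(response: ClientResponse): Unit = {
               // Accepted or throttled on the controller, simply clear the inflight marker;
               // the client keeps getting UNKNOWN_TOPIC_OR_PARTITION from its metadata
               // requests and retries until the topic actually exists.
               clearInflightRequests(topicConfigs)
             }
             override def onTimeout(): Unit = clearInflightRequests(topicConfigs)
           })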




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#issuecomment-772283780


   @abbccdda I had one additional thought here. When we receive a normal `CreateTopic` request, we forward it to the controller through the `Envelope` request. This allows us to preserve the original client principal, which is useful for auditing. We are losing that here for auto-created topics, which is unfortunate. One idea I was thinking about is whether we can wrap the `CreateTopics` request for the auto-created topic in an `Envelope` in order to preserve the client principal. So basically we take the implicit topic creation request from the client and turn it into an explicit forwarded request on behalf of the principal.
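   A rough sketch of the shape (the `forwardRequestOnBehalfOf` helper is hypothetical, only to convey the idea):
   
       // Build the implicit CreateTopics request for the auto-created topic, then wrap it in
       // an Envelope that carries the original request's principal and client address, so the
       // controller sees (and can audit) the real caller rather than the broker.
       val createTopics = getCreateTopicsRequest(Seq(internalTopicName))
       forwardingManager.forwardRequestOnBehalfOf(request.context, createTopics, _ => ())  // hypothetical helper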


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda merged pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #9579:
URL: https://github.com/apache/kafka/pull/9579


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569929139



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap
+
+    if (topicConfigs.nonEmpty) {
+      if (!controller.isActive && channelManager.isDefined) {
+        // Mark the topics as inflight during auto creation through forwarding.
+        topicConfigs.foreach(config => inflightTopics.put(config._1, config._2))
+
+        val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection
+        topicConfigs.foreach(config => topicsToCreate.add(config._2))
+        val createTopicsRequest = new CreateTopicsRequest.Builder(
+          new CreateTopicsRequestData()
+            .setTimeoutMs(requestTimeout)
+            .setTopics(topicsToCreate)
+        )
+
+        channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+          override def onTimeout(): Unit = {
+            clearInflightRequests(topicConfigs)
+          }
+
+          override def onComplete(response: ClientResponse): Unit = {
+            clearInflightRequests(topicConfigs)
+          }
+        })
+      } else {
+        adminManager.createTopics(
+          requestTimeout,
+          validateOnly = false,
+          topicConfigs,
+          Map.empty,
+          controllerMutationQuota,
+          _ => ())
+      }
+    } else {
+      debug(s"Topics $topics are under creation already, skip sending additional " +

Review comment:
       Actually, I don't think this logging is very useful; I'll replace it with something more explicit about the state change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570396673



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {

Review comment:
       Yes, I think so.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       Not sure whether the controller would detect that the given topic is internal.
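
       If it turns out the controller does not special-case these names, one purely illustrative option (a sketch, not part of this PR) is to assert internal-ness on the forwarding side with the existing `Topic.isInternal` helper:

       ```scala
       import org.apache.kafka.common.internals.Topic

       // Sketch only: fail fast if a non-internal topic name ever reaches this path,
       // rather than relying on the controller to recognize it.
       private def requireInternalTopic(topic: String): Unit = {
         if (!Topic.isInternal(topic))
           throw new IllegalArgumentException(s"Unexpected non-internal topic name: $topic")
       }
       ```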




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r562792214



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       I guess the purpose of doing the live-broker check here is to avoid sending excessive CreateTopics requests.
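
       For readers following along, here is a hedged guess at what `hasEnoughAliveBrokers` amounts to, reconstructed from the `createInternalTopic` checks this PR removes (the method itself is not shown in this hunk, so treat it as a sketch rather than the actual implementation):

       ```scala
       // Sketch, reconstructed from the removed createInternalTopic checks: only
       // attempt creation once enough brokers are alive to satisfy the configured
       // replication factor for the internal topic in question.
       private def hasEnoughAliveBrokers(topic: String): Boolean = {
         val requiredReplicationFactor = topic match {
           case GROUP_METADATA_TOPIC_NAME => config.offsetsTopicReplicationFactor.toInt
           case TRANSACTION_STATE_TOPIC_NAME => config.transactionTopicReplicationFactor.toInt
           case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
         }
         metadataCache.getAliveBrokers.size >= requiredReplicationFactor
       }
       ```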




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569130098



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       Inside `ZkAdminManager.createTopics`, I see that we catch `TopicExistsException`. However, I do not see any logic to translate it to `LEADER_NOT_AVAILABLE`. Can you show me where this happens?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570568151



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       The problem we have is that `ZkAdminManager.createTopics` only takes a callback instead of telling us in real time whether we hit `TopicExistsException`. Right now we do the topic creation asynchronously, so unless this really needs to be fixed (today we would just return UNKNOWN_TOPIC_OR_PARTITION, which seems semantically similar to LEADER_NOT_AVAILABLE), I think we could return that error immediately without waiting for the async creation?
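
       In other words (a hedged sketch of the flow being described, reusing names from this diff rather than standalone code), the request handler fires the creation off and answers immediately:

       ```scala
       // Kick off async creation; the callback is effectively fire-and-forget, and
       // TopicExistsException surfaces there as an error result rather than being thrown here.
       adminManager.createTopics(
         config.requestTimeoutMs,
         validateOnly = false,
         Map(internalTopicName -> getTopicConfigs(internalTopicName)),
         Map.empty,
         controllerMutationQuota,
         _ => ())

       // Respond right away with a retriable error instead of waiting for the result;
       // clients retry the Metadata / FindCoordinator request until the topic exists.
       metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, internalTopicName,
         true, util.Collections.emptyList())
       ```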




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570654953



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       Sounds good




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569913506



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {

Review comment:
       Seems to be no longer in use; I will remove it and revert to using the error code's message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r571362924



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -169,6 +169,8 @@ class DefaultAutoTopicCreationManager(
         }
     }
 
+    clearInflightRequests(creatableTopics)

Review comment:
       Can you use a `try/finally` here?
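
       For clarity, a minimal sketch of the shape being suggested, with `sendCreateTopicRequest` as a hypothetical placeholder for the surrounding send logic in `AutoTopicCreationManager`:

       ```scala
       // Hypothetical sketch: whatever creation/send logic precedes this point may
       // throw, so the inflight bookkeeping should be released in a finally block.
       try {
         sendCreateTopicRequest(creatableTopics) // placeholder for the existing send logic
       } finally {
         clearInflightRequests(creatableTopics)  // always runs, even on failure
       }
       ```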




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r565035383



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -61,7 +61,6 @@ class BrokerToControllerChannelManager(
 
   def shutdown(): Unit = {
     requestThread.shutdown()
-    requestThread.awaitShutdown()

Review comment:
       Side cleanup: `awaitShutdown()` is already called within `requestThread.shutdown()`, so this call was redundant.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570396673



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {

Review comment:
       Yes, I think so.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())

Review comment:
       Actually after looking into the zk admin manager logic, I don't think it's necessary to do the topic validation here.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
       You mean omit () for `topic.name()`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())
+      } catch {
+        case e: Exception =>
+          metadata.setErrorCode(Errors.forException(e).code)
       }
+    )
+
+    if (nonExistTopicMetadata.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+      val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+      autoTopicCreationManager.createTopics(
+        nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name())).toSet, controllerMutationQuota)

Review comment:
       I guess we could rely on the admin manager to do the validation for us.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       The problem we have is that `ZkAdminManager.createTopics` only takes a callback instead of telling us in real time whether we hit `TopicExistsException`. Right now we do the topic creation asynchronously, so unless this really needs to be fixed (today we would just return UNKNOWN_TOPIC_OR_PARTITION, which seems semantically similar to LEADER_NOT_AVAILABLE), I think we could return that error immediately without waiting for the async creation?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       Sounds good




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r563002911



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       We intentionally avoid using `adminZkClient` directly so that we go through the topic creation rules in `ZkAdminManager`; `TopicExistsException` is handled there.
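
       As a hedged illustration (assuming the `createTopics` callback receives a `Map[String, ApiError]`, which is what the single-argument `_ => ()` in this diff suggests; the exact signature should be checked against `ZkAdminManager`), a caller that wants the old "let it go" behaviour can treat TOPIC_ALREADY_EXISTS as benign in the callback:

       ```scala
       import org.apache.kafka.common.protocol.Errors
       import org.apache.kafka.common.requests.ApiError

       // Sketch only: ignore "already exists" (another broker may have created the
       // topic first) and log anything else. `warn` assumes the kafka.utils.Logging trait.
       val createTopicsCallback: Map[String, ApiError] => Unit = { results =>
         results.foreach { case (topic, apiError) =>
           apiError.error match {
             case Errors.NONE | Errors.TOPIC_ALREADY_EXISTS => // benign, nothing to do
             case error => warn(s"Auto creation of topic $topic failed with $error")
           }
         }
       }
       ```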




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570480399



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())

Review comment:
       Actually after looking into the zk admin manager logic, I don't think it's necessary to do the topic validation here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org