Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/03 05:57:03 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r569060455



##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}

Review comment:
       nit: do we need default implementations?
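
   For illustration, a minimal sketch of the trait with those methods left abstract, so each implementation has to state explicitly whether they are no-ops (types as in the quoted diff):
   ```scala
   trait AutoTopicCreationManager {

     def createTopics(
       topicNames: Set[CreatableTopic],
       controllerMutationQuota: ControllerMutationQuota
     ): Unit

     // No default bodies: the forwarding implementation spells out its channel
     // manager start/shutdown, and test doubles provide their own no-ops.
     def start(): Unit

     def shutdown(): Unit
   }
   ```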

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {

Review comment:
       Hmm, I agree the ordering of these checks was weird. So if `isFetchAllMetadata` is set, then `responsesForNonExistentTopics` will be empty and we will return `(topicResponses, Seq.empty[MetadataResponseTopic])`. Does that mean we can add this check to the first clause?
   ```scala 
     if (isFetchAllMetadata || topics.isEmpty || topicResponses.size == topics.size) {
          (topicResponses, Seq.empty[MetadataResponseTopic])
   ```

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap

Review comment:
       nit: unnecessary braces
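
   For example, one way the cleaned-up expression could read (just a sketch, same behavior):
   ```scala
   val topicConfigs = topics
     .filter(topic => !inflightTopics.contains(topic.name()))
     .map(topic => topic.name() -> topic)
     .toMap
   ```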

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())
+      } catch {
+        case e: Exception =>
+          metadata.setErrorCode(Errors.forException(e).code)
       }
+    )
+
+    if (nonExistTopicMetadata.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+      val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+      autoTopicCreationManager.createTopics(
+        nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name())).toSet, controllerMutationQuota)

Review comment:
       Not sure if I am missing something, but doesn't `nonExistTopicMetadata` include topics which failed the `Topic.validate` check above?
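
   If so, one possible shape of the fix (a sketch only, using the names from the quoted diff) would be to drop the topics that already carry an error before requesting creation:
   ```scala
   val creatableTopics = nonExistTopicMetadata
     .filter(_.errorCode == Errors.NONE.code) // skip anything that failed Topic.validate above
     .map(metadata => getTopicConfigs(metadata.name()))
     .toSet

   if (creatableTopics.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
     val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
     autoTopicCreationManager.createTopics(creatableTopics, controllerMutationQuota)
   }
   ```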

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       Ok. My thought is to reduce the reliance on the broker metadata. We will need the replication factor check on the controller anyway, so I'm not sure it is worth optimizing for the case when the cluster is initializing.
   
   Perhaps at least we can move this validation into `AutoTopicCreationManager` to better encapsulate the logic.
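
   As a rough sketch of that encapsulation (assuming the manager is handed a `MetadataCache`, which it does not take today), the alive-broker check could sit right next to the create call:
   ```scala
   override def createTopics(topics: Set[CreatableTopic],
                             controllerMutationQuota: ControllerMutationQuota): Unit = {
     val aliveBrokers = metadataCache.getAliveBrokers.size
     val (creatable, shortOfBrokers) = topics.partition(topic => aliveBrokers >= topic.replicationFactor())
     shortOfBrokers.foreach { topic =>
       error(s"Not auto-creating ${topic.name()}: only $aliveBrokers alive brokers for " +
         s"replication factor ${topic.replicationFactor()}")
     }
     // ... continue with `creatable` exactly as before ...
   }
   ```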

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(

Review comment:
       Let's leave this for a follow-up, but just want to mention that it is probably better if we can reuse the same `BrokerToControllerChannelManager` as `ForwardingManager`. Can you file a JIRA for a follow-up?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {

Review comment:
       Maybe I'm missing it, but where is this argument used?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {

Review comment:
       Pre-existing issue. `CoordinatorType.forId` throws `IllegalArgumentException` if the key type is unknown. That will get translated to UNKNOWN_SERVER_ERROR. It would be better to return INVALID_REQUEST.
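
   One way to do that (a sketch): translate the failure explicitly before it falls through to the generic UNKNOWN_SERVER_ERROR mapping:
   ```scala
   val coordinatorType =
     try {
       CoordinatorType.forId(findCoordinatorRequest.data.keyType)
     } catch {
       case _: IllegalArgumentException =>
         throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
     }
   ```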

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      if (topicMetadata.headOption.isEmpty) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+          autoTopicCreationManager.createTopics(
+            Seq(getTopicConfigs(internalTopicName)).toSet, controllerMutationQuota)
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
       }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)
+          .setReplicationFactor(config.offsetsTopicReplicationFactor)
+          .setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
+      case TRANSACTION_STATE_TOPIC_NAME =>
+        new CreatableTopic()

Review comment:
       Feels to me like we would benefit by moving this logic into `AutoTopicCreationManager`. The configuration for auto-created topics can always be derived from the broker configuration. Hence we could simplify the interface by letting it take only the topic names. The advantage is that we can move all of this logic out of `KafkaApis` (which is now up to 3500 LOC).
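
   A sketch of the interface that suggestion implies, with only topic names crossing the boundary:
   ```scala
   trait AutoTopicCreationManager {
     // The manager derives partitions, replication factor and topic configs for
     // internal and regular topics from the broker/coordinator configuration it holds.
     def createTopics(
       topicNames: Set[String],
       controllerMutationQuota: ControllerMutationQuota
     ): Unit
   }
   ```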

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      if (topicMetadata.headOption.isEmpty) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+          autoTopicCreationManager.createTopics(
+            Seq(getTopicConfigs(internalTopicName)).toSet, controllerMutationQuota)
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
       }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    }
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)
+          .setReplicationFactor(config.offsetsTopicReplicationFactor)
+          .setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
+      case TRANSACTION_STATE_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.transactionTopicPartitions)
+          .setReplicationFactor(config.transactionTopicReplicationFactor)
+          .setConfigs(convertToTopicConfigCollections(
+            txnCoordinator.transactionTopicConfigs))
+      case topicName =>
+        new CreatableTopic()
+          .setName(topicName)
+          .setNumPartitions(config.numPartitions)
+          .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+    }
+  }
+
+  private def convertToTopicConfigCollections(config: Properties): CreateableTopicConfigCollection = {
+    val topicConfigs = new CreateableTopicConfigCollection()
+    config.forEach {
+      case (name, value) =>
+        topicConfigs.add(new CreateableTopicConfig()
+          .setName(name.toString)
+          .setValue(value.toString))
+    }
+    topicConfigs
+  }
+
+  private def hasEnoughAliveBrokers(topic: String): Boolean = {
+    if (topic == null)
+      throw new IllegalArgumentException("topic must not be null")
+
+    val aliveBrokers = metadataCache.getAliveBrokers
+
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
+          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +

Review comment:
       The differences between these log lines are minor. Can we factor out a helper?
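
   For example, a possible helper (a sketch; the parameter names are illustrative):
   ```scala
   private def logInsufficientBrokers(topicDescription: String,
                                      aliveBrokers: Int,
                                      replicationFactor: Int,
                                      configProp: String): Unit = {
     error(s"Number of alive brokers '$aliveBrokers' does not meet the required replication factor " +
       s"'$replicationFactor' for the $topicDescription (configured via '$configProp'). " +
       "This error can be ignored if the cluster is starting up and not all brokers are up yet.")
   }
   ```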

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1375,55 +1325,137 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))

Review comment:
       nit: is `error.message` actually useful to send back? It doesn't provide any information beyond the error code. Could we just use `errorMessage.orNull`?
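
   i.e. something like (sketch):
   ```scala
   new FindCoordinatorResponseData()
     .setErrorCode(error.code)
     .setErrorMessage(errorMessage.orNull) // leave the message unset rather than echoing the error code's text
     .setNodeId(node.id)
     .setHost(node.host)
     .setPort(node.port)
     .setThrottleTimeMs(requestThrottleMs)
   ```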

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())

Review comment:
       Is the idea to validate before sending the CreateTopic? Could we move this to `AutoTopicCreationManager`?
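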

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>

Review comment:
       nit: usually we write as
   ```scala
   nonExistTopicMetadata.foreach { metadata =>
   ```

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
       nit: there are a few of these throughout, but the parentheses are unnecessary
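
   If this refers to the empty parentheses on accessors like `name()`, a sketch of the paren-free form (Scala allows calling nullary Java accessors without them):
   ```scala
   val topicConfigs = topics
     .filter(topic => !inflightTopics.contains(topic.name))
     .map(topic => topic.name -> topic)
     .toMap
   ```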

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap
+
+    if (topicConfigs.nonEmpty) {
+      if (!controller.isActive && channelManager.isDefined) {
+        // Mark the topics as inflight during auto creation through forwarding.
+        topicConfigs.foreach(config => inflightTopics.put(config._1, config._2))
+
+        val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection
+        topicConfigs.foreach(config => topicsToCreate.add(config._2))
+        val createTopicsRequest = new CreateTopicsRequest.Builder(
+          new CreateTopicsRequestData()
+            .setTimeoutMs(requestTimeout)
+            .setTopics(topicsToCreate)
+        )
+
+        channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+          override def onTimeout(): Unit = {
+            clearInflightRequests(topicConfigs)
+          }
+
+          override def onComplete(response: ClientResponse): Unit = {
+            clearInflightRequests(topicConfigs)
+          }
+        })
+      } else {
+        adminManager.createTopics(
+          requestTimeout,
+          validateOnly = false,
+          topicConfigs,
+          Map.empty,
+          controllerMutationQuota,
+          _ => ())
+      }
+    } else {
+      debug(s"Topics $topics are under creation already, skip sending additional " +

Review comment:
       We only get this message if _all_ of the topics are already inflight. Perhaps it is still useful if only some of them are inflight?
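
   A sketch of how it could be reported whenever anything was filtered out, not only when everything was:
   ```scala
   val alreadyInflight = topics.filter(topic => inflightTopics.contains(topic.name()))
   if (alreadyInflight.nonEmpty) {
     debug(s"Topics ${alreadyInflight.map(_.name())} are already under creation, " +
       "skipping additional CreateTopics requests for them")
   }
   ```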

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]

Review comment:
       Could this be a set? As far as I can tell, we do not rely on the value.
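
   e.g. (sketch, reusing the existing `java.util.concurrent.ConcurrentHashMap` import):
   ```scala
   // Concurrent set of topic names; the CreatableTopic values were never read back.
   private val inflightTopics: java.util.Set[String] = ConcurrentHashMap.newKeySet[String]()
   ```
   The `put` calls then become `add(topic.name)` and the `contains` checks stay the same.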

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {
+    val topicConfigs = topics
+      .filter(topic => !inflightTopics.contains(topic.name()))
+      .map(topic => {(topic.name(), topic)}).toMap
+
+    if (topicConfigs.nonEmpty) {
+      if (!controller.isActive && channelManager.isDefined) {
+        // Mark the topics as inflight during auto creation through forwarding.
+        topicConfigs.foreach(config => inflightTopics.put(config._1, config._2))
+
+        val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection
+        topicConfigs.foreach(config => topicsToCreate.add(config._2))
+        val createTopicsRequest = new CreateTopicsRequest.Builder(
+          new CreateTopicsRequestData()
+            .setTimeoutMs(requestTimeout)
+            .setTopics(topicsToCreate)
+        )
+
+        channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {

Review comment:
       Probably useful to have some logging when we send the CreateTopic request and in the callbacks.
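
   For instance (a sketch; the log wording is illustrative):
   ```scala
   info(s"Sending CreateTopics request for auto-creation of topics ${topicConfigs.keys} to the controller")

   channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
     override def onTimeout(): Unit = {
       warn(s"Auto topic creation timed out for topics ${topicConfigs.keys}")
       clearInflightRequests(topicConfigs)
     }

     override def onComplete(response: ClientResponse): Unit = {
       debug(s"Auto topic creation completed for topics ${topicConfigs.keys}, response: $response")
       clearInflightRequests(topicConfigs)
     }
   })
   ```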

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+    channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+    channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+                            controllerMutationQuota: ControllerMutationQuota): Unit = {

Review comment:
       Eventually we need to figure out the quota behavior for forwarded requests. I am wondering whether it makes sense to apply the quota on each broker separately before sending the `CreateTopics` request to the controller, or whether we should rely on the controller exclusively.
   
   cc @dajac 
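
To make the first option concrete, here is a sketch of what broker-side enforcement could look like inside `createTopics` before forwarding. It assumes `ControllerMutationQuota` exposes `isExceeded` and `record(permits)` the way the non-forwarded `ZkAdminManager` path uses them; whether the broker should enforce anything here at all, rather than deferring entirely to the controller, is exactly the open question.

```scala
// Sketch only: apply the local controller mutation quota before forwarding.
// The permit accounting below is simplified (real accounting is per partition mutation).
val topicsToForward =
  if (controllerMutationQuota.isExceeded) {
    warn(s"Skipping auto-creation of topics ${topicConfigs.keys.mkString(", ")}: " +
      "controller mutation quota exceeded on this broker")
    Map.empty[String, CreatableTopic]
  } else {
    controllerMutationQuota.record(topicConfigs.size.toDouble)
    topicConfigs
  }
// Only topicsToForward would then be packed into the CreateTopicsRequest and sent
// through the channel manager; the controller would still do its own accounting.
```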

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+    topicNames: Set[CreatableTopic],
+    controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+    config: KafkaConfig,
+    metadataCache: MetadataCache,
+    time: Time,
+    metrics: Metrics,
+    threadNamePrefix: Option[String],
+    adminManager: ZkAdminManager,
+    controller: KafkaController,
+    enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+    val channelManager =
+      if (enableForwarding)
+        Some(new BrokerToControllerChannelManager(
+          controllerNodeProvider = MetadataCacheControllerNodeProvider(
+            config, metadataCache),
+          time = time,
+          metrics = metrics,
+          config = config,
+          channelName = "autoTopicCreationChannel",
+          threadNamePrefix = threadNamePrefix,
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        ))
+      else
+        None
+    new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int

Review comment:
       nit: requestTimeoutMs?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, requestThrottleMs)
+              case _ =>
+                createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to client %s."
+            .format(responseBody, request.header.correlationId, request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       I don't think it is equivalent, at least not completely. My thought was to reduce the reliance on the broker's configuration, which is more likely to be stale than the controller's. This actually raises an interesting question about the `CreateTopics` API, which I had not thought of before: if we receive a `CreateTopics` request for an internal topic, which configuration should we use? Currently it looks like we will apply the standard topic defaults, but that does not seem right. I filed https://issues.apache.org/jira/browse/KAFKA-12280 so we can consider this later.
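
For illustration only, this is the kind of special-casing KAFKA-12280 might end up with on the receiving side: substituting the internal-topic settings instead of the standard topic defaults when a `CreateTopics` request names an internal topic. The helper name and its placement are hypothetical; the config values mirror the `getTopicConfigs` helper shown in this diff.

```scala
// Hypothetical sketch, not part of this PR: override the requested settings when the
// topic being created is one of the internal topics (see KAFKA-12280).
private def effectiveCreatableTopic(requested: CreatableTopic): CreatableTopic = {
  requested.name() match {
    case GROUP_METADATA_TOPIC_NAME =>
      new CreatableTopic()
        .setName(requested.name())
        .setNumPartitions(config.offsetsTopicPartitions)
        .setReplicationFactor(config.offsetsTopicReplicationFactor)
    case TRANSACTION_STATE_TOPIC_NAME =>
      new CreatableTopic()
        .setName(requested.name())
        .setNumPartitions(config.transactionTopicPartitions)
        .setReplicationFactor(config.transactionTopicReplicationFactor)
    case _ =>
      requested
  }
}
```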

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker created this topic

Review comment:
       Inside `ZkAdminManager.createTopics`, I see that we catch `TopicExistsException`. However, I do not see any logic to translate this to `LEADER_NOT_AVAILABLE`. Can you show me where this happens?
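
If the translation does turn out to be missing, one hypothetical shape for it on the metadata path, using only helpers already present in `KafkaApis` (`metadataResponseTopic`, `isInternal`); the method name and where it would be called from are assumptions, not something this PR contains.

```scala
// Hypothetical sketch: fold a TOPIC_ALREADY_EXISTS outcome from the create path back
// into LEADER_NOT_AVAILABLE, matching what the removed createTopic helper used to do
// when another broker had already created the topic.
private def metadataErrorForAutoCreate(createError: Errors, topic: String): MetadataResponseTopic = {
  createError match {
    case Errors.NONE | Errors.TOPIC_ALREADY_EXISTS =>
      // Created here or by another broker; either way the leader may not be elected yet.
      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
    case error =>
      metadataResponseTopic(error, topic, isInternal(topic), util.Collections.emptyList())
  }
}
```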




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org