You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2021/03/15 15:47:40 UTC

[kafka] branch trunk updated: KAFKA-10348: Share client channel between forwarding and auto creation manager (#10135)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 802ee10  KAFKA-10348: Share client channel between forwarding and auto creation manager (#10135)
802ee10 is described below

commit 802ee10bfd802735f7d4d1d7b1f1212e5ae0c4b2
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Mar 15 08:45:57 2021 -0700

    KAFKA-10348: Share client channel between forwarding and auto creation manager (#10135)
    
    share forwarding channel and auto topic creation channel.
    
    Reviewers: dengziming <de...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/server/AutoTopicCreationManager.scala    | 34 +---------------------
 .../src/main/scala/kafka/server/BrokerServer.scala | 21 +++++--------
 .../scala/kafka/server/ForwardingManager.scala     | 32 +-------------------
 core/src/main/scala/kafka/server/KafkaServer.scala | 34 +++++++++++-----------
 4 files changed, 27 insertions(+), 94 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index f8063a8..bac9fb7 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -32,10 +32,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreateableTopicConfig, CreateableTopicConfigCollection}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest}
-import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
@@ -46,10 +44,6 @@ trait AutoTopicCreationManager {
     topicNames: Set[String],
     controllerMutationQuota: ControllerMutationQuota
   ): Seq[MetadataResponseTopic]
-
-  def start(): Unit
-
-  def shutdown(): Unit
 }
 
 object AutoTopicCreationManager {
@@ -57,30 +51,13 @@ object AutoTopicCreationManager {
   def apply(
     config: KafkaConfig,
     metadataCache: MetadataCache,
-    time: Time,
-    metrics: Metrics,
     threadNamePrefix: Option[String],
+    channelManager: Option[BrokerToControllerChannelManager],
     adminManager: Option[ZkAdminManager],
     controller: Option[KafkaController],
     groupCoordinator: GroupCoordinator,
     txnCoordinator: TransactionCoordinator,
-    enableForwarding: Boolean
   ): AutoTopicCreationManager = {
-
-    val channelManager =
-      if (enableForwarding)
-        Some(new BrokerToControllerChannelManagerImpl(
-          controllerNodeProvider = MetadataCacheControllerNodeProvider(
-            config, metadataCache),
-          time = time,
-          metrics = metrics,
-          config = config,
-          channelName = "autoTopicCreationChannel",
-          threadNamePrefix = threadNamePrefix,
-          retryTimeoutMs = config.requestTimeoutMs.longValue
-        ))
-      else
-        None
     new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
       controller, groupCoordinator, txnCoordinator)
   }
@@ -100,15 +77,6 @@ class DefaultAutoTopicCreationManager(
 
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
-  override def start(): Unit = {
-    channelManager.foreach(_.start())
-  }
-
-  override def shutdown(): Unit = {
-    channelManager.foreach(_.shutdown())
-    inflightTopics.clear()
-  }
-
   override def createTopics(
     topics: Set[String],
     controllerMutationQuota: ControllerMutationQuota
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0ce1975..8689afd 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -102,6 +102,8 @@ class BrokerServer(
 
   var transactionCoordinator: TransactionCoordinator = null
 
+  var clientToControllerChannelManager: BrokerToControllerChannelManager = null
+
   var forwardingManager: ForwardingManager = null
 
   var alterIsrManager: AlterIsrManager = null
@@ -179,17 +181,16 @@ class BrokerServer(
       val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
 
-      val forwardingChannelManager = BrokerToControllerChannelManager(
+      clientToControllerChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
-        channelName = "forwarding",
+        channelName = "controllerForwardingChannel",
         threadNamePrefix,
         retryTimeoutMs = 60000
       )
-      forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
-      forwardingManager.start()
+      forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
 
       val apiVersionManager = ApiVersionManager(
         ListenerType.BROKER,
@@ -245,12 +246,9 @@ class BrokerServer(
         new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
         createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
 
-      val autoTopicCreationChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
-        time, metrics, config, "autocreate", threadNamePrefix, 60000)
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-        config, Some(autoTopicCreationChannelManager), None, None,
+        config, Some(clientToControllerChannelManager), None, None,
         groupCoordinator, transactionCoordinator)
-      autoTopicCreationManager.start()
 
       /* Add all reconfigurables for config change notification before starting the metadata listener */
       config.dynamicConfig.addReconfigurables(this)
@@ -448,11 +446,8 @@ class BrokerServer(
       if (alterIsrManager != null)
         CoreUtils.swallow(alterIsrManager.shutdown(), this)
 
-      if (forwardingManager != null)
-        CoreUtils.swallow(forwardingManager.shutdown(), this)
-
-      if (autoTopicCreationManager != null)
-        CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
+      if (clientToControllerChannelManager != null)
+        CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
       if (logManager != null)
         CoreUtils.swallow(logManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index a6e22e1..788dd4c 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -23,10 +23,8 @@ import kafka.network.RequestChannel
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader}
-import org.apache.kafka.common.utils.Time
 
 import scala.compat.java8.OptionConverters._
 
@@ -37,32 +35,12 @@ trait ForwardingManager {
   ): Unit
 
   def controllerApiVersions: Option[NodeApiVersions]
-
-  def start(): Unit = {}
-
-  def shutdown(): Unit = {}
 }
 
 object ForwardingManager {
-
   def apply(
-    config: KafkaConfig,
-    metadataCache: MetadataCache,
-    time: Time,
-    metrics: Metrics,
-    threadNamePrefix: Option[String]
+    channelManager: BrokerToControllerChannelManager
   ): ForwardingManager = {
-    val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
-
-    val channelManager = BrokerToControllerChannelManager(
-      controllerNodeProvider = nodeProvider,
-      time = time,
-      metrics = metrics,
-      config = config,
-      channelName = "forwardingChannel",
-      threadNamePrefix = threadNamePrefix,
-      retryTimeoutMs = config.requestTimeoutMs.longValue
-    )
     new ForwardingManagerImpl(channelManager)
   }
 }
@@ -71,14 +49,6 @@ class ForwardingManagerImpl(
   channelManager: BrokerToControllerChannelManager
 ) extends ForwardingManager with Logging {
 
-  override def start(): Unit = {
-    channelManager.start()
-  }
-
-  override def shutdown(): Unit = {
-    channelManager.shutdown()
-  }
-
   /**
    * Forward given request to the active controller.
    *
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 90c57f2..13b72ea 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -135,6 +135,8 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
+  var clientToControllerChannelManager: Option[BrokerToControllerChannelManager] = None
+
   var alterIsrManager: AlterIsrManager = null
 
   var kafkaScheduler: KafkaScheduler = null
@@ -256,15 +258,19 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+        /* start forwarding manager */
         if (enableForwarding) {
-          this.forwardingManager = Some(ForwardingManager(
-            config,
-            metadataCache,
-            time,
-            metrics,
-            threadNamePrefix
-          ))
-          forwardingManager.foreach(_.start())
+          val brokerToControllerManager = BrokerToControllerChannelManager(
+            controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+            time = time,
+            metrics = metrics,
+            config = config,
+            channelName = "controllerForwardingChannel",
+            threadNamePrefix = threadNamePrefix,
+            retryTimeoutMs = config.requestTimeoutMs.longValue)
+          brokerToControllerManager.start()
+          this.forwardingManager = Some(ForwardingManager(brokerToControllerManager))
+          clientToControllerChannelManager = Some(brokerToControllerManager)
         }
 
         val apiVersionManager = ApiVersionManager(
@@ -336,16 +342,13 @@ class KafkaServer(
         this.autoTopicCreationManager = AutoTopicCreationManager(
           config,
           metadataCache,
-          time,
-          metrics,
           threadNamePrefix,
+          clientToControllerChannelManager,
           Some(adminManager),
           Some(kafkaController),
           groupCoordinator,
-          transactionCoordinator,
-          enableForwarding
+          transactionCoordinator
         )
-        autoTopicCreationManager.start()
 
         /* Get the authorizer and initialize it if one is specified.*/
         authorizer = config.authorizer
@@ -702,10 +705,7 @@ class KafkaServer(
         if (alterIsrManager != null)
           CoreUtils.swallow(alterIsrManager.shutdown(), this)
 
-        CoreUtils.swallow(forwardingManager.foreach(_.shutdown()), this)
-
-        if (autoTopicCreationManager != null)
-          CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
+        CoreUtils.swallow(clientToControllerChannelManager.foreach(_.shutdown()), this)
 
         if (logManager != null)
           CoreUtils.swallow(logManager.shutdown(), this)