You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2021/03/15 15:47:40 UTC
[kafka] branch trunk updated: KAFKA-10348: Share client channel between forwarding and auto creation manager (#10135)
This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 802ee10 KAFKA-10348: Share client channel between forwarding and auto creation manager (#10135)
802ee10 is described below
commit 802ee10bfd802735f7d4d1d7b1f1212e5ae0c4b2
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Mar 15 08:45:57 2021 -0700
KAFKA-10348: Share client channel between forwarding and auto creation manager (#10135)
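Share a single broker-to-controller channel manager between the forwarding manager and the auto topic creation manager instead of creating a separate channel for each. The server (KafkaServer / BrokerServer) now owns the shared channel's lifecycle, so the start() and shutdown() methods are removed from both managers.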
Reviewers: dengziming <de...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/server/AutoTopicCreationManager.scala | 34 +---------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 21 +++++--------
.../scala/kafka/server/ForwardingManager.scala | 32 +-------------------
core/src/main/scala/kafka/server/KafkaServer.scala | 34 +++++++++++-----------
4 files changed, 27 insertions(+), 94 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index f8063a8..bac9fb7 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -32,10 +32,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreateableTopicConfig, CreateableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
-import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest}
-import org.apache.kafka.common.utils.Time
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -46,10 +44,6 @@ trait AutoTopicCreationManager {
topicNames: Set[String],
controllerMutationQuota: ControllerMutationQuota
): Seq[MetadataResponseTopic]
-
- def start(): Unit
-
- def shutdown(): Unit
}
object AutoTopicCreationManager {
@@ -57,30 +51,13 @@ object AutoTopicCreationManager {
def apply(
config: KafkaConfig,
metadataCache: MetadataCache,
- time: Time,
- metrics: Metrics,
threadNamePrefix: Option[String],
+ channelManager: Option[BrokerToControllerChannelManager],
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
- enableForwarding: Boolean
): AutoTopicCreationManager = {
-
- val channelManager =
- if (enableForwarding)
- Some(new BrokerToControllerChannelManagerImpl(
- controllerNodeProvider = MetadataCacheControllerNodeProvider(
- config, metadataCache),
- time = time,
- metrics = metrics,
- config = config,
- channelName = "autoTopicCreationChannel",
- threadNamePrefix = threadNamePrefix,
- retryTimeoutMs = config.requestTimeoutMs.longValue
- ))
- else
- None
new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
controller, groupCoordinator, txnCoordinator)
}
@@ -100,15 +77,6 @@ class DefaultAutoTopicCreationManager(
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
- override def start(): Unit = {
- channelManager.foreach(_.start())
- }
-
- override def shutdown(): Unit = {
- channelManager.foreach(_.shutdown())
- inflightTopics.clear()
- }
-
override def createTopics(
topics: Set[String],
controllerMutationQuota: ControllerMutationQuota
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0ce1975..8689afd 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -102,6 +102,8 @@ class BrokerServer(
var transactionCoordinator: TransactionCoordinator = null
+ var clientToControllerChannelManager: BrokerToControllerChannelManager = null
+
var forwardingManager: ForwardingManager = null
var alterIsrManager: AlterIsrManager = null
@@ -179,17 +181,16 @@ class BrokerServer(
val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
- val forwardingChannelManager = BrokerToControllerChannelManager(
+ clientToControllerChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
config,
- channelName = "forwarding",
+ channelName = "controllerForwardingChannel",
threadNamePrefix,
retryTimeoutMs = 60000
)
- forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
- forwardingManager.start()
+ forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
val apiVersionManager = ApiVersionManager(
ListenerType.BROKER,
@@ -245,12 +246,9 @@ class BrokerServer(
new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
- val autoTopicCreationChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
- time, metrics, config, "autocreate", threadNamePrefix, 60000)
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config, Some(autoTopicCreationChannelManager), None, None,
+ config, Some(clientToControllerChannelManager), None, None,
groupCoordinator, transactionCoordinator)
- autoTopicCreationManager.start()
/* Add all reconfigurables for config change notification before starting the metadata listener */
config.dynamicConfig.addReconfigurables(this)
@@ -448,11 +446,8 @@ class BrokerServer(
if (alterIsrManager != null)
CoreUtils.swallow(alterIsrManager.shutdown(), this)
- if (forwardingManager != null)
- CoreUtils.swallow(forwardingManager.shutdown(), this)
-
- if (autoTopicCreationManager != null)
- CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
+ if (clientToControllerChannelManager != null)
+ CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index a6e22e1..788dd4c 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -23,10 +23,8 @@ import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader}
-import org.apache.kafka.common.utils.Time
import scala.compat.java8.OptionConverters._
@@ -37,32 +35,12 @@ trait ForwardingManager {
): Unit
def controllerApiVersions: Option[NodeApiVersions]
-
- def start(): Unit = {}
-
- def shutdown(): Unit = {}
}
object ForwardingManager {
-
def apply(
- config: KafkaConfig,
- metadataCache: MetadataCache,
- time: Time,
- metrics: Metrics,
- threadNamePrefix: Option[String]
+ channelManager: BrokerToControllerChannelManager
): ForwardingManager = {
- val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
-
- val channelManager = BrokerToControllerChannelManager(
- controllerNodeProvider = nodeProvider,
- time = time,
- metrics = metrics,
- config = config,
- channelName = "forwardingChannel",
- threadNamePrefix = threadNamePrefix,
- retryTimeoutMs = config.requestTimeoutMs.longValue
- )
new ForwardingManagerImpl(channelManager)
}
}
@@ -71,14 +49,6 @@ class ForwardingManagerImpl(
channelManager: BrokerToControllerChannelManager
) extends ForwardingManager with Logging {
- override def start(): Unit = {
- channelManager.start()
- }
-
- override def shutdown(): Unit = {
- channelManager.shutdown()
- }
-
/**
* Forward given request to the active controller.
*
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 90c57f2..13b72ea 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -135,6 +135,8 @@ class KafkaServer(
var autoTopicCreationManager: AutoTopicCreationManager = null
+ var clientToControllerChannelManager: Option[BrokerToControllerChannelManager] = None
+
var alterIsrManager: AlterIsrManager = null
var kafkaScheduler: KafkaScheduler = null
@@ -256,15 +258,19 @@ class KafkaServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
+ /* start forwarding manager */
if (enableForwarding) {
- this.forwardingManager = Some(ForwardingManager(
- config,
- metadataCache,
- time,
- metrics,
- threadNamePrefix
- ))
- forwardingManager.foreach(_.start())
+ val brokerToControllerManager = BrokerToControllerChannelManager(
+ controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+ time = time,
+ metrics = metrics,
+ config = config,
+ channelName = "controllerForwardingChannel",
+ threadNamePrefix = threadNamePrefix,
+ retryTimeoutMs = config.requestTimeoutMs.longValue)
+ brokerToControllerManager.start()
+ this.forwardingManager = Some(ForwardingManager(brokerToControllerManager))
+ clientToControllerChannelManager = Some(brokerToControllerManager)
}
val apiVersionManager = ApiVersionManager(
@@ -336,16 +342,13 @@ class KafkaServer(
this.autoTopicCreationManager = AutoTopicCreationManager(
config,
metadataCache,
- time,
- metrics,
threadNamePrefix,
+ clientToControllerChannelManager,
Some(adminManager),
Some(kafkaController),
groupCoordinator,
- transactionCoordinator,
- enableForwarding
+ transactionCoordinator
)
- autoTopicCreationManager.start()
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
@@ -702,10 +705,7 @@ class KafkaServer(
if (alterIsrManager != null)
CoreUtils.swallow(alterIsrManager.shutdown(), this)
- CoreUtils.swallow(forwardingManager.foreach(_.shutdown()), this)
-
- if (autoTopicCreationManager != null)
- CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
+ CoreUtils.swallow(clientToControllerChannelManager.foreach(_.shutdown()), this)
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)