You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/17 03:06:24 UTC

[GitHub] [kafka] abbccdda opened a new pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

abbccdda opened a new pull request #10135:
URL: https://github.com/apache/kafka/pull/10135


   We want to consolidate forwarding and auto creation channel into one channel to reduce the unnecessary connections maintained between brokers and controller.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r577312082



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -134,6 +134,8 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
+  var clientToControllerChannelManager: Option[BrokerToControllerChannelManager] = None
+
   var alterIsrManager: AlterIsrManager = null

Review comment:
       Hello, I have a question, should we also share the channel between alterIsrManager and autoCreationManager, furthermore, also share the same one with alterReplicaStateManager proposed in KIP-589.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r592887105



##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -72,11 +54,9 @@ class ForwardingManagerImpl(
 ) extends ForwardingManager with Logging {
 
   override def start(): Unit = {

Review comment:
       Similarly, we don't need these if they are not doing anything.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -101,11 +82,9 @@ class DefaultAutoTopicCreationManager(
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
   override def start(): Unit = {

Review comment:
       It seems like we don't have a strong need for this anymore. We could probably get rid of `shutdown` as well since there's probably not a strong reason to clear the inflight topics if the manager won't be reused anyway.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -256,13 +258,22 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+        /* start forwarding manager */
         if (enableForwarding) {
+          clientToControllerChannelManager =
+            Some(
+              BrokerToControllerChannelManager(
+                controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+                time = time,
+                metrics = metrics,
+                config = config,
+                channelName = "clientToControllerChannel",
+                threadNamePrefix = threadNamePrefix,
+                retryTimeoutMs = config.requestTimeoutMs.longValue)
+            )
+          clientToControllerChannelManager.get.start()
           this.forwardingManager = Some(ForwardingManager(
-            config,
-            metadataCache,
-            time,
-            metrics,
-            threadNamePrefix
+            clientToControllerChannelManager.get

Review comment:
       nit: this `get` here is a little ugly. We could create a local `val` when we construct `BrokerToControllerChannelManager`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r580879641



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -134,6 +134,8 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
+  var clientToControllerChannelManager: Option[BrokerToControllerChannelManager] = None
+
   var alterIsrManager: AlterIsrManager = null

Review comment:
       Looks good, I will try to file a ticket to consolidate broker to controller channel. 😉
   It seems that the `BrokerServer` also holds AutoTopicCreationManager and ForwardingManager, so it also need to be changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r577322703



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -134,6 +134,8 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
+  var clientToControllerChannelManager: Option[BrokerToControllerChannelManager] = None
+
   var alterIsrManager: AlterIsrManager = null

Review comment:
       We are planning to consolidate into two channels eventually:
   1. broker to controller channel
   2. client to controller channel
   
   here, auto topic creation and forwarding fall into the 2nd category, while AlterIsr would be the 1st category.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r593485718



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -179,17 +181,16 @@ class BrokerServer(
       val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
 
-      val forwardingChannelManager = BrokerToControllerChannelManager(
+      clientToControllerChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
-        channelName = "forwarding",
+        channelName = "clientToControllerChannel",

Review comment:
       How about "controllerForwardingChannel"? I think it fits for both cases we're handling.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda merged pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #10135:
URL: https://github.com/apache/kafka/pull/10135


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r593492752



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -179,17 +181,16 @@ class BrokerServer(
       val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
 
-      val forwardingChannelManager = BrokerToControllerChannelManager(
+      clientToControllerChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
-        channelName = "forwarding",
+        channelName = "clientToControllerChannel",

Review comment:
       Sg!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org