Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/07 19:39:40 UTC

[GitHub] [kafka] akhileshchg commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

akhileshchg commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042536606


##########
clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java:
##########
@@ -43,6 +49,8 @@ protected AbstractControlRequest(ApiKeys api, short version) {
 
     public abstract int controllerId();
 
+    public abstract int kraftControllerId();

Review Comment:
   This field is currently used when handling `UpdateMetadataRequest` in `ZkMetadataCache#updateMetadata`.



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -355,6 +392,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
         case id if id < 0 => None
         case id => Some(id)
       }
+      val kraftControllerIdOpt = updateMetadataRequest.kraftControllerId() match {

Review Comment:
   @dengziming It is used here, but it will be used in more places as more migration-related code comes in.
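   For context, the hunk above is cut off right at the `match`; presumably the new branch mirrors the existing `controllerId` handling just above it. A minimal sketch of the assumed continuation (not necessarily the PR's exact code):

   ```scala
   // Sketch only: mirrors the controllerId handling shown earlier in this hunk.
   // A negative id is assumed to mean "no KRaft controller known yet".
   val kraftControllerIdOpt = updateMetadataRequest.kraftControllerId() match {
     case id if id < 0 => None
     case id => Some(id)
   }
   ```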



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -315,7 +324,35 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
     }.getOrElse(Map.empty[Int, Node])
   }
 
-  def getControllerId: Option[Int] = metadataSnapshot.controllerId
+  def getControllerId: Option[Int] = {
+    val snapshot = metadataSnapshot
+    snapshot.controllerId.orElse(snapshot.kraftControllerId)
+  }
+
+  /**
+   * Returns the controller id, together with a flag indicating whether it is the ZK or the
+   * KRaft controller: true for a ZK controller, false for a KRaft controller.
+   * @return the controller id, if any, and the ZK/KRaft flag
+   */
+  def getZkOrKRaftControllerId: (Option[Int], Boolean) = {
+    val snapshot = metadataSnapshot
+    snapshot.kraftControllerId match {
+      case Some(controller) => (Some(controller), false)
+      case None => (snapshot.controllerId, true)
+    }
+  }
+
+  override def getControllerIdForExternalClient: Option[Int] = {

Review Comment:
   Added to the `MetadataCache` trait.
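   To illustrate the intent of the pair-returning accessor, a hypothetical caller might use it like this (the call site and log messages are assumptions, not code from this PR):

   ```scala
   // Hypothetical caller of the new accessor: the Boolean tells the caller whether
   // the id refers to the ZK controller (true) or a KRaft controller (false).
   val (controllerIdOpt, isZkController) = metadataCache.getZkOrKRaftControllerId
   controllerIdOpt match {
     case Some(id) if isZkController => info(s"Current controller is ZK broker $id")
     case Some(id)                   => info(s"Current controller is KRaft node $id")
     case None                       => info("No controller is currently known")
   }
   ```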



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  private var isZkControllerThread = true

Review Comment:
   I thought about it. It is only accessed from the `KafkaScheduler` thread, so I thought it was not needed. But as a precaution, I can add it.
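   If we do want to guard against a future access from another thread, the change would be as small as this sketch:

   ```scala
   // Sketch: mark the flag volatile as a precaution, in case it is ever read
   // outside the KafkaScheduler thread that runs maybeReinitializeRequestThread().
   @volatile private var isZkControllerThread = true
   ```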



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread
 
   def start(): Unit = {
     requestThread.start()
+    // If migration is enabled for a zkBroker, the controller may change from ZK to KRaft
+    // and vice versa. This periodic task takes care of switching to the right channel when
+    // such a controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {
+      kafkaScheduler.schedule("maybeReinitializeRequestThread",
+        () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS)
+    }
   }
 
   def shutdown(): Unit = {
     requestThread.shutdown()
     info(s"Broker to controller channel manager for $channelName shutdown")
   }
 
+  def maybeReinitializeRequestThread(): Unit = {
+    val controllerInfo = controllerNodeProvider.getControllerInfo()
+    if (isZkControllerThread == controllerInfo.isZkController)
+      return
+    val oldRequestThread = requestThread
+    // Close the old thread and start a new one if the old one is not already in the
+    // process of shutting down.
+    if (oldRequestThread.isRunning) {

Review Comment:
   Please let me know if this looks clearer to you.
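   For readers of the archive: the hunk above is truncated at the `isRunning` check. A rough sketch of the assumed remainder of the method (the exact ordering and calls are assumptions, not the PR's code):

   ```scala
   // Assumed shape of the rest of maybeReinitializeRequestThread():
   // only swap the channel if the old thread has not already started shutting down.
   if (oldRequestThread.isRunning) {
     oldRequestThread.shutdown()
     isZkControllerThread = controllerInfo.isZkController
     requestThread = newRequestThread   // volatile write, visible to request enqueuers
     requestThread.start()
   }
   ```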



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread
 
   def start(): Unit = {
     requestThread.start()
+    // If migration is enabled for a zkBroker, the controller may change from ZK to KRaft
+    // and vice versa. This periodic task takes care of switching to the right channel when
+    // such a controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {
+      kafkaScheduler.schedule("maybeReinitializeRequestThread",
+        () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS)
+    }
   }
 
   def shutdown(): Unit = {
     requestThread.shutdown()
     info(s"Broker to controller channel manager for $channelName shutdown")
   }
 
+  def maybeReinitializeRequestThread(): Unit = {
+    val controllerInfo = controllerNodeProvider.getControllerInfo()
+    if (isZkControllerThread == controllerInfo.isZkController)
+      return
+    val oldRequestThread = requestThread
+    // Close the old thread and start a new one if the old one is not already in the
+    // process of shutting down.
+    if (oldRequestThread.isRunning) {

Review Comment:
   It is basically a check to see whether `shutdown` ran first. While thinking about this I realized we might just keep running the scheduler task indefinitely. Let me fix that.
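   One way to stop the task from running forever is to keep the handle from the scheduler and cancel it on shutdown. A sketch of that idea, assuming `kafkaScheduler.schedule` hands back a cancellable `java.util.concurrent.ScheduledFuture` (the field name is made up for illustration):

   ```scala
   // Sketch: remember the scheduled task so shutdown() can cancel it instead of
   // letting it run indefinitely. Field name and cancellation are assumptions.
   @volatile private var reinitTask: Option[java.util.concurrent.ScheduledFuture[_]] = None

   def start(): Unit = {
     requestThread.start()
     if (config.migrationEnabled && config.requiresZookeeper) {
       reinitTask = Some(kafkaScheduler.schedule("maybeReinitializeRequestThread",
         () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS))
     }
   }

   def shutdown(): Unit = {
     reinitTask.foreach(_.cancel(false))
     requestThread.shutdown()
     info(s"Broker to controller channel manager for $channelName shutdown")
   }
   ```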



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org