You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/21 19:17:50 UTC

[kafka] branch trunk updated: KAFKA-14446: code style improvements for broker-to-controller forwarding (#13001)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 755e04a41df KAFKA-14446: code style improvements for broker-to-controller forwarding (#13001)
755e04a41df is described below

commit 755e04a41dfd00dd4587b0fe0980befd0ae5c433
Author: Akhilesh C <ak...@users.noreply.github.com>
AuthorDate: Wed Dec 21 11:17:27 2022 -0800

    KAFKA-14446: code style improvements for broker-to-controller forwarding (#13001)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>, David Arthur <mu...@gmail.com>
---
 .../server/BrokerToControllerChannelManager.scala  | 46 ++++++++++++--------
 core/src/main/scala/kafka/server/KafkaServer.scala | 14 +++---
 .../main/scala/kafka/server/MetadataCache.scala    |  9 ++--
 .../BrokerToControllerRequestThreadTest.scala      | 50 +++++++++++++---------
 4 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 3446f83b647..b1c27287560 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -38,11 +38,13 @@ import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
-case class ControllerInformation(node: Option[Node],
-                                 listenerName: ListenerName,
-                                 securityProtocol: SecurityProtocol,
-                                 saslMechanism: String,
-                                 isZkController: Boolean)
+case class ControllerInformation(
+  node: Option[Node],
+  listenerName: ListenerName,
+  securityProtocol: SecurityProtocol,
+  saslMechanism: String,
+  isZkController: Boolean
+)
 
 trait ControllerNodeProvider {
   def getControllerInfo(): ControllerInformation
@@ -190,7 +192,7 @@ class BrokerToControllerChannelManagerImpl(
   }
 
   private[server] def newRequestThread = {
-    def networkClient(controllerInfo: ControllerInformation) = {
+    def buildNetworkClient(controllerInfo: ControllerInformation) = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
         controllerInfo.securityProtocol,
         JaasContext.Type.SERVER,
@@ -239,8 +241,11 @@ class BrokerToControllerChannelManagerImpl(
       case Some(name) => s"$name:BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
     }
 
+    val controllerInformation = controllerNodeProvider.getControllerInfo()
     new BrokerToControllerRequestThread(
-      networkClient,
+      buildNetworkClient(controllerInformation),
+      controllerInformation.isZkController,
+      buildNetworkClient,
       manualMetadataUpdater,
       controllerNodeProvider,
       config,
@@ -290,6 +295,8 @@ case class BrokerToControllerQueueItem(
 )
 
 class BrokerToControllerRequestThread(
+  initialNetworkClient: KafkaClient,
+  var isNetworkClientForZkController: Boolean,
   networkClientFactory: ControllerInformation => KafkaClient,
   metadataUpdater: ManualMetadataUpdater,
   controllerNodeProvider: ControllerNodeProvider,
@@ -297,22 +304,24 @@ class BrokerToControllerRequestThread(
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) {
-
-  var isZkController = false
-  private def maybeResetNetworkClient(controllerInformation: ControllerInformation,
-                                      initialize: Boolean = false): Unit = {
-    if (initialize || isZkController != controllerInformation.isZkController) {
-      if (!initialize) {
-        debug("Controller changed to " + (if (isZkController) "zk" else "kraft") + " mode. " +
-          "Resetting network client")
-      }
+) extends InterBrokerSendThread(
+  threadName,
+  initialNetworkClient,
+  Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt,
+  time,
+  isInterruptible = false
+) {
+
+  private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
+    if (isNetworkClientForZkController != controllerInformation.isZkController) {
+      debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
+        "Resetting network client")
       // Close existing network client.
       if (networkClient != null) {
         networkClient.initiateClose()
         networkClient.close()
       }
-      isZkController = controllerInformation.isZkController
+      isNetworkClientForZkController = controllerInformation.isZkController
       updateControllerAddress(controllerInformation.node.orNull)
       controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
       networkClient = networkClientFactory(controllerInformation)
@@ -321,7 +330,6 @@ class BrokerToControllerRequestThread(
 
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
-  maybeResetNetworkClient(controllerNodeProvider.getControllerInfo(), initialize = true)
 
   // Used for testing
   @volatile
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f8e449d8d1e..335e48f25ee 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -151,7 +151,7 @@ class KafkaServer(
 
   var kafkaScheduler: KafkaScheduler = _
 
-  var kraftControllerNodes: Seq[Node] = Seq.empty
+  var kraftControllerNodes: Seq[Node] = _
   @volatile var metadataCache: ZkMetadataCache = _
   var quotaManagers: QuotaFactory.QuotaManagers = _
 
@@ -282,6 +282,8 @@ class KafkaServer(
         if (config.migrationEnabled) {
           kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
             RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
+        } else {
+          kraftControllerNodes = Seq.empty
         }
         metadataCache = MetadataCache.zkMetadataCache(
           config.brokerId,
@@ -707,9 +709,9 @@ class KafkaServer(
 
           // 1. Find the controller and establish a connection to it.
           // If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
-          metadataCache.getControllerId.filter(_.isInstanceOf[ZkCachedControllerId]).map(_.id) match {
-            case Some(controllerId) =>
-              metadataCache.getAliveBrokerNode(controllerId, config.interBrokerListenerName) match {
+          metadataCache.getControllerId match {
+            case Some(controllerId: ZkCachedControllerId)  =>
+              metadataCache.getAliveBrokerNode(controllerId.id, config.interBrokerListenerName) match {
                 case Some(broker) =>
                   // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
                   // attempt, connect to the most recent controller
@@ -726,8 +728,8 @@ class KafkaServer(
                 case None =>
                   info(s"Broker registration for controller $controllerId is not available in the metadata cache")
               }
-            case None =>
-              info("No controller present in the metadata cache")
+            case Some(_: KRaftCachedControllerId) | None =>
+              info("No zk controller present in the metadata cache")
           }
 
           // 2. issue a controlled shutdown to the controller
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index e0501ef1ebe..342b23cec4b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -32,6 +32,10 @@ case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long)
   }
 }
 
+/**
+ * Used to represent the controller id cached in the metadata cache of the broker. This trait is
+ * extended to represent if the controller is KRaft controller or Zk controller.
+ */
 sealed trait CachedControllerId {
   val id: Int
 }
@@ -118,10 +122,9 @@ object MetadataCache {
   def zkMetadataCache(brokerId: Int,
                       metadataVersion: MetadataVersion,
                       brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
-                      kraftControllerNodes: collection.Seq[Node] = null)
+                      kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node])
   : ZkMetadataCache = {
-    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
-      Option(kraftControllerNodes).getOrElse(collection.Seq.empty[Node]))
+    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes)
   }
 
   def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index eea5c7517a0..d62bf74780a 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -57,8 +57,9 @@ class BrokerToControllerRequestThreadTest {
     when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
     val retryTimeoutMs = 30000
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
@@ -94,8 +95,9 @@ class BrokerToControllerRequestThreadTest {
     when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs = Long.MaxValue)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
     mockClient.prepareResponse(expectedResponse)
 
@@ -134,10 +136,11 @@ class BrokerToControllerRequestThreadTest {
     val newController = new Node(newControllerId, "host2", 1234)
 
     when(controllerNodeProvider.getControllerInfo()).thenReturn(
-      emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
+      controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(),
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -182,14 +185,15 @@ class BrokerToControllerRequestThreadTest {
     val newController = new Node(newControllerId, "host2", port)
 
     when(controllerNodeProvider.getControllerInfo()).thenReturn(
-      emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
+      controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
 
     val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs = Long.MaxValue)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
@@ -243,7 +247,6 @@ class BrokerToControllerRequestThreadTest {
     val newController = new Node(newControllerId, "host2", port)
 
     when(controllerNodeProvider.getControllerInfo()).thenReturn(
-      emptyControllerInfo,                  // call to create network client.
       controllerInfo(Some(oldController)),
       controllerInfo(Some(newController))
     )
@@ -255,8 +258,9 @@ class BrokerToControllerRequestThreadTest {
     // response for retry request after receiving NOT_CONTROLLER error
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
 
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs = Long.MaxValue)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
@@ -311,14 +315,15 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     val controller = new Node(controllerId, "host1", 1234)
 
-    when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo, controllerInfo(Some(controller)))
+    when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(controller)))
 
     val retryTimeoutMs = 30000
     val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler()
@@ -375,8 +380,9 @@ class BrokerToControllerRequestThreadTest {
 
     mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
 
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs = Long.MaxValue)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
     testRequestThread.enqueue(queueItem)
@@ -412,8 +418,9 @@ class BrokerToControllerRequestThreadTest {
 
     mockClient.createPendingAuthenticationError(activeController, 50)
 
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs = Long.MaxValue)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
     testRequestThread.enqueue(queueItem)
@@ -433,8 +440,9 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
-    val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
-      config, time, "", retryTimeoutMs = Long.MaxValue)
+    val testRequestThread = new BrokerToControllerRequestThread(
+      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
     val queueItem = BrokerToControllerQueueItem(