You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/09/11 18:04:45 UTC

[kafka] branch trunk updated: MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager (#14356)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39cc19c9924 MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager (#14356)
39cc19c9924 is described below

commit 39cc19c9924cd5589dc5b98b75ec8d380c159205
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Mon Sep 11 11:04:38 2023 -0700

    MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager (#14356)
    
    Reviewers: David Jacot <da...@gmail.com>
---
 .../transaction/ProducerIdManager.scala            |  6 ++--
 .../main/scala/kafka/network/RequestChannel.scala  |  2 +-
 .../scala/kafka/raft/KafkaNetworkChannel.scala     |  2 +-
 .../scala/kafka/server/AlterPartitionManager.scala |  6 ++--
 .../kafka/server/AutoTopicCreationManager.scala    |  4 +--
 .../kafka/server/BrokerLifecycleManager.scala      |  8 ++---
 .../src/main/scala/kafka/server/BrokerServer.scala |  6 ++--
 .../server/ControllerRegistrationManager.scala     |  4 +--
 .../main/scala/kafka/server/ControllerServer.scala |  4 +--
 .../scala/kafka/server/ForwardingManager.scala     |  4 +--
 core/src/main/scala/kafka/server/KafkaBroker.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  8 ++---
 ....scala => NodeToControllerChannelManager.scala} | 32 ++++++++---------
 ...ala => NodeToControllerRequestThreadTest.scala} | 40 +++++++++++-----------
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  2 +-
 .../transaction/ProducerIdManagerTest.scala        |  4 +--
 .../kafka/server/AlterPartitionManagerTest.scala   | 10 +++---
 .../server/AutoTopicCreationManagerTest.scala      |  2 +-
 .../server/BrokerRegistrationRequestTest.scala     | 10 +++---
 .../unit/kafka/server/ForwardingManagerTest.scala  |  2 +-
 ...la => MockNodeToControllerChannelManager.scala} | 10 +++---
 .../kafka/server/RegistrationTestContext.scala     |  2 +-
 22 files changed, 85 insertions(+), 85 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 1e2b6ffac5a..9e7b88d2ed7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs}
-import kafka.server.{BrokerToControllerChannelManager, ControllerRequestCompletionHandler}
+import kafka.server.{NodeToControllerChannelManager, ControllerRequestCompletionHandler}
 import kafka.utils.Logging
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 import org.apache.kafka.clients.ClientResponse
@@ -56,7 +56,7 @@ object ProducerIdManager {
   def rpc(brokerId: Int,
           time: Time,
           brokerEpochSupplier: () => Long,
-          controllerChannel: BrokerToControllerChannelManager): RPCProducerIdManager = {
+          controllerChannel: NodeToControllerChannelManager): RPCProducerIdManager = {
 
     new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel)
   }
@@ -162,7 +162,7 @@ class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends Produc
 class RPCProducerIdManager(brokerId: Int,
                            time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
+                           controllerChannel: NodeToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 477f02a9c98..2521e581aba 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -140,7 +140,7 @@ object RequestChannel extends Logging {
         case Some(request) =>
           val envelopeResponse = if (shouldReturnNotController(abstractResponse)) {
             // Since it's a NOT_CONTROLLER error response, we need to make envelope response with NOT_CONTROLLER error
-            // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+            // to notify the requester (i.e. NodeToControllerRequestThread) to update active controller
             new EnvelopeResponse(new EnvelopeResponseData()
               .setErrorCode(Errors.NOT_CONTROLLER.code()))
           } else {
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index 27a489b72a0..7c00961d1dc 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -128,7 +128,7 @@ class KafkaNetworkChannel(
       } else if (clientResponse.authenticationException != null) {
         // For now we treat authentication errors as retriable. We use the
         // `NETWORK_EXCEPTION` error code for lack of a good alternative.
-        // Note that `BrokerToControllerChannelManager` will still log the
+        // Note that `NodeToControllerChannelManager` will still log the
         // authentication errors so that users have a chance to fix the problem.
         error(s"Request $request failed due to authentication error",
           clientResponse.authenticationException)
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index ec90f1ba637..1c5021b69bf 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -84,7 +84,7 @@ object AlterPartitionManager {
     threadNamePrefix: String,
     brokerEpochSupplier: () => Long,
   ): AlterPartitionManager = {
-    val channelManager = BrokerToControllerChannelManager(
+    val channelManager = NodeToControllerChannelManager(
       controllerNodeProvider,
       time = time,
       metrics = metrics,
@@ -116,7 +116,7 @@ object AlterPartitionManager {
 }
 
 class DefaultAlterPartitionManager(
-  val controllerChannelManager: BrokerToControllerChannelManager,
+  val controllerChannelManager: NodeToControllerChannelManager,
   val scheduler: Scheduler,
   val time: Time,
   val brokerId: Int,
@@ -200,7 +200,7 @@ class DefaultAlterPartitionManager(
             if (response.authenticationException != null) {
               // For now we treat authentication errors as retriable. We use the
               // `NETWORK_EXCEPTION` error code for lack of a good alternative.
-              // Note that `BrokerToControllerChannelManager` will still log the
+              // Note that `NodeToControllerChannelManager` will still log the
               // authentication errors so that users have a chance to fix the problem.
               Errors.NETWORK_EXCEPTION
             } else if (response.versionMismatch != null) {
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index c9dfa019595..25e934d888c 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -53,7 +53,7 @@ object AutoTopicCreationManager {
     config: KafkaConfig,
     metadataCache: MetadataCache,
     threadNamePrefix: Option[String],
-    channelManager: Option[BrokerToControllerChannelManager],
+    channelManager: Option[NodeToControllerChannelManager],
     adminManager: Option[ZkAdminManager],
     controller: Option[KafkaController],
     groupCoordinator: GroupCoordinator,
@@ -66,7 +66,7 @@ object AutoTopicCreationManager {
 
 class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
-  channelManager: Option[BrokerToControllerChannelManager],
+  channelManager: Option[NodeToControllerChannelManager],
   adminManager: Option[ZkAdminManager],
   controller: Option[KafkaController],
   groupCoordinator: GroupCoordinator,
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 9d4682182a7..c17fe7c0c38 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -185,7 +185,7 @@ class BrokerLifecycleManager(
    * The channel manager, or null if this manager has not been started yet.  This variable
    * can only be read or written from the event queue thread.
    */
-  private var _channelManager: BrokerToControllerChannelManager = _
+  private var _channelManager: NodeToControllerChannelManager = _
 
   /**
    * The event queue.
@@ -199,11 +199,11 @@ class BrokerLifecycleManager(
    * Start the BrokerLifecycleManager.
    *
    * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
-   * @param channelManager                The brokerToControllerChannelManager to use.
+   * @param channelManager                The NodeToControllerChannelManager to use.
    * @param clusterId                     The cluster ID.
    */
   def start(highestMetadataOffsetProvider: () => Long,
-            channelManager: BrokerToControllerChannelManager,
+            channelManager: NodeToControllerChannelManager,
             clusterId: String,
             advertisedListeners: ListenerCollection,
             supportedFeatures: util.Map[String, VersionRange]): Unit = {
@@ -271,7 +271,7 @@ class BrokerLifecycleManager(
   }
 
   private class StartupEvent(highestMetadataOffsetProvider: () => Long,
-                     channelManager: BrokerToControllerChannelManager,
+                     channelManager: NodeToControllerChannelManager,
                      clusterId: String,
                      advertisedListeners: ListenerCollection,
                      supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 6decbcfd2ec..63c1889bc83 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -119,7 +119,7 @@ class BrokerServer(
 
   var transactionCoordinator: TransactionCoordinator = _
 
-  var clientToControllerChannelManager: BrokerToControllerChannelManager = _
+  var clientToControllerChannelManager: NodeToControllerChannelManager = _
 
   var forwardingManager: ForwardingManager = _
 
@@ -215,7 +215,7 @@ class BrokerServer(
       val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
 
-      clientToControllerChannelManager = BrokerToControllerChannelManager(
+      clientToControllerChannelManager = NodeToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
@@ -320,7 +320,7 @@ class BrokerServer(
           k -> VersionRange.of(v.min, v.max)
       }.asJava
 
-      val brokerLifecycleChannelManager = BrokerToControllerChannelManager(
+      val brokerLifecycleChannelManager = NodeToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
diff --git a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index b2284efa9c6..2732c3b737f 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -113,7 +113,7 @@ class ControllerRegistrationManager(
    * The channel manager, or null if this manager has not been started yet.  This variable
    * can only be read or written from the event queue thread.
    */
-  private var _channelManager: BrokerToControllerChannelManager = _
+  private var _channelManager: NodeToControllerChannelManager = _
 
   /**
    * The event queue.
@@ -142,7 +142,7 @@ class ControllerRegistrationManager(
    *
    * @param channelManager                The channel manager to use.
    */
-  def start(channelManager: BrokerToControllerChannelManager): Unit = {
+  def start(channelManager: NodeToControllerChannelManager): Unit = {
     eventQueue.append(() => {
       try {
         info(s"initialized channel manager.")
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 36d41cf822a..4f43b71d45d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -122,7 +122,7 @@ class ControllerServer(
   @volatile var registrationsPublisher: ControllerRegistrationsPublisher = _
   @volatile var incarnationId: Uuid = _
   @volatile var registrationManager: ControllerRegistrationManager = _
-  @volatile var registrationChannelManager: BrokerToControllerChannelManager = _
+  @volatile var registrationChannelManager: NodeToControllerChannelManager = _
 
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
     lock.lock()
@@ -423,7 +423,7 @@ class ControllerServer(
        * Start the KIP-919 controller registration manager.
        */
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes.asScala)
-      registrationChannelManager = BrokerToControllerChannelManager(
+      registrationChannelManager = NodeToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 686a9abb773..b0b13dfecda 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -83,7 +83,7 @@ trait ForwardingManager {
 
 object ForwardingManager {
   def apply(
-    channelManager: BrokerToControllerChannelManager
+    channelManager: NodeToControllerChannelManager
   ): ForwardingManager = {
     new ForwardingManagerImpl(channelManager)
   }
@@ -104,7 +104,7 @@ object ForwardingManager {
 }
 
 class ForwardingManagerImpl(
-  channelManager: BrokerToControllerChannelManager
+  channelManager: NodeToControllerChannelManager
 ) extends ForwardingManager with Logging {
 
   override def forwardRequest(
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 6b0cc2989de..ea1d6cf8ed0 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -94,7 +94,7 @@ trait KafkaBroker extends Logging {
   def shutdown(): Unit
   def brokerTopicStats: BrokerTopicStats
   def credentialProvider: CredentialProvider
-  def clientToControllerChannelManager: BrokerToControllerChannelManager
+  def clientToControllerChannelManager: NodeToControllerChannelManager
   def tokenCache: DelegationTokenCache
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 091391e10ca..7ca3373549d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -150,7 +150,7 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = _
 
-  var clientToControllerChannelManager: BrokerToControllerChannelManager = _
+  var clientToControllerChannelManager: NodeToControllerChannelManager = _
 
   var alterPartitionManager: AlterPartitionManager = _
 
@@ -307,7 +307,7 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-        clientToControllerChannelManager = BrokerToControllerChannelManager(
+        clientToControllerChannelManager = NodeToControllerChannelManager(
           controllerNodeProvider = controllerNodeProvider,
           time = time,
           metrics = metrics,
@@ -319,7 +319,7 @@ class KafkaServer(
         clientToControllerChannelManager.start()
 
         /* start forwarding manager */
-        var autoTopicCreationChannel = Option.empty[BrokerToControllerChannelManager]
+        var autoTopicCreationChannel = Option.empty[NodeToControllerChannelManager]
         if (enableForwarding) {
           this.forwardingManager = Some(ForwardingManager(clientToControllerChannelManager))
           autoTopicCreationChannel = Some(clientToControllerChannelManager)
@@ -402,7 +402,7 @@ class KafkaServer(
           )
           val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
           val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
-          val brokerToQuorumChannelManager = BrokerToControllerChannelManager(
+          val brokerToQuorumChannelManager = NodeToControllerChannelManager(
             controllerNodeProvider = quorumControllerNodeProvider,
             time = time,
             metrics = metrics,
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
similarity index 93%
rename from core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
rename to core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index bbf0792fc37..ee69a74aede 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -130,7 +130,7 @@ class RaftControllerNodeProvider(
       listenerName, securityProtocol, saslMechanism, isZkController = false)
 }
 
-object BrokerToControllerChannelManager {
+object NodeToControllerChannelManager {
   def apply(
     controllerNodeProvider: ControllerNodeProvider,
     time: Time,
@@ -139,8 +139,8 @@ object BrokerToControllerChannelManager {
     channelName: String,
     threadNamePrefix: String,
     retryTimeoutMs: Long
-  ): BrokerToControllerChannelManager = {
-    new BrokerToControllerChannelManagerImpl(
+  ): NodeToControllerChannelManager = {
+    new NodeToControllerChannelManagerImpl(
       controllerNodeProvider,
       time,
       metrics,
@@ -152,7 +152,7 @@ object BrokerToControllerChannelManager {
   }
 }
 
-trait BrokerToControllerChannelManager {
+trait NodeToControllerChannelManager {
   def start(): Unit
   def shutdown(): Unit
   def controllerApiVersions(): Option[NodeApiVersions]
@@ -164,12 +164,12 @@ trait BrokerToControllerChannelManager {
 
 /**
  * This class manages the connection between a broker and the controller. It runs a single
- * [[BrokerToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find
+ * [[NodeToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find
  * and connect to the controller. The channel is async and runs the network connection in the background.
  * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore
  * care must be taken to not block on outstanding requests for too long.
  */
-class BrokerToControllerChannelManagerImpl(
+class NodeToControllerChannelManagerImpl(
   controllerNodeProvider: ControllerNodeProvider,
   time: Time,
   metrics: Metrics,
@@ -177,8 +177,8 @@ class BrokerToControllerChannelManagerImpl(
   channelName: String,
   threadNamePrefix: String,
   retryTimeoutMs: Long
-) extends BrokerToControllerChannelManager with Logging {
-  private val logContext = new LogContext(s"[BrokerToControllerChannelManager id=${config.brokerId} name=${channelName}] ")
+) extends NodeToControllerChannelManager with Logging {
+  private val logContext = new LogContext(s"[NodeToControllerChannelManager id=${config.nodeId} name=${channelName}] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
   private val requestThread = newRequestThread
@@ -189,7 +189,7 @@ class BrokerToControllerChannelManagerImpl(
 
   def shutdown(): Unit = {
     requestThread.shutdown()
-    info(s"Broker to controller channel manager for $channelName shutdown")
+    info(s"Node to controller channel manager for $channelName shutdown")
   }
 
   private[server] def newRequestThread = {
@@ -240,7 +240,7 @@ class BrokerToControllerChannelManagerImpl(
     val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
 
     val controllerInformation = controllerNodeProvider.getControllerInfo()
-    new BrokerToControllerRequestThread(
+    new NodeToControllerRequestThread(
       buildNetworkClient(controllerInformation),
       controllerInformation.isZkController,
       buildNetworkClient,
@@ -263,7 +263,7 @@ class BrokerToControllerChannelManagerImpl(
     request: AbstractRequest.Builder[_ <: AbstractRequest],
     callback: ControllerRequestCompletionHandler
   ): Unit = {
-    requestThread.enqueue(BrokerToControllerQueueItem(
+    requestThread.enqueue(NodeToControllerQueueItem(
       time.milliseconds(),
       request,
       callback
@@ -286,13 +286,13 @@ abstract class ControllerRequestCompletionHandler extends RequestCompletionHandl
   def onTimeout(): Unit
 }
 
-case class BrokerToControllerQueueItem(
+case class NodeToControllerQueueItem(
   createdTimeMs: Long,
   request: AbstractRequest.Builder[_ <: AbstractRequest],
   callback: ControllerRequestCompletionHandler
 )
 
-class BrokerToControllerRequestThread(
+class NodeToControllerRequestThread(
   initialNetworkClient: KafkaClient,
   var isNetworkClientForZkController: Boolean,
   networkClientFactory: ControllerInformation => KafkaClient,
@@ -328,7 +328,7 @@ class BrokerToControllerRequestThread(
     }
   }
 
-  private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+  private val requestQueue = new LinkedBlockingDeque[NodeToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
 
   // Used for testing
@@ -343,7 +343,7 @@ class BrokerToControllerRequestThread(
     activeController.set(newActiveController)
   }
 
-  def enqueue(request: BrokerToControllerQueueItem): Unit = {
+  def enqueue(request: NodeToControllerQueueItem): Unit = {
     if (!started) {
       throw new IllegalStateException("Cannot enqueue a request if the request thread is not running")
     }
@@ -381,7 +381,7 @@ class BrokerToControllerRequestThread(
     util.Collections.emptyList()
   }
 
-  private[server] def handleResponse(queueItem: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+  private[server] def handleResponse(queueItem: NodeToControllerQueueItem)(response: ClientResponse): Unit = {
     debug(s"Request ${queueItem.request} received $response")
     if (response.authenticationException != null) {
       error(s"Request ${queueItem.request} failed due to authentication error with controller",
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
similarity index 94%
rename from core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
rename to core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index d62bf74780a..01432bf6f47 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -36,7 +36,7 @@ import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
 
-class BrokerToControllerRequestThreadTest {
+class NodeToControllerRequestThreadTest {
 
   private def controllerInfo(node: Option[Node]): ControllerInformation = {
     ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
@@ -57,13 +57,13 @@ class BrokerToControllerRequestThreadTest {
     when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
     val retryTimeoutMs = 30000
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -95,14 +95,14 @@ class BrokerToControllerRequestThreadTest {
     when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
     mockClient.prepareResponse(expectedResponse)
 
     val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -139,13 +139,13 @@ class BrokerToControllerRequestThreadTest {
       controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler,
@@ -191,13 +191,13 @@ class BrokerToControllerRequestThreadTest {
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()
         .setAllowAutoTopicCreation(true)),
@@ -258,7 +258,7 @@ class BrokerToControllerRequestThreadTest {
     // response for retry request after receiving NOT_CONTROLLER error
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
 
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
@@ -271,7 +271,7 @@ class BrokerToControllerRequestThreadTest {
     val envelopeRequestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0),
       kafkaPrincipalBuilder.serialize(kafkaPrincipal), "client-address".getBytes)
 
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       envelopeRequestBuilder,
       completionHandler
@@ -321,13 +321,13 @@ class BrokerToControllerRequestThreadTest {
     val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
     val completionHandler = new TestControllerRequestCompletionHandler()
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()
         .setAllowAutoTopicCreation(true)),
@@ -372,7 +372,7 @@ class BrokerToControllerRequestThreadTest {
       override def onComplete(response: ClientResponse): Unit = callbackResponse.set(response)
     }
 
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -380,7 +380,7 @@ class BrokerToControllerRequestThreadTest {
 
     mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
 
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
@@ -410,7 +410,7 @@ class BrokerToControllerRequestThreadTest {
       override def onComplete(response: ClientResponse): Unit = callbackResponse.set(response)
     }
 
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -418,7 +418,7 @@ class BrokerToControllerRequestThreadTest {
 
     mockClient.createPendingAuthenticationError(activeController, 50)
 
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
@@ -440,12 +440,12 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
-    val testRequestThread = new BrokerToControllerRequestThread(
+    val testRequestThread = new NodeToControllerRequestThread(
       mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
-    val queueItem = BrokerToControllerQueueItem(
+    val queueItem = NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -456,7 +456,7 @@ class BrokerToControllerRequestThreadTest {
   }
 
   private def pollUntil(
-    requestThread: BrokerToControllerRequestThread,
+    requestThread: NodeToControllerRequestThread,
     condition: () => Boolean,
     maxRetries: Int = 10
   ): Unit = {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 551003cff9c..4f4547c31f3 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -2428,7 +2428,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
-    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val mockChannelManager = mock(classOf[NodeToControllerChannelManager])
     val alterPartitionManager = new DefaultAlterPartitionManager(
       controllerChannelManager = mockChannelManager,
       scheduler = mock(classOf[KafkaScheduler]),
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 73b208196e6..c950d03399b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
-import kafka.server.BrokerToControllerChannelManager
+import kafka.server.NodeToControllerChannelManager
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 import org.apache.kafka.common.KafkaException
@@ -42,7 +42,7 @@ import scala.util.{Failure, Success}
 
 class ProducerIdManagerTest {
 
-  var brokerToController: BrokerToControllerChannelManager = mock(classOf[BrokerToControllerChannelManager])
+  var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
   // Mutable test implementation that lets us easily set the idStart and error
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 880d48446f8..ccf3909adcc 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -60,7 +60,7 @@ class AlterPartitionManagerTest {
   val metrics = new Metrics
   val brokerId = 1
 
-  var brokerToController: BrokerToControllerChannelManager = _
+  var brokerToController: NodeToControllerChannelManager = _
 
   val tp0 = new TopicIdPartition(topicId, 0, topic)
   val tp1 = new TopicIdPartition(topicId, 1, topic)
@@ -68,7 +68,7 @@ class AlterPartitionManagerTest {
 
   @BeforeEach
   def setup(): Unit = {
-    brokerToController = mock(classOf[BrokerToControllerChannelManager])
+    brokerToController = mock(classOf[NodeToControllerChannelManager])
   }
 
   @ParameterizedTest
@@ -433,7 +433,7 @@ class AlterPartitionManagerTest {
     val controlledEpoch = 0
     val brokerEpoch = 2
     val scheduler = new MockScheduler(time)
-    val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+    val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
     val alterPartitionManager = new DefaultAlterPartitionManager(
       brokerToController,
       scheduler,
@@ -500,7 +500,7 @@ class AlterPartitionManagerTest {
     val controlledEpoch = 0
     val brokerEpoch = 2
     val scheduler = new MockScheduler(time)
-    val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+    val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
     val alterPartitionManager = new DefaultAlterPartitionManager(
       brokerToController,
       scheduler,
@@ -557,7 +557,7 @@ class AlterPartitionManagerTest {
   }
 
   private def verifySendRequest(
-    brokerToController: BrokerToControllerChannelManager,
+    brokerToController: NodeToControllerChannelManager,
     expectedRequest: ArgumentMatcher[AbstractRequest.Builder[_ <: AbstractRequest]]
   ): ControllerRequestCompletionHandler = {
     val callbackCapture = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 11f697b232d..89b81035dc3 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -51,7 +51,7 @@ class AutoTopicCreationManagerTest {
   private val requestTimeout = 100
   private var config: KafkaConfig = _
   private val metadataCache = Mockito.mock(classOf[MetadataCache])
-  private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+  private val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
   private val adminManager = Mockito.mock(classOf[ZkAdminManager])
   private val controller = Mockito.mock(classOf[KafkaController])
   private val groupCoordinator = Mockito.mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 5d46687b58e..ae14fa7171b 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -46,8 +46,8 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class BrokerRegistrationRequestTest {
 
-  def brokerToControllerChannelManager(clusterInstance: ClusterInstance): BrokerToControllerChannelManager = {
-    BrokerToControllerChannelManager(
+  def brokerToControllerChannelManager(clusterInstance: ClusterInstance): NodeToControllerChannelManager = {
+    NodeToControllerChannelManager(
       new ControllerNodeProvider() {
         def node: Option[Node] = Some(new Node(
           clusterInstance.anyControllerSocketServer().config.nodeId,
@@ -76,7 +76,7 @@ class BrokerRegistrationRequestTest {
   }
 
   def sendAndReceive[T <: AbstractRequest, R <: AbstractResponse](
-    channelManager: BrokerToControllerChannelManager,
+    channelManager: NodeToControllerChannelManager,
     reqBuilder: AbstractRequest.Builder[T],
     timeoutMs: Int
   ): R = {
@@ -91,7 +91,7 @@ class BrokerRegistrationRequestTest {
   }
 
   def registerBroker(
-    channelManager: BrokerToControllerChannelManager,
+    channelManager: NodeToControllerChannelManager,
     clusterId: String,
     brokerId: Int,
     zkEpoch: Option[Long],
@@ -121,7 +121,7 @@ class BrokerRegistrationRequestTest {
   }
 
 
-  def createTopics(channelManager: BrokerToControllerChannelManager,
+  def createTopics(channelManager: NodeToControllerChannelManager,
                    topicName: String): Errors = {
     val createTopics = new CreateTopicsRequestData()
     createTopics.setTopics(new CreateTopicsRequestData.CreatableTopicCollection())
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index b2a19060255..e225f26d0dd 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -45,7 +45,7 @@ class ForwardingManagerTest {
   private val time = new MockTime()
   private val client = new MockClient(time)
   private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
-  private val brokerToController = new MockBrokerToControllerChannelManager(
+  private val brokerToController = new MockNodeToControllerChannelManager(
     client, time, controllerNodeProvider, controllerApiVersions)
   private val forwardingManager = new ForwardingManagerImpl(brokerToController)
   private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
diff --git a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
similarity index 90%
rename from core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
rename to core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 4626c47a567..1172e266e76 100644
--- a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
+++ b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -21,15 +21,15 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.server.util.MockTime
 
-class MockBrokerToControllerChannelManager(
+class MockNodeToControllerChannelManager(
   val client: MockClient,
   time: MockTime,
   controllerNodeProvider: ControllerNodeProvider,
   controllerApiVersions: NodeApiVersions = NodeApiVersions.create(),
   val retryTimeoutMs: Int = 60000,
   val requestTimeoutMs: Int = 30000
-) extends BrokerToControllerChannelManager {
-  private val unsentQueue = new java.util.ArrayDeque[BrokerToControllerQueueItem]()
+) extends NodeToControllerChannelManager {
+  private val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
 
   client.setNodeApiVersions(controllerApiVersions)
 
@@ -41,7 +41,7 @@ class MockBrokerToControllerChannelManager(
     request: AbstractRequest.Builder[_ <: AbstractRequest],
     callback: ControllerRequestCompletionHandler
   ): Unit = {
-    unsentQueue.add(BrokerToControllerQueueItem(
+    unsentQueue.add(NodeToControllerQueueItem(
       createdTimeMs = time.milliseconds(),
       request = request,
       callback = callback
@@ -52,7 +52,7 @@ class MockBrokerToControllerChannelManager(
     Some(controllerApiVersions)
   }
 
-  private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+  private[server] def handleResponse(request: NodeToControllerQueueItem)(response: ClientResponse): Unit = {
     if (response.authenticationException != null || response.versionMismatch != null) {
       request.callback.onComplete(response)
     } else if (response.wasDisconnected() || response.responseBody.errorCounts.containsKey(Errors.NOT_CONTROLLER)) {
diff --git a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index 2f5b705db51..92e318f4412 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -59,7 +59,7 @@ class RegistrationTestContext(
     apiKey => new ApiVersion().setApiKey(apiKey.id).
       setMinVersion(apiKey.oldestVersion()).setMaxVersion(apiKey.latestVersion())
   }.toList.asJava)
-  val mockChannelManager = new MockBrokerToControllerChannelManager(mockClient,
+  val mockChannelManager = new MockNodeToControllerChannelManager(mockClient,
     time, controllerNodeProvider, nodeApiVersions)
   val clusterId = "x4AJGXQSRnephtTZzujw4w"
   val advertisedListeners = new ListenerCollection()