You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/09/11 18:04:45 UTC
[kafka] branch trunk updated: MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager (#14356)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 39cc19c9924 MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager (#14356)
39cc19c9924 is described below
commit 39cc19c9924cd5589dc5b98b75ec8d380c159205
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Mon Sep 11 11:04:38 2023 -0700
MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager (#14356)
Reviewers: David Jacot <da...@gmail.com>
---
.../transaction/ProducerIdManager.scala | 6 ++--
.../main/scala/kafka/network/RequestChannel.scala | 2 +-
.../scala/kafka/raft/KafkaNetworkChannel.scala | 2 +-
.../scala/kafka/server/AlterPartitionManager.scala | 6 ++--
.../kafka/server/AutoTopicCreationManager.scala | 4 +--
.../kafka/server/BrokerLifecycleManager.scala | 8 ++---
.../src/main/scala/kafka/server/BrokerServer.scala | 6 ++--
.../server/ControllerRegistrationManager.scala | 4 +--
.../main/scala/kafka/server/ControllerServer.scala | 4 +--
.../scala/kafka/server/ForwardingManager.scala | 4 +--
core/src/main/scala/kafka/server/KafkaBroker.scala | 2 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 8 ++---
....scala => NodeToControllerChannelManager.scala} | 32 ++++++++---------
...ala => NodeToControllerRequestThreadTest.scala} | 40 +++++++++++-----------
.../scala/unit/kafka/cluster/PartitionTest.scala | 2 +-
.../transaction/ProducerIdManagerTest.scala | 4 +--
.../kafka/server/AlterPartitionManagerTest.scala | 10 +++---
.../server/AutoTopicCreationManagerTest.scala | 2 +-
.../server/BrokerRegistrationRequestTest.scala | 10 +++---
.../unit/kafka/server/ForwardingManagerTest.scala | 2 +-
...la => MockNodeToControllerChannelManager.scala} | 10 +++---
.../kafka/server/RegistrationTestContext.scala | 2 +-
22 files changed, 85 insertions(+), 85 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 1e2b6ffac5a..9e7b88d2ed7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs}
-import kafka.server.{BrokerToControllerChannelManager, ControllerRequestCompletionHandler}
+import kafka.server.{NodeToControllerChannelManager, ControllerRequestCompletionHandler}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.clients.ClientResponse
@@ -56,7 +56,7 @@ object ProducerIdManager {
def rpc(brokerId: Int,
time: Time,
brokerEpochSupplier: () => Long,
- controllerChannel: BrokerToControllerChannelManager): RPCProducerIdManager = {
+ controllerChannel: NodeToControllerChannelManager): RPCProducerIdManager = {
new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel)
}
@@ -162,7 +162,7 @@ class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends Produc
class RPCProducerIdManager(brokerId: Int,
time: Time,
brokerEpochSupplier: () => Long,
- controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
+ controllerChannel: NodeToControllerChannelManager) extends ProducerIdManager with Logging {
this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 477f02a9c98..2521e581aba 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -140,7 +140,7 @@ object RequestChannel extends Logging {
case Some(request) =>
val envelopeResponse = if (shouldReturnNotController(abstractResponse)) {
// Since it's a NOT_CONTROLLER error response, we need to make envelope response with NOT_CONTROLLER error
- // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+ // to notify the requester (i.e. NodeToControllerRequestThread) to update active controller
new EnvelopeResponse(new EnvelopeResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code()))
} else {
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index 27a489b72a0..7c00961d1dc 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -128,7 +128,7 @@ class KafkaNetworkChannel(
} else if (clientResponse.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
- // Note that `BrokerToControllerChannelManager` will still log the
+ // Note that `NodeToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the problem.
error(s"Request $request failed due to authentication error",
clientResponse.authenticationException)
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index ec90f1ba637..1c5021b69bf 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -84,7 +84,7 @@ object AlterPartitionManager {
threadNamePrefix: String,
brokerEpochSupplier: () => Long,
): AlterPartitionManager = {
- val channelManager = BrokerToControllerChannelManager(
+ val channelManager = NodeToControllerChannelManager(
controllerNodeProvider,
time = time,
metrics = metrics,
@@ -116,7 +116,7 @@ object AlterPartitionManager {
}
class DefaultAlterPartitionManager(
- val controllerChannelManager: BrokerToControllerChannelManager,
+ val controllerChannelManager: NodeToControllerChannelManager,
val scheduler: Scheduler,
val time: Time,
val brokerId: Int,
@@ -200,7 +200,7 @@ class DefaultAlterPartitionManager(
if (response.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
- // Note that `BrokerToControllerChannelManager` will still log the
+ // Note that `NodeToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the problem.
Errors.NETWORK_EXCEPTION
} else if (response.versionMismatch != null) {
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index c9dfa019595..25e934d888c 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -53,7 +53,7 @@ object AutoTopicCreationManager {
config: KafkaConfig,
metadataCache: MetadataCache,
threadNamePrefix: Option[String],
- channelManager: Option[BrokerToControllerChannelManager],
+ channelManager: Option[NodeToControllerChannelManager],
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
@@ -66,7 +66,7 @@ object AutoTopicCreationManager {
class DefaultAutoTopicCreationManager(
config: KafkaConfig,
- channelManager: Option[BrokerToControllerChannelManager],
+ channelManager: Option[NodeToControllerChannelManager],
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 9d4682182a7..c17fe7c0c38 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -185,7 +185,7 @@ class BrokerLifecycleManager(
* The channel manager, or null if this manager has not been started yet. This variable
* can only be read or written from the event queue thread.
*/
- private var _channelManager: BrokerToControllerChannelManager = _
+ private var _channelManager: NodeToControllerChannelManager = _
/**
* The event queue.
@@ -199,11 +199,11 @@ class BrokerLifecycleManager(
* Start the BrokerLifecycleManager.
*
* @param highestMetadataOffsetProvider Provides the current highest metadata offset.
- * @param channelManager The brokerToControllerChannelManager to use.
+ * @param channelManager The NodeToControllerChannelManager to use.
* @param clusterId The cluster ID.
*/
def start(highestMetadataOffsetProvider: () => Long,
- channelManager: BrokerToControllerChannelManager,
+ channelManager: NodeToControllerChannelManager,
clusterId: String,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]): Unit = {
@@ -271,7 +271,7 @@ class BrokerLifecycleManager(
}
private class StartupEvent(highestMetadataOffsetProvider: () => Long,
- channelManager: BrokerToControllerChannelManager,
+ channelManager: NodeToControllerChannelManager,
clusterId: String,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 6decbcfd2ec..63c1889bc83 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -119,7 +119,7 @@ class BrokerServer(
var transactionCoordinator: TransactionCoordinator = _
- var clientToControllerChannelManager: BrokerToControllerChannelManager = _
+ var clientToControllerChannelManager: NodeToControllerChannelManager = _
var forwardingManager: ForwardingManager = _
@@ -215,7 +215,7 @@ class BrokerServer(
val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections).asScala
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
- clientToControllerChannelManager = BrokerToControllerChannelManager(
+ clientToControllerChannelManager = NodeToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
@@ -320,7 +320,7 @@ class BrokerServer(
k -> VersionRange.of(v.min, v.max)
}.asJava
- val brokerLifecycleChannelManager = BrokerToControllerChannelManager(
+ val brokerLifecycleChannelManager = NodeToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
diff --git a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index b2284efa9c6..2732c3b737f 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -113,7 +113,7 @@ class ControllerRegistrationManager(
* The channel manager, or null if this manager has not been started yet. This variable
* can only be read or written from the event queue thread.
*/
- private var _channelManager: BrokerToControllerChannelManager = _
+ private var _channelManager: NodeToControllerChannelManager = _
/**
* The event queue.
@@ -142,7 +142,7 @@ class ControllerRegistrationManager(
*
* @param channelManager The channel manager to use.
*/
- def start(channelManager: BrokerToControllerChannelManager): Unit = {
+ def start(channelManager: NodeToControllerChannelManager): Unit = {
eventQueue.append(() => {
try {
info(s"initialized channel manager.")
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 36d41cf822a..4f43b71d45d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -122,7 +122,7 @@ class ControllerServer(
@volatile var registrationsPublisher: ControllerRegistrationsPublisher = _
@volatile var incarnationId: Uuid = _
@volatile var registrationManager: ControllerRegistrationManager = _
- @volatile var registrationChannelManager: BrokerToControllerChannelManager = _
+ @volatile var registrationChannelManager: NodeToControllerChannelManager = _
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
@@ -423,7 +423,7 @@ class ControllerServer(
* Start the KIP-919 controller registration manager.
*/
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes.asScala)
- registrationChannelManager = BrokerToControllerChannelManager(
+ registrationChannelManager = NodeToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 686a9abb773..b0b13dfecda 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -83,7 +83,7 @@ trait ForwardingManager {
object ForwardingManager {
def apply(
- channelManager: BrokerToControllerChannelManager
+ channelManager: NodeToControllerChannelManager
): ForwardingManager = {
new ForwardingManagerImpl(channelManager)
}
@@ -104,7 +104,7 @@ object ForwardingManager {
}
class ForwardingManagerImpl(
- channelManager: BrokerToControllerChannelManager
+ channelManager: NodeToControllerChannelManager
) extends ForwardingManager with Logging {
override def forwardRequest(
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 6b0cc2989de..ea1d6cf8ed0 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -94,7 +94,7 @@ trait KafkaBroker extends Logging {
def shutdown(): Unit
def brokerTopicStats: BrokerTopicStats
def credentialProvider: CredentialProvider
- def clientToControllerChannelManager: BrokerToControllerChannelManager
+ def clientToControllerChannelManager: NodeToControllerChannelManager
def tokenCache: DelegationTokenCache
private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 091391e10ca..7ca3373549d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -150,7 +150,7 @@ class KafkaServer(
var autoTopicCreationManager: AutoTopicCreationManager = _
- var clientToControllerChannelManager: BrokerToControllerChannelManager = _
+ var clientToControllerChannelManager: NodeToControllerChannelManager = _
var alterPartitionManager: AlterPartitionManager = _
@@ -307,7 +307,7 @@ class KafkaServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
- clientToControllerChannelManager = BrokerToControllerChannelManager(
+ clientToControllerChannelManager = NodeToControllerChannelManager(
controllerNodeProvider = controllerNodeProvider,
time = time,
metrics = metrics,
@@ -319,7 +319,7 @@ class KafkaServer(
clientToControllerChannelManager.start()
/* start forwarding manager */
- var autoTopicCreationChannel = Option.empty[BrokerToControllerChannelManager]
+ var autoTopicCreationChannel = Option.empty[NodeToControllerChannelManager]
if (enableForwarding) {
this.forwardingManager = Some(ForwardingManager(clientToControllerChannelManager))
autoTopicCreationChannel = Some(clientToControllerChannelManager)
@@ -402,7 +402,7 @@ class KafkaServer(
)
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
- val brokerToQuorumChannelManager = BrokerToControllerChannelManager(
+ val brokerToQuorumChannelManager = NodeToControllerChannelManager(
controllerNodeProvider = quorumControllerNodeProvider,
time = time,
metrics = metrics,
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
similarity index 93%
rename from core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
rename to core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index bbf0792fc37..ee69a74aede 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -130,7 +130,7 @@ class RaftControllerNodeProvider(
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
-object BrokerToControllerChannelManager {
+object NodeToControllerChannelManager {
def apply(
controllerNodeProvider: ControllerNodeProvider,
time: Time,
@@ -139,8 +139,8 @@ object BrokerToControllerChannelManager {
channelName: String,
threadNamePrefix: String,
retryTimeoutMs: Long
- ): BrokerToControllerChannelManager = {
- new BrokerToControllerChannelManagerImpl(
+ ): NodeToControllerChannelManager = {
+ new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
@@ -152,7 +152,7 @@ object BrokerToControllerChannelManager {
}
}
-trait BrokerToControllerChannelManager {
+trait NodeToControllerChannelManager {
def start(): Unit
def shutdown(): Unit
def controllerApiVersions(): Option[NodeApiVersions]
@@ -164,12 +164,12 @@ trait BrokerToControllerChannelManager {
/**
* This class manages the connection between a broker and the controller. It runs a single
- * [[BrokerToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find
+ * [[NodeToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find
* and connect to the controller. The channel is async and runs the network connection in the background.
* The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore
* care must be taken to not block on outstanding requests for too long.
*/
-class BrokerToControllerChannelManagerImpl(
+class NodeToControllerChannelManagerImpl(
controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
@@ -177,8 +177,8 @@ class BrokerToControllerChannelManagerImpl(
channelName: String,
threadNamePrefix: String,
retryTimeoutMs: Long
-) extends BrokerToControllerChannelManager with Logging {
- private val logContext = new LogContext(s"[BrokerToControllerChannelManager id=${config.brokerId} name=${channelName}] ")
+) extends NodeToControllerChannelManager with Logging {
+ private val logContext = new LogContext(s"[NodeToControllerChannelManager id=${config.nodeId} name=${channelName}] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
private val requestThread = newRequestThread
@@ -189,7 +189,7 @@ class BrokerToControllerChannelManagerImpl(
def shutdown(): Unit = {
requestThread.shutdown()
- info(s"Broker to controller channel manager for $channelName shutdown")
+ info(s"Node to controller channel manager for $channelName shutdown")
}
private[server] def newRequestThread = {
@@ -240,7 +240,7 @@ class BrokerToControllerChannelManagerImpl(
val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
val controllerInformation = controllerNodeProvider.getControllerInfo()
- new BrokerToControllerRequestThread(
+ new NodeToControllerRequestThread(
buildNetworkClient(controllerInformation),
controllerInformation.isZkController,
buildNetworkClient,
@@ -263,7 +263,7 @@ class BrokerToControllerChannelManagerImpl(
request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: ControllerRequestCompletionHandler
): Unit = {
- requestThread.enqueue(BrokerToControllerQueueItem(
+ requestThread.enqueue(NodeToControllerQueueItem(
time.milliseconds(),
request,
callback
@@ -286,13 +286,13 @@ abstract class ControllerRequestCompletionHandler extends RequestCompletionHandl
def onTimeout(): Unit
}
-case class BrokerToControllerQueueItem(
+case class NodeToControllerQueueItem(
createdTimeMs: Long,
request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: ControllerRequestCompletionHandler
)
-class BrokerToControllerRequestThread(
+class NodeToControllerRequestThread(
initialNetworkClient: KafkaClient,
var isNetworkClientForZkController: Boolean,
networkClientFactory: ControllerInformation => KafkaClient,
@@ -328,7 +328,7 @@ class BrokerToControllerRequestThread(
}
}
- private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+ private val requestQueue = new LinkedBlockingDeque[NodeToControllerQueueItem]()
private val activeController = new AtomicReference[Node](null)
// Used for testing
@@ -343,7 +343,7 @@ class BrokerToControllerRequestThread(
activeController.set(newActiveController)
}
- def enqueue(request: BrokerToControllerQueueItem): Unit = {
+ def enqueue(request: NodeToControllerQueueItem): Unit = {
if (!started) {
throw new IllegalStateException("Cannot enqueue a request if the request thread is not running")
}
@@ -381,7 +381,7 @@ class BrokerToControllerRequestThread(
util.Collections.emptyList()
}
- private[server] def handleResponse(queueItem: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+ private[server] def handleResponse(queueItem: NodeToControllerQueueItem)(response: ClientResponse): Unit = {
debug(s"Request ${queueItem.request} received $response")
if (response.authenticationException != null) {
error(s"Request ${queueItem.request} failed due to authentication error with controller",
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
similarity index 94%
rename from core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
rename to core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index d62bf74780a..01432bf6f47 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -36,7 +36,7 @@ import org.junit.jupiter.api.Test
import org.mockito.Mockito._
-class BrokerToControllerRequestThreadTest {
+class NodeToControllerRequestThreadTest {
private def controllerInfo(node: Option[Node]): ControllerInformation = {
ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
@@ -57,13 +57,13 @@ class BrokerToControllerRequestThreadTest {
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
val retryTimeoutMs = 30000
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler(None)
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -95,14 +95,14 @@ class BrokerToControllerRequestThreadTest {
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
mockClient.prepareResponse(expectedResponse)
val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -139,13 +139,13 @@ class BrokerToControllerRequestThreadTest {
controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler,
@@ -191,13 +191,13 @@ class BrokerToControllerRequestThreadTest {
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
Collections.singletonMap("a", 2))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()
.setAllowAutoTopicCreation(true)),
@@ -258,7 +258,7 @@ class BrokerToControllerRequestThreadTest {
// response for retry request after receiving NOT_CONTROLLER error
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -271,7 +271,7 @@ class BrokerToControllerRequestThreadTest {
val envelopeRequestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0),
kafkaPrincipalBuilder.serialize(kafkaPrincipal), "client-address".getBytes)
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
envelopeRequestBuilder,
completionHandler
@@ -321,13 +321,13 @@ class BrokerToControllerRequestThreadTest {
val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler()
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()
.setAllowAutoTopicCreation(true)),
@@ -372,7 +372,7 @@ class BrokerToControllerRequestThreadTest {
override def onComplete(response: ClientResponse): Unit = callbackResponse.set(response)
}
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -380,7 +380,7 @@ class BrokerToControllerRequestThreadTest {
mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -410,7 +410,7 @@ class BrokerToControllerRequestThreadTest {
override def onComplete(response: ClientResponse): Unit = callbackResponse.set(response)
}
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -418,7 +418,7 @@ class BrokerToControllerRequestThreadTest {
mockClient.createPendingAuthenticationError(activeController, 50)
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -440,12 +440,12 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
- val testRequestThread = new BrokerToControllerRequestThread(
+ val testRequestThread = new NodeToControllerRequestThread(
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
val completionHandler = new TestControllerRequestCompletionHandler(None)
- val queueItem = BrokerToControllerQueueItem(
+ val queueItem = NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -456,7 +456,7 @@ class BrokerToControllerRequestThreadTest {
}
private def pollUntil(
- requestThread: BrokerToControllerRequestThread,
+ requestThread: NodeToControllerRequestThread,
condition: () => Boolean,
maxRetries: Int = 10
): Unit = {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 551003cff9c..4f4547c31f3 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -2428,7 +2428,7 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
- val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+ val mockChannelManager = mock(classOf[NodeToControllerChannelManager])
val alterPartitionManager = new DefaultAlterPartitionManager(
controllerChannelManager = mockChannelManager,
scheduler = mock(classOf[KafkaScheduler]),
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 73b208196e6..c950d03399b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
-import kafka.server.BrokerToControllerChannelManager
+import kafka.server.NodeToControllerChannelManager
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
@@ -42,7 +42,7 @@ import scala.util.{Failure, Success}
class ProducerIdManagerTest {
- var brokerToController: BrokerToControllerChannelManager = mock(classOf[BrokerToControllerChannelManager])
+ var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
// Mutable test implementation that lets us easily set the idStart and error
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 880d48446f8..ccf3909adcc 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -60,7 +60,7 @@ class AlterPartitionManagerTest {
val metrics = new Metrics
val brokerId = 1
- var brokerToController: BrokerToControllerChannelManager = _
+ var brokerToController: NodeToControllerChannelManager = _
val tp0 = new TopicIdPartition(topicId, 0, topic)
val tp1 = new TopicIdPartition(topicId, 1, topic)
@@ -68,7 +68,7 @@ class AlterPartitionManagerTest {
@BeforeEach
def setup(): Unit = {
- brokerToController = mock(classOf[BrokerToControllerChannelManager])
+ brokerToController = mock(classOf[NodeToControllerChannelManager])
}
@ParameterizedTest
@@ -433,7 +433,7 @@ class AlterPartitionManagerTest {
val controlledEpoch = 0
val brokerEpoch = 2
val scheduler = new MockScheduler(time)
- val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+ val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
val alterPartitionManager = new DefaultAlterPartitionManager(
brokerToController,
scheduler,
@@ -500,7 +500,7 @@ class AlterPartitionManagerTest {
val controlledEpoch = 0
val brokerEpoch = 2
val scheduler = new MockScheduler(time)
- val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+ val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
val alterPartitionManager = new DefaultAlterPartitionManager(
brokerToController,
scheduler,
@@ -557,7 +557,7 @@ class AlterPartitionManagerTest {
}
private def verifySendRequest(
- brokerToController: BrokerToControllerChannelManager,
+ brokerToController: NodeToControllerChannelManager,
expectedRequest: ArgumentMatcher[AbstractRequest.Builder[_ <: AbstractRequest]]
): ControllerRequestCompletionHandler = {
val callbackCapture = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 11f697b232d..89b81035dc3 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -51,7 +51,7 @@ class AutoTopicCreationManagerTest {
private val requestTimeout = 100
private var config: KafkaConfig = _
private val metadataCache = Mockito.mock(classOf[MetadataCache])
- private val brokerToController = Mockito.mock(classOf[BrokerToControllerChannelManager])
+ private val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
private val adminManager = Mockito.mock(classOf[ZkAdminManager])
private val controller = Mockito.mock(classOf[KafkaController])
private val groupCoordinator = Mockito.mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 5d46687b58e..ae14fa7171b 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -46,8 +46,8 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerRegistrationRequestTest {
- def brokerToControllerChannelManager(clusterInstance: ClusterInstance): BrokerToControllerChannelManager = {
- BrokerToControllerChannelManager(
+ def brokerToControllerChannelManager(clusterInstance: ClusterInstance): NodeToControllerChannelManager = {
+ NodeToControllerChannelManager(
new ControllerNodeProvider() {
def node: Option[Node] = Some(new Node(
clusterInstance.anyControllerSocketServer().config.nodeId,
@@ -76,7 +76,7 @@ class BrokerRegistrationRequestTest {
}
def sendAndReceive[T <: AbstractRequest, R <: AbstractResponse](
- channelManager: BrokerToControllerChannelManager,
+ channelManager: NodeToControllerChannelManager,
reqBuilder: AbstractRequest.Builder[T],
timeoutMs: Int
): R = {
@@ -91,7 +91,7 @@ class BrokerRegistrationRequestTest {
}
def registerBroker(
- channelManager: BrokerToControllerChannelManager,
+ channelManager: NodeToControllerChannelManager,
clusterId: String,
brokerId: Int,
zkEpoch: Option[Long],
@@ -121,7 +121,7 @@ class BrokerRegistrationRequestTest {
}
- def createTopics(channelManager: BrokerToControllerChannelManager,
+ def createTopics(channelManager: NodeToControllerChannelManager,
topicName: String): Errors = {
val createTopics = new CreateTopicsRequestData()
createTopics.setTopics(new CreateTopicsRequestData.CreatableTopicCollection())
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index b2a19060255..e225f26d0dd 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -45,7 +45,7 @@ class ForwardingManagerTest {
private val time = new MockTime()
private val client = new MockClient(time)
private val controllerNodeProvider = Mockito.mock(classOf[ControllerNodeProvider])
- private val brokerToController = new MockBrokerToControllerChannelManager(
+ private val brokerToController = new MockNodeToControllerChannelManager(
client, time, controllerNodeProvider, controllerApiVersions)
private val forwardingManager = new ForwardingManagerImpl(brokerToController)
private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
diff --git a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
similarity index 90%
rename from core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
rename to core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 4626c47a567..1172e266e76 100644
--- a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
+++ b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -21,15 +21,15 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.server.util.MockTime
-class MockBrokerToControllerChannelManager(
+class MockNodeToControllerChannelManager(
val client: MockClient,
time: MockTime,
controllerNodeProvider: ControllerNodeProvider,
controllerApiVersions: NodeApiVersions = NodeApiVersions.create(),
val retryTimeoutMs: Int = 60000,
val requestTimeoutMs: Int = 30000
-) extends BrokerToControllerChannelManager {
- private val unsentQueue = new java.util.ArrayDeque[BrokerToControllerQueueItem]()
+) extends NodeToControllerChannelManager {
+ private val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
client.setNodeApiVersions(controllerApiVersions)
@@ -41,7 +41,7 @@ class MockBrokerToControllerChannelManager(
request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: ControllerRequestCompletionHandler
): Unit = {
- unsentQueue.add(BrokerToControllerQueueItem(
+ unsentQueue.add(NodeToControllerQueueItem(
createdTimeMs = time.milliseconds(),
request = request,
callback = callback
@@ -52,7 +52,7 @@ class MockBrokerToControllerChannelManager(
Some(controllerApiVersions)
}
- private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+ private[server] def handleResponse(request: NodeToControllerQueueItem)(response: ClientResponse): Unit = {
if (response.authenticationException != null || response.versionMismatch != null) {
request.callback.onComplete(response)
} else if (response.wasDisconnected() || response.responseBody.errorCounts.containsKey(Errors.NOT_CONTROLLER)) {
diff --git a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index 2f5b705db51..92e318f4412 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -59,7 +59,7 @@ class RegistrationTestContext(
apiKey => new ApiVersion().setApiKey(apiKey.id).
setMinVersion(apiKey.oldestVersion()).setMaxVersion(apiKey.latestVersion())
}.toList.asJava)
- val mockChannelManager = new MockBrokerToControllerChannelManager(mockClient,
+ val mockChannelManager = new MockNodeToControllerChannelManager(mockClient,
time, controllerNodeProvider, nodeApiVersions)
val clusterId = "x4AJGXQSRnephtTZzujw4w"
val advertisedListeners = new ListenerCollection()