You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/21 19:17:50 UTC
[kafka] branch trunk updated: KAFKA-14446: code style improvements for broker-to-controller forwarding (#13001)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 755e04a41df KAFKA-14446: code style improvements for broker-to-controller forwarding (#13001)
755e04a41df is described below
commit 755e04a41dfd00dd4587b0fe0980befd0ae5c433
Author: Akhilesh C <ak...@users.noreply.github.com>
AuthorDate: Wed Dec 21 11:17:27 2022 -0800
KAFKA-14446: code style improvements for broker-to-controller forwarding (#13001)
Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>, David Arthur <mu...@gmail.com>
---
.../server/BrokerToControllerChannelManager.scala | 46 ++++++++++++--------
core/src/main/scala/kafka/server/KafkaServer.scala | 14 +++---
.../main/scala/kafka/server/MetadataCache.scala | 9 ++--
.../BrokerToControllerRequestThreadTest.scala | 50 +++++++++++++---------
4 files changed, 70 insertions(+), 49 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 3446f83b647..b1c27287560 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -38,11 +38,13 @@ import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
-case class ControllerInformation(node: Option[Node],
- listenerName: ListenerName,
- securityProtocol: SecurityProtocol,
- saslMechanism: String,
- isZkController: Boolean)
+case class ControllerInformation(
+ node: Option[Node],
+ listenerName: ListenerName,
+ securityProtocol: SecurityProtocol,
+ saslMechanism: String,
+ isZkController: Boolean
+)
trait ControllerNodeProvider {
def getControllerInfo(): ControllerInformation
@@ -190,7 +192,7 @@ class BrokerToControllerChannelManagerImpl(
}
private[server] def newRequestThread = {
- def networkClient(controllerInfo: ControllerInformation) = {
+ def buildNetworkClient(controllerInfo: ControllerInformation) = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerInfo.securityProtocol,
JaasContext.Type.SERVER,
@@ -239,8 +241,11 @@ class BrokerToControllerChannelManagerImpl(
case Some(name) => s"$name:BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
}
+ val controllerInformation = controllerNodeProvider.getControllerInfo()
new BrokerToControllerRequestThread(
- networkClient,
+ buildNetworkClient(controllerInformation),
+ controllerInformation.isZkController,
+ buildNetworkClient,
manualMetadataUpdater,
controllerNodeProvider,
config,
@@ -290,6 +295,8 @@ case class BrokerToControllerQueueItem(
)
class BrokerToControllerRequestThread(
+ initialNetworkClient: KafkaClient,
+ var isNetworkClientForZkController: Boolean,
networkClientFactory: ControllerInformation => KafkaClient,
metadataUpdater: ManualMetadataUpdater,
controllerNodeProvider: ControllerNodeProvider,
@@ -297,22 +304,24 @@ class BrokerToControllerRequestThread(
time: Time,
threadName: String,
retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) {
-
- var isZkController = false
- private def maybeResetNetworkClient(controllerInformation: ControllerInformation,
- initialize: Boolean = false): Unit = {
- if (initialize || isZkController != controllerInformation.isZkController) {
- if (!initialize) {
- debug("Controller changed to " + (if (isZkController) "zk" else "kraft") + " mode. " +
- "Resetting network client")
- }
+) extends InterBrokerSendThread(
+ threadName,
+ initialNetworkClient,
+ Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt,
+ time,
+ isInterruptible = false
+) {
+
+ private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
+ if (isNetworkClientForZkController != controllerInformation.isZkController) {
+ debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
+ "Resetting network client")
// Close existing network client.
if (networkClient != null) {
networkClient.initiateClose()
networkClient.close()
}
- isZkController = controllerInformation.isZkController
+ isNetworkClientForZkController = controllerInformation.isZkController
updateControllerAddress(controllerInformation.node.orNull)
controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
networkClient = networkClientFactory(controllerInformation)
@@ -321,7 +330,6 @@ class BrokerToControllerRequestThread(
private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
private val activeController = new AtomicReference[Node](null)
- maybeResetNetworkClient(controllerNodeProvider.getControllerInfo(), initialize = true)
// Used for testing
@volatile
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f8e449d8d1e..335e48f25ee 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -151,7 +151,7 @@ class KafkaServer(
var kafkaScheduler: KafkaScheduler = _
- var kraftControllerNodes: Seq[Node] = Seq.empty
+ var kraftControllerNodes: Seq[Node] = _
@volatile var metadataCache: ZkMetadataCache = _
var quotaManagers: QuotaFactory.QuotaManagers = _
@@ -282,6 +282,8 @@ class KafkaServer(
if (config.migrationEnabled) {
kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
+ } else {
+ kraftControllerNodes = Seq.empty
}
metadataCache = MetadataCache.zkMetadataCache(
config.brokerId,
@@ -707,9 +709,9 @@ class KafkaServer(
// 1. Find the controller and establish a connection to it.
// If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
- metadataCache.getControllerId.filter(_.isInstanceOf[ZkCachedControllerId]).map(_.id) match {
- case Some(controllerId) =>
- metadataCache.getAliveBrokerNode(controllerId, config.interBrokerListenerName) match {
+ metadataCache.getControllerId match {
+ case Some(controllerId: ZkCachedControllerId) =>
+ metadataCache.getAliveBrokerNode(controllerId.id, config.interBrokerListenerName) match {
case Some(broker) =>
// if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
// attempt, connect to the most recent controller
@@ -726,8 +728,8 @@ class KafkaServer(
case None =>
info(s"Broker registration for controller $controllerId is not available in the metadata cache")
}
- case None =>
- info("No controller present in the metadata cache")
+ case Some(_: KRaftCachedControllerId) | None =>
+ info("No zk controller present in the metadata cache")
}
// 2. issue a controlled shutdown to the controller
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index e0501ef1ebe..342b23cec4b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -32,6 +32,10 @@ case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long)
}
}
+/**
+ * Used to represent the controller id cached in the metadata cache of the broker. This trait is
+ * extended to represent if the controller is KRaft controller or Zk controller.
+ */
sealed trait CachedControllerId {
val id: Int
}
@@ -118,10 +122,9 @@ object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
- kraftControllerNodes: collection.Seq[Node] = null)
+ kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node])
: ZkMetadataCache = {
- new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
- Option(kraftControllerNodes).getOrElse(collection.Seq.empty[Node]))
+ new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes)
}
def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index eea5c7517a0..d62bf74780a 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -57,8 +57,9 @@ class BrokerToControllerRequestThreadTest {
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
val retryTimeoutMs = 30000
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler(None)
@@ -94,8 +95,9 @@ class BrokerToControllerRequestThreadTest {
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs = Long.MaxValue)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
mockClient.prepareResponse(expectedResponse)
@@ -134,10 +136,11 @@ class BrokerToControllerRequestThreadTest {
val newController = new Node(newControllerId, "host2", 1234)
when(controllerNodeProvider.getControllerInfo()).thenReturn(
- emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
+ controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(),
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -182,14 +185,15 @@ class BrokerToControllerRequestThreadTest {
val newController = new Node(newControllerId, "host2", port)
when(controllerNodeProvider.getControllerInfo()).thenReturn(
- emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
+ controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
Collections.singletonMap("a", 2))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs = Long.MaxValue)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
@@ -243,7 +247,6 @@ class BrokerToControllerRequestThreadTest {
val newController = new Node(newControllerId, "host2", port)
when(controllerNodeProvider.getControllerInfo()).thenReturn(
- emptyControllerInfo, // call to create network client.
controllerInfo(Some(oldController)),
controllerInfo(Some(newController))
)
@@ -255,8 +258,9 @@ class BrokerToControllerRequestThreadTest {
// response for retry request after receiving NOT_CONTROLLER error
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs = Long.MaxValue)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler(Some(expectedResponse))
@@ -311,14 +315,15 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
val controller = new Node(controllerId, "host1", 1234)
- when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo, controllerInfo(Some(controller)))
+ when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(controller)))
val retryTimeoutMs = 30000
val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
Collections.singletonMap("a", 2))
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs)
testRequestThread.started = true
val completionHandler = new TestControllerRequestCompletionHandler()
@@ -375,8 +380,9 @@ class BrokerToControllerRequestThreadTest {
mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs = Long.MaxValue)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
testRequestThread.enqueue(queueItem)
@@ -412,8 +418,9 @@ class BrokerToControllerRequestThreadTest {
mockClient.createPendingAuthenticationError(activeController, 50)
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs = Long.MaxValue)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
testRequestThread.enqueue(queueItem)
@@ -433,8 +440,9 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
- val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
- config, time, "", retryTimeoutMs = Long.MaxValue)
+ val testRequestThread = new BrokerToControllerRequestThread(
+ mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
val completionHandler = new TestControllerRequestCompletionHandler(None)
val queueItem = BrokerToControllerQueueItem(