Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/18 06:00:34 UTC
[kafka] branch 2.8 updated: MINOR: Add KIP-500 BrokerServer and
ControllerServer (#10113)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new d77759d MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)
d77759d is described below
commit d77759d0fe64094a26a6aeaecebc045be317af6f
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Feb 18 00:35:13 2021 -0500
MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)
This PR adds the KIP-500 BrokerServer and ControllerServer classes and
makes some related changes to get them working. Note that the ControllerServer
does not instantiate a QuorumController object yet, since that will be added in
PR #10070.
* Add BrokerServer and ControllerServer
* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
endpoints which do not support PRODUCE (such as KIP-500 controller nodes);
see the sketch after this list
* KafkaAdminClientTest: fix some lingering references to decommissionBroker
that should be references to unregisterBroker.
* Make some changes to allow SocketServer to be used by ControllerServer as
well as by the broker.
* We now return a random active Broker ID as the Controller ID in
MetadataResponse for the Raft-based case as per KIP-590.
* Add the RaftControllerNodeProvider
* Add EnvelopeUtils
* Add MetaLogRaftShim
* In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.
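As a reading aid for the ApiVersions change above, here is a minimal, self-contained Java sketch of the new min-over-known-brokers computation. NodeView and its requiredProduceMagic field are hypothetical stand-ins for NodeApiVersions and ProduceRequest.requiredMagicForVersion, and the magic constant stands in for RecordBatch; only the shape of the logic mirrors the patch below (assumes a recent JDK for records).

    import java.util.List;
    import java.util.Optional;

    public class MaxUsableMagicSketch {
        // Stand-in for RecordBatch.CURRENT_MAGIC_VALUE (magic v2).
        static final byte CURRENT_MAGIC_VALUE = 2;

        // Hypothetical per-node view: the minimum magic required for PRODUCE,
        // or empty for nodes that do not support PRODUCE at all
        // (the KIP-500 controller case).
        record NodeView(Optional<Byte> requiredProduceMagic) {}

        static byte computeMaxUsableProduceMagic(List<NodeView> nodes) {
            Optional<Byte> knownBrokersMin = nodes.stream()
                .filter(n -> n.requiredProduceMagic().isPresent()) // drop controller-only nodes
                .map(n -> n.requiredProduceMagic().get())
                .min(Byte::compare);
            // With no PRODUCE-capable node known, fall back to the current
            // magic, exactly as the patched method does.
            return (byte) Math.min(CURRENT_MAGIC_VALUE,
                knownBrokersMin.orElse(CURRENT_MAGIC_VALUE));
        }

        public static void main(String[] args) {
            // One ordinary broker requiring magic 1, one controller-only node:
            // the controller drops out of the minimum instead of dragging it down.
            List<NodeView> nodes = List.of(
                new NodeView(Optional.of((byte) 1)),
                new NodeView(Optional.empty()));
            System.out.println(computeMaxUsableProduceMagic(nodes)); // prints 1
        }
    }

The design point is that a node with no PRODUCE version simply falls out of the minimum rather than forcing message down-conversion, and an empty result defaults to the current magic value.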
Reviewers: Colin P. McCabe <cm...@apache.org>, David Arthur <mu...@gmail.com>
---
checkstyle/import-control.xml | 1 +
.../java/org/apache/kafka/clients/ApiVersions.java | 13 +-
.../org/apache/kafka/clients/ApiVersionsTest.java | 17 +
.../kafka/clients/admin/KafkaAdminClientTest.java | 4 +-
core/src/main/scala/kafka/cluster/Broker.scala | 2 +-
core/src/main/scala/kafka/log/LogConfig.scala | 2 +-
.../main/scala/kafka/network/SocketServer.scala | 37 +-
.../scala/kafka/raft/KafkaNetworkChannel.scala | 3 +-
core/src/main/scala/kafka/raft/RaftManager.scala | 2 +
.../main/scala/kafka/server/AlterIsrManager.scala | 5 +-
.../kafka/server/AutoTopicCreationManager.scala | 15 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 468 ++++++++++++++++++++-
.../server/BrokerToControllerChannelManager.scala | 36 ++
.../main/scala/kafka/server/ControllerApis.scala | 453 ++++++++++++++++++++
.../main/scala/kafka/server/ControllerServer.scala | 198 ++++++++-
.../main/scala/kafka/server/EnvelopeUtils.scala | 137 ++++++
core/src/main/scala/kafka/server/KafkaApis.scala | 103 +----
core/src/main/scala/kafka/server/KafkaBroker.scala | 8 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 32 +-
.../main/scala/kafka/server/KafkaRaftServer.scala | 33 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 11 +-
.../main/scala/kafka/server/MetadataSupport.scala | 20 +-
core/src/main/scala/kafka/server/Server.scala | 12 +
.../main/scala/kafka/tools/TestRaftServer.scala | 2 +-
.../server/AutoTopicCreationManagerTest.scala | 12 +-
.../unit/kafka/server/ControllerApisTest.scala | 143 +++++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 40 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 114 ++++-
.../unit/kafka/server/KafkaRaftServerTest.scala | 2 +-
.../jmh/metadata/MetadataRequestBenchmark.java | 5 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 21 +-
.../java/org/apache/kafka/raft/NetworkChannel.java | 5 +
.../java/org/apache/kafka/raft/RaftClient.java | 11 +-
.../java/org/apache/kafka/raft/RaftConfig.java | 13 +
.../org/apache/kafka/raft/ReplicatedCounter.java | 2 +-
.../kafka/raft/metadata/MetaLogRaftShim.java | 119 ++++++
.../org/apache/kafka/raft/MockNetworkChannel.java | 5 +
.../apache/kafka/raft/RaftClientTestContext.java | 2 +-
tests/kafkatest/services/kafka/config_property.py | 2 +-
39 files changed, 1908 insertions(+), 202 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bc0491e..9ec16b9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -385,6 +385,7 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
</subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
index 8001f1c..a09d581 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ProduceRequest;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
/**
* Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
@@ -51,12 +52,12 @@ public class ApiVersions {
private byte computeMaxUsableProduceMagic() {
// use a magic version which is supported by all brokers to reduce the chance that
// we will need to convert the messages when they are ready to be sent.
- byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
- for (NodeApiVersions versions : this.nodeApiVersions.values()) {
- byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE));
- maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
- }
- return maxUsableMagic;
+ Optional<Byte> knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream()
+ .filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
+ .map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)))
+ .min(Byte::compare);
+ return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE,
+ knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE));
}
public synchronized byte maxUsableProduceMagic() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
index 4a5c98d..206e95e4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ApiVersionsTest {
@@ -38,4 +41,18 @@ public class ApiVersionsTest {
apiVersions.remove("1");
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
}
+
+ @Test
+ public void testMaxUsableProduceMagicWithRaftController() {
+ ApiVersions apiVersions = new ApiVersions();
+ assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+
+ // something that doesn't support PRODUCE, which is the case with Raft-based controllers
+ apiVersions.update("2", new NodeApiVersions(Collections.singleton(
+ new ApiVersionsResponseData.ApiVersion()
+ .setApiKey(ApiKeys.FETCH.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 2))));
+ assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ddd01be..97b4b66 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5082,7 +5082,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDecommissionBrokerTimeoutMaxRetry() {
+ public void testUnregisterBrokerTimeoutMaxRetry() {
int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv(Time.SYSTEM, AdminClientConfig.RETRIES_CONFIG, "1")) {
env.kafkaClient().setNodeApiVersions(
@@ -5099,7 +5099,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDecommissionBrokerTimeoutMaxWait() {
+ public void testUnregisterBrokerTimeoutMaxWait() {
int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 46483d0..657d89b 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -32,7 +32,7 @@ import scala.collection.Seq
import scala.jdk.CollectionConverters._
object Broker {
- private[cluster] case class ServerInfo(clusterResource: ClusterResource,
+ private[kafka] case class ServerInfo(clusterResource: ClusterResource,
brokerId: Int,
endpoints: util.List[Endpoint],
interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4299534..c2ab1d8 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -228,7 +228,7 @@ object LogConfig {
}
// Package private for testing, return a copy since it's a mutable global variable
- private[log] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
+ private[kafka] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
private val configDef: LogConfigDef = {
import org.apache.kafka.common.config.ConfigDef.Importance._
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 905c556..72c5141 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -78,12 +78,16 @@ class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider,
- val allowControllerOnlyApis: Boolean = false)
+ allowControllerOnlyApis: Boolean = false,
+ controllerSocketServer: Boolean = false)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
private val maxQueuedRequests = config.queuedMaxRequests
- private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
+ private val nodeId = config.brokerId
+
+ private val logContext = new LogContext(s"[SocketServer ${if (controllerSocketServer) "controller" else "broker"}Id=${nodeId}] ")
+
this.logIdent = logContext.logPrefix
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
@@ -117,11 +121,15 @@ class SocketServer(val config: KafkaConfig,
* when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
*
* @param startProcessingRequests Flag indicating whether `Processor`s must be started.
+ * @param controlPlaneListener The control plane listener, or None if there is none.
+ * @param dataPlaneListeners The data plane listeners.
*/
- def startup(startProcessingRequests: Boolean = true): Unit = {
+ def startup(startProcessingRequests: Boolean = true,
+ controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
+ dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
this.synchronized {
- createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
- createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
+ createControlPlaneAcceptorAndProcessor(controlPlaneListener)
+ createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
if (startProcessingRequests) {
this.startProcessingRequests()
}
@@ -224,9 +232,11 @@ class SocketServer(val config: KafkaConfig,
private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
val interBrokerListener = dataPlaneAcceptors.asScala.keySet
.find(_.listenerName == config.interBrokerListenerName)
- .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
- val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
- dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
+ val orderedAcceptors = interBrokerListener match {
+ case Some(interBrokerListener) => List(dataPlaneAcceptors.get(interBrokerListener)) ++
+ dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
+ case None => dataPlaneAcceptors.asScala.values
+ }
orderedAcceptors.foreach { acceptor =>
val endpoint = acceptor.endPoint
startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
@@ -276,8 +286,7 @@ class SocketServer(val config: KafkaConfig,
private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
- val brokerId = config.brokerId
- new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix, time)
+ new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time)
}
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
@@ -540,11 +549,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
- brokerId: Int,
+ nodeId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String,
- time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ time: Time,
+ logPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ this.logIdent = logPrefix
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
private val processors = new ArrayBuffer[Processor]()
@@ -573,7 +584,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
processors.foreach { processor =>
KafkaThread.nonDaemon(
- s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+ s"${processorThreadPrefix}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor
).start()
}
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index f3b7f11..68f7b4a 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -165,7 +165,7 @@ class KafkaNetworkChannel(
RaftUtil.errorResponse(apiKey, error)
}
- def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = {
+ override def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = {
val node = new Node(id, spec.address.getHostString, spec.address.getPort)
endpoints.put(id, node)
}
@@ -181,5 +181,4 @@ class KafkaNetworkChannel(
override def close(): Unit = {
requestThread.shutdown()
}
-
}
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index b9a77b7..6a74c27 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -121,6 +121,8 @@ class KafkaRaftManager[T](
private val raftClient = buildRaftClient()
private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+ def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
val voterAddresses: util.Map[Integer, AddressSpec] = raftConfig.quorumVoterConnections
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 70c0fc2..b58ca89 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -70,7 +70,8 @@ object AlterIsrManager {
time: Time,
metrics: Metrics,
threadNamePrefix: Option[String],
- brokerEpochSupplier: () => Long
+ brokerEpochSupplier: () => Long,
+ brokerId: Int
): AlterIsrManager = {
val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
@@ -87,7 +88,7 @@ object AlterIsrManager {
controllerChannelManager = channelManager,
scheduler = scheduler,
time = time,
- brokerId = config.brokerId,
+ brokerId = brokerId,
brokerEpochSupplier = brokerEpochSupplier
)
}
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index ec7f2df..01dabed 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -61,8 +61,8 @@ object AutoTopicCreationManager {
time: Time,
metrics: Metrics,
threadNamePrefix: Option[String],
- adminManager: ZkAdminManager,
- controller: KafkaController,
+ adminManager: Option[ZkAdminManager],
+ controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
enableForwarding: Boolean
@@ -91,11 +91,14 @@ class DefaultAutoTopicCreationManager(
config: KafkaConfig,
metadataCache: MetadataCache,
channelManager: Option[BrokerToControllerChannelManager],
- adminManager: ZkAdminManager,
- controller: KafkaController,
+ adminManager: Option[ZkAdminManager],
+ controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator
) extends AutoTopicCreationManager with Logging {
+ if (controller.isEmpty && channelManager.isEmpty) {
+ throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller")
+ }
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
@@ -116,7 +119,7 @@ class DefaultAutoTopicCreationManager(
val creatableTopicResponses = if (creatableTopics.isEmpty) {
Seq.empty
- } else if (!controller.isActive && channelManager.isDefined) {
+ } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) {
sendCreateTopicRequest(creatableTopics)
} else {
createTopicsInZk(creatableTopics, controllerMutationQuota)
@@ -133,7 +136,7 @@ class DefaultAutoTopicCreationManager(
try {
// Note that we use timeout = 0 since we do not need to wait for metadata propagation
// and we want to get the response error immediately.
- adminManager.createTopics(
+ adminManager.get.createTopics(
timeout = 0,
validateOnly = false,
creatableTopics,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 90f95ed..57ceb46 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -1,10 +1,10 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -14,14 +14,464 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package kafka.server
+import java.util
+import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.cluster.Broker.ServerInfo
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinator}
+import kafka.log.LogManager
+import kafka.metrics.KafkaYammerMetrics
+import kafka.network.SocketServer
+import kafka.security.CredentialProvider
+import kafka.server.KafkaBroker.metricsPrefix
+import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
+import kafka.utils.{CoreUtils, KafkaScheduler}
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
+import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
+import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
+import org.apache.kafka.metadata.{BrokerState, VersionRange}
+import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
/**
- * Stubbed implementation of the KIP-500 broker which processes state
- * from the `@metadata` topic which is replicated through Raft.
+ * A KIP-500 Kafka broker.
*/
-class BrokerServer {
- def startup(): Unit = ???
- def shutdown(): Unit = ???
- def awaitShutdown(): Unit = ???
+class BrokerServer(
+ val config: KafkaConfig,
+ val metaProps: MetaProperties,
+ val metaLogManager: MetaLogManager,
+ val time: Time,
+ val metrics: Metrics,
+ val threadNamePrefix: Option[String],
+ val initialOfflineDirs: Seq[String],
+ val controllerQuorumVotersFuture: CompletableFuture[util.List[String]],
+ val supportedFeatures: util.Map[String, VersionRange]
+ ) extends KafkaBroker {
+
+ import kafka.server.Server._
+
+ private val logContext: LogContext = new LogContext(s"[BrokerServer id=${config.nodeId}] ")
+
+ this.logIdent = logContext.logPrefix
+
+ val lifecycleManager: BrokerLifecycleManager =
+ new BrokerLifecycleManager(config, time, threadNamePrefix)
+
+ private val isShuttingDown = new AtomicBoolean(false)
+
+ val lock = new ReentrantLock()
+ val awaitShutdownCond = lock.newCondition()
+ var status: ProcessStatus = SHUTDOWN
+
+ var dataPlaneRequestProcessor: KafkaApis = null
+ var controlPlaneRequestProcessor: KafkaApis = null
+
+ var authorizer: Option[Authorizer] = None
+ var socketServer: SocketServer = null
+ var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+ var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+
+ var logDirFailureChannel: LogDirFailureChannel = null
+ var logManager: LogManager = null
+
+ var tokenManager: DelegationTokenManager = null
+
+ var replicaManager: RaftReplicaManager = null
+
+ var credentialProvider: CredentialProvider = null
+ var tokenCache: DelegationTokenCache = null
+
+ var groupCoordinator: GroupCoordinator = null
+
+ var transactionCoordinator: TransactionCoordinator = null
+
+ var forwardingManager: ForwardingManager = null
+
+ var alterIsrManager: AlterIsrManager = null
+
+ var autoTopicCreationManager: AutoTopicCreationManager = null
+
+ var kafkaScheduler: KafkaScheduler = null
+
+ var metadataCache: RaftMetadataCache = null
+
+ var quotaManagers: QuotaFactory.QuotaManagers = null
+ var quotaCache: ClientQuotaCache = null
+
+ private var _brokerTopicStats: BrokerTopicStats = null
+
+ val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
+
+ val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
+
+ val clusterId: String = metaProps.clusterId.toString
+
+ val configRepository = new CachedConfigRepository()
+
+ var brokerMetadataListener: BrokerMetadataListener = null
+
+ def kafkaYammerMetrics: kafka.metrics.KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
+
+ private[kafka] def brokerTopicStats = _brokerTopicStats
+
+ private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
+ lock.lock()
+ try {
+ if (status != from) return false
+ status = to
+ if (to == SHUTTING_DOWN) {
+ isShuttingDown.set(true)
+ } else if (to == SHUTDOWN) {
+ isShuttingDown.set(false)
+ awaitShutdownCond.signalAll()
+ }
+ } finally {
+ lock.unlock()
+ }
+ true
+ }
+
+ def startup(): Unit = {
+ if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
+ try {
+ info("Starting broker")
+
+ /* start scheduler */
+ kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+ kafkaScheduler.startup()
+
+ /* register broker metrics */
+ _brokerTopicStats = new BrokerTopicStats
+
+ quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
+ quotaCache = new ClientQuotaCache()
+
+ logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
+
+ // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
+ // until we catch up on the metadata log and have up-to-date topic and broker configs.
+ logManager = LogManager(config, initialOfflineDirs, configRepository, kafkaScheduler, time,
+ brokerTopicStats, logDirFailureChannel, true)
+
+ metadataCache = MetadataCache.raftMetadataCache(config.nodeId)
+ // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
+ // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
+ tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
+ credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
+
+ // Create and start the socket server acceptor threads so that the bound port is known.
+ // Delay starting processors until the end of the initialization sequence to ensure
+ // that credentials have been loaded before processing authentications.
+ socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = false)
+ socketServer.startup(startProcessingRequests = false)
+
+ val controllerNodes =
+ RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
+ val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
+ val alterIsrChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
+ time, metrics, config, "alterisr", threadNamePrefix, 60000)
+ alterIsrManager = new DefaultAlterIsrManager(
+ controllerChannelManager = alterIsrChannelManager,
+ scheduler = kafkaScheduler,
+ time = time,
+ brokerId = config.nodeId,
+ brokerEpochSupplier = () => lifecycleManager.brokerEpoch()
+ )
+ alterIsrManager.start()
+
+ this.replicaManager = new RaftReplicaManager(config, metrics, time,
+ kafkaScheduler, logManager, isShuttingDown, quotaManagers,
+ brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
+ configRepository, threadNamePrefix)
+
+ val forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
+ time, metrics, config, "forwarding", threadNamePrefix, 60000)
+ forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
+ forwardingManager.start()
+
+ /* start token manager */
+ if (config.tokenAuthEnabled) {
+ throw new UnsupportedOperationException("Delegation tokens are not supported")
+ }
+ tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
+ tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
+
+ // Create group coordinator, but don't start it until we've started replica manager.
+ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
+ groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
+
+ // Create transaction coordinator, but don't start it until we've started replica manager.
+ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
+ transactionCoordinator = TransactionCoordinator(config, replicaManager,
+ new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
+ createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
+
+ val autoTopicCreationChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
+ time, metrics, config, "autocreate", threadNamePrefix, 60000)
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config, metadataCache, Some(autoTopicCreationChannelManager), None, None,
+ groupCoordinator, transactionCoordinator)
+ autoTopicCreationManager.start()
+
+ /* Add all reconfigurables for config change notification before starting the metadata listener */
+ config.dynamicConfig.addReconfigurables(this)
+
+ val clientQuotaMetadataManager = new ClientQuotaMetadataManager(
+ quotaManagers, socketServer.connectionQuotas, quotaCache)
+
+ brokerMetadataListener = new BrokerMetadataListener(
+ config.nodeId,
+ time,
+ metadataCache,
+ configRepository,
+ groupCoordinator,
+ replicaManager,
+ transactionCoordinator,
+ logManager,
+ threadNamePrefix,
+ clientQuotaMetadataManager)
+
+ val networkListeners = new ListenerCollection()
+ config.advertisedListeners.foreach { ep =>
+ networkListeners.add(new Listener().
+ setHost(ep.host).
+ setName(ep.listenerName.value()).
+ setPort(socketServer.boundPort(ep.listenerName)).
+ setSecurityProtocol(ep.securityProtocol.id))
+ }
+ lifecycleManager.start(() => brokerMetadataListener.highestMetadataOffset(),
+ BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
+ "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
+ metaProps.clusterId, networkListeners, supportedFeatures)
+
+ // Register a listener with the Raft layer to receive metadata event notifications
+ metaLogManager.register(brokerMetadataListener)
+
+ val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
+ var interBrokerListener: Endpoint = null
+ networkListeners.iterator().forEachRemaining(listener => {
+ val endPoint = new Endpoint(listener.name(),
+ SecurityProtocol.forId(listener.securityProtocol()),
+ listener.host(), listener.port())
+ endpoints.add(endPoint)
+ if (listener.name().equals(config.interBrokerListenerName.value())) {
+ interBrokerListener = endPoint
+ }
+ })
+ if (interBrokerListener == null) {
+ throw new RuntimeException("Unable to find inter-broker listener " +
+ config.interBrokerListenerName.value() + ". Found listener(s): " +
+ endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
+ }
+ val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
+ config.nodeId, endpoints, interBrokerListener)
+
+ /* Get the authorizer and initialize it if one is specified.*/
+ authorizer = config.authorizer
+ authorizer.foreach(_.configure(config.originals))
+ val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
+ case Some(authZ) =>
+ authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
+ ep -> cs.toCompletableFuture
+ }
+ case None =>
+ authorizerInfo.endpoints.asScala.map { ep =>
+ ep -> CompletableFuture.completedFuture[Void](null)
+ }.toMap
+ }
+
+ val fetchManager = new FetchManager(Time.SYSTEM,
+ new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
+ KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+
+ // Start processing requests once we've caught up on the metadata log, recovered logs if necessary,
+ // and started all services that we previously delayed starting.
+ val raftSupport = RaftSupport(forwardingManager, metadataCache)
+ dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
+ replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
+ config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
+ fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+
+ dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
+ config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
+
+ socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
+ controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport,
+ replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
+ config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
+ fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+
+ controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
+ 1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
+ }
+
+ // Block until we've caught up on the metadata log
+ lifecycleManager.initialCatchUpFuture.get()
+ // Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
+ logManager.startup(metadataCache.getAllTopics())
+ // Start other services that we've delayed starting, in the appropriate order.
+ replicaManager.endMetadataChangeDeferral(
+ RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
+ replicaManager.startup()
+ replicaManager.startHighWatermarkCheckPointThread()
+ groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
+ getOrElse(config.offsetsTopicPartitions))
+ transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
+ getOrElse(config.transactionTopicPartitions))
+
+ socketServer.startProcessingRequests(authorizerFutures)
+
+ // We're now ready to unfence the broker.
+ lifecycleManager.setReadyToUnfence()
+
+ maybeChangeStatus(STARTING, STARTED)
+ } catch {
+ case e: Throwable =>
+ maybeChangeStatus(STARTING, STARTED)
+ fatal("Fatal error during broker startup. Prepare to shutdown", e)
+ shutdown()
+ throw e
+ }
+ }
+
+ class TemporaryProducerIdManager() extends ProducerIdGenerator {
+ val maxProducerIdsPerBrokerEpoch = 1000000
+ var currentOffset = -1
+ override def generateProducerId(): Long = {
+ currentOffset = currentOffset + 1
+ if (currentOffset >= maxProducerIdsPerBrokerEpoch) {
+ fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch")
+ throw new KafkaException("Have exhausted all demo/temporary producerIds.")
+ }
+ lifecycleManager.initialCatchUpFuture.get()
+ lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset
+ }
+ }
+
+ def createTemporaryProducerIdManager(): ProducerIdGenerator = {
+ new TemporaryProducerIdManager()
+ }
+
+ def shutdown(): Unit = {
+ if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
+ try {
+ info("shutting down")
+
+ if (config.controlledShutdownEnable) {
+ lifecycleManager.beginControlledShutdown()
+ try {
+ lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
+ } catch {
+ case _: TimeoutException =>
+ error("Timed out waiting for the controller to approve controlled shutdown")
+ case e: Throwable =>
+ error("Got unexpected exception waiting for controlled shutdown future", e)
+ }
+ }
+ lifecycleManager.beginShutdown()
+
+ // Stop socket server to stop accepting any more connections and requests.
+ // Socket server will be shutdown towards the end of the sequence.
+ if (socketServer != null) {
+ CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+ }
+ if (dataPlaneRequestHandlerPool != null)
+ CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+ if (controlPlaneRequestHandlerPool != null)
+ CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
+ if (kafkaScheduler != null)
+ CoreUtils.swallow(kafkaScheduler.shutdown(), this)
+
+ if (dataPlaneRequestProcessor != null)
+ CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+ if (controlPlaneRequestProcessor != null)
+ CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
+ CoreUtils.swallow(authorizer.foreach(_.close()), this)
+
+ if (brokerMetadataListener != null) {
+ CoreUtils.swallow(brokerMetadataListener.close(), this)
+ }
+ if (transactionCoordinator != null)
+ CoreUtils.swallow(transactionCoordinator.shutdown(), this)
+ if (groupCoordinator != null)
+ CoreUtils.swallow(groupCoordinator.shutdown(), this)
+
+ if (tokenManager != null)
+ CoreUtils.swallow(tokenManager.shutdown(), this)
+
+ if (replicaManager != null)
+ CoreUtils.swallow(replicaManager.shutdown(), this)
+
+ if (alterIsrManager != null)
+ CoreUtils.swallow(alterIsrManager.shutdown(), this)
+
+ if (forwardingManager != null)
+ CoreUtils.swallow(forwardingManager.shutdown(), this)
+
+ if (autoTopicCreationManager != null)
+ CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
+
+ if (logManager != null)
+ CoreUtils.swallow(logManager.shutdown(), this)
+
+ if (quotaManagers != null)
+ CoreUtils.swallow(quotaManagers.shutdown(), this)
+
+ if (socketServer != null)
+ CoreUtils.swallow(socketServer.shutdown(), this)
+ if (metrics != null)
+ CoreUtils.swallow(metrics.close(), this)
+ if (brokerTopicStats != null)
+ CoreUtils.swallow(brokerTopicStats.close(), this)
+
+ // Clear all reconfigurable instances stored in DynamicBrokerConfig
+ config.dynamicConfig.clear()
+
+ isShuttingDown.set(false)
+
+ CoreUtils.swallow(lifecycleManager.close(), this)
+
+ CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.nodeId.toString, metrics), this)
+ info("shut down completed")
+ } catch {
+ case e: Throwable =>
+ fatal("Fatal error during broker shutdown.", e)
+ throw e
+ } finally {
+ maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
+ }
+ }
+
+ def awaitShutdown(): Unit = {
+ lock.lock()
+ try {
+ while (true) {
+ if (status == SHUTDOWN) return
+ awaitShutdownCond.awaitUninterruptibly()
+ }
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
+
+ def currentState(): BrokerState = lifecycleManager.state()
+
}
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 1e7af76..3b53522 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -31,7 +31,9 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.metalog.MetaLogManager
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
trait ControllerNodeProvider {
@@ -71,6 +73,40 @@ class MetadataCacheControllerNodeProvider(
}
}
+object RaftControllerNodeProvider {
+ def apply(metaLogManager: MetaLogManager,
+ config: KafkaConfig,
+ controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
+
+ val listenerName = new ListenerName(config.controllerListenerNames.head)
+ val securityProtocol = config.listenerSecurityProtocolMap.getOrElse(listenerName, SecurityProtocol.forName(listenerName.value()))
+ new RaftControllerNodeProvider(metaLogManager, controllerQuorumVoterNodes, listenerName, securityProtocol)
+ }
+}
+
+/**
+ * Finds the controller node by checking the metadata log manager.
+ * This provider is used when we are using a Raft-based metadata quorum.
+ */
+class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
+ controllerQuorumVoterNodes: Seq[Node],
+ val listenerName: ListenerName,
+ val securityProtocol: SecurityProtocol
+ ) extends ControllerNodeProvider with Logging {
+ val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
+
+ override def get(): Option[Node] = {
+ val leader = metaLogManager.leader()
+ if (leader == null) {
+ None
+ } else if (leader.nodeId() < 0) {
+ None
+ } else {
+ idToNode.get(leader.nodeId())
+ }
+ }
+}
+
object BrokerToControllerChannelManager {
def apply(
controllerNodeProvider: ControllerNodeProvider,
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
new file mode 100644
index 0000000..2386da5
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import kafka.network.RequestChannel
+import kafka.raft.RaftManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
+import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
+import org.apache.kafka.common.record.BaseRecords
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.resource.Resource
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.Node
+import org.apache.kafka.controller.Controller
+import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Request handler for Controller APIs
+ */
+class ControllerApis(val requestChannel: RequestChannel,
+ val authorizer: Option[Authorizer],
+ val quotas: QuotaManagers,
+ val time: Time,
+ val supportedFeatures: Map[String, VersionRange],
+ val controller: Controller,
+ val raftManager: RaftManager[ApiMessageAndVersion],
+ val config: KafkaConfig,
+ val metaProperties: MetaProperties,
+ val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging {
+
+ val authHelper = new AuthHelper(authorizer)
+ val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time, s"[ControllerApis id=${config.nodeId}] ")
+
+ var supportedApiKeys = Set(
+ ApiKeys.FETCH,
+ ApiKeys.METADATA,
+ //ApiKeys.SASL_HANDSHAKE
+ ApiKeys.API_VERSIONS,
+ ApiKeys.CREATE_TOPICS,
+ //ApiKeys.DELETE_TOPICS,
+ //ApiKeys.DESCRIBE_ACLS,
+ //ApiKeys.CREATE_ACLS,
+ //ApiKeys.DELETE_ACLS,
+ //ApiKeys.DESCRIBE_CONFIGS,
+ //ApiKeys.ALTER_CONFIGS,
+ //ApiKeys.SASL_AUTHENTICATE,
+ //ApiKeys.CREATE_PARTITIONS,
+ //ApiKeys.CREATE_DELEGATION_TOKEN
+ //ApiKeys.RENEW_DELEGATION_TOKEN
+ //ApiKeys.EXPIRE_DELEGATION_TOKEN
+ //ApiKeys.DESCRIBE_DELEGATION_TOKEN
+ //ApiKeys.ELECT_LEADERS
+ ApiKeys.INCREMENTAL_ALTER_CONFIGS,
+ //ApiKeys.ALTER_PARTITION_REASSIGNMENTS
+ //ApiKeys.LIST_PARTITION_REASSIGNMENTS
+ ApiKeys.ALTER_CLIENT_QUOTAS,
+ //ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS
+ //ApiKeys.ALTER_USER_SCRAM_CREDENTIALS
+ //ApiKeys.UPDATE_FEATURES
+ ApiKeys.ENVELOPE,
+ ApiKeys.VOTE,
+ ApiKeys.BEGIN_QUORUM_EPOCH,
+ ApiKeys.END_QUORUM_EPOCH,
+ ApiKeys.DESCRIBE_QUORUM,
+ ApiKeys.ALTER_ISR,
+ ApiKeys.BROKER_REGISTRATION,
+ ApiKeys.BROKER_HEARTBEAT,
+ ApiKeys.UNREGISTER_BROKER,
+ )
+
+ private def maybeHandleInvalidEnvelope(
+ envelope: RequestChannel.Request,
+ forwardedApiKey: ApiKeys
+ ): Boolean = {
+ def sendEnvelopeError(error: Errors): Unit = {
+ requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception)
+ }
+
+ if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+ // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
+ sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED)
+ true
+ } else if (!forwardedApiKey.forwardable) {
+ sendEnvelopeError(Errors.INVALID_REQUEST)
+ true
+ } else {
+ false
+ }
+ }
+
+ override def handle(request: RequestChannel.Request): Unit = {
+ try {
+ val handled = request.envelope.exists(envelope => {
+ maybeHandleInvalidEnvelope(envelope, request.header.apiKey)
+ })
+
+ if (handled)
+ return
+
+ request.header.apiKey match {
+ case ApiKeys.FETCH => handleFetch(request)
+ case ApiKeys.METADATA => handleMetadataRequest(request)
+ case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
+ case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
+ case ApiKeys.VOTE => handleVote(request)
+ case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
+ case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
+ case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
+ case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
+ case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
+ case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
+ case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
+ case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
+ case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
+ case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+ case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
+ }
+ } catch {
+ case e: FatalExitError => throw e
+ case e: Throwable => requestHelper.handleError(request, e)
+ }
+ }
+
+ private def handleFetch(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+ handleRaftRequest(request, response => new FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData]))
+ }
+
+ def handleMetadataRequest(request: RequestChannel.Request): Unit = {
+ val metadataRequest = request.body[MetadataRequest]
+ def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
+ val metadataResponseData = new MetadataResponseData()
+ metadataResponseData.setThrottleTimeMs(requestThrottleMs)
+ controllerNodes.foreach { node =>
+ metadataResponseData.brokers().add(new MetadataResponseBroker()
+ .setHost(node.host)
+ .setNodeId(node.id)
+ .setPort(node.port)
+ .setRack(node.rack))
+ }
+ metadataResponseData.setClusterId(metaProperties.clusterId.toString)
+ if (controller.curClaimEpoch() > 0) {
+ metadataResponseData.setControllerId(config.nodeId)
+ } else {
+ metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
+ }
+ val clusterAuthorizedOperations = if (metadataRequest.data.includeClusterAuthorizedOperations) {
+ if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
+ authHelper.authorizedOperations(request, Resource.CLUSTER)
+ } else {
+ 0
+ }
+ } else {
+ Int.MinValue
+ }
+ // TODO: fill in information about the metadata topic
+ metadataResponseData.setClusterAuthorizedOperations(clusterAuthorizedOperations)
+ new MetadataResponse(metadataResponseData, request.header.apiVersion)
+ }
+ requestHelper.sendResponseMaybeThrottle(request,
+ requestThrottleMs => createResponseCallback(requestThrottleMs))
+ }
+
+ def handleCreateTopics(request: RequestChannel.Request): Unit = {
+ val createTopicRequest = request.body[CreateTopicsRequest]
+ val (authorizedCreateRequest, unauthorizedTopics) =
+ if (authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME)) {
+ (createTopicRequest.data, Seq.empty)
+ } else {
+ val duplicate = createTopicRequest.data.duplicate()
+ val authorizedTopics = new CreatableTopicCollection()
+ val unauthorizedTopics = mutable.Buffer.empty[String]
+
+ createTopicRequest.data.topics.asScala.foreach { topicData =>
+ if (authHelper.authorize(request.context, CREATE, TOPIC, topicData.name)) {
+ authorizedTopics.add(topicData)
+ } else {
+ unauthorizedTopics += topicData.name
+ }
+ }
+ (duplicate.setTopics(authorizedTopics), unauthorizedTopics)
+ }
+
+ def sendResponse(response: CreateTopicsResponseData): Unit = {
+ unauthorizedTopics.foreach { topic =>
+ val result = new CreatableTopicResult()
+ .setName(topic)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ response.topics.add(result)
+ }
+
+ requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+ response.setThrottleTimeMs(throttleTimeMs)
+ new CreateTopicsResponse(response)
+ })
+ }
+
+ if (authorizedCreateRequest.topics.isEmpty) {
+ sendResponse(new CreateTopicsResponseData())
+ } else {
+ val future = controller.createTopics(authorizedCreateRequest)
+ future.whenComplete((responseData, exception) => {
+ val response = if (exception != null) {
+ createTopicRequest.getErrorResponse(exception).asInstanceOf[CreateTopicsResponse].data
+ } else {
+ responseData
+ }
+ sendResponse(response)
+ })
+ }
+ }
+
+ def handleApiVersionsRequest(request: RequestChannel.Request): Unit = {
+ // Note that broker returns its full list of supported ApiKeys and versions regardless of current
+ // authentication state (e.g., before SASL authentication on an SASL listener, do note that no
+ // Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
+ // If this is considered to leak information about the broker version a workaround is to use SSL
+ // with client authentication which is performed at an earlier stage of the connection where the
+ // ApiVersionRequest is not available.
+ def createResponseCallback(features: FeatureMapAndEpoch,
+ requestThrottleMs: Int): ApiVersionsResponse = {
+ val apiVersionRequest = request.body[ApiVersionsRequest]
+ if (apiVersionRequest.hasUnsupportedRequestVersion)
+ apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
+ else if (!apiVersionRequest.isValid)
+ apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
+ else {
+ val data = new ApiVersionsResponseData().
+ setErrorCode(0.toShort).
+ setThrottleTimeMs(requestThrottleMs).
+ setFinalizedFeaturesEpoch(features.epoch())
+ supportedFeatures.foreach {
+ case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
+ setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
+ }
+ // features.finalizedFeatures().asScala.foreach {
+ // case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
+ // setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
+ // }
+ ApiKeys.values().foreach {
+ key =>
+ if (supportedApiKeys.contains(key)) {
+ data.apiKeys().add(new ApiVersion().
+ setApiKey(key.id).
+ setMaxVersion(key.latestVersion()).
+ setMinVersion(key.oldestVersion()))
+ }
+ }
+ new ApiVersionsResponse(data)
+ }
+ }
+ // FutureConverters.toScala(controller.finalizedFeatures()).onComplete {
+ // case Success(features) =>
+ requestHelper.sendResponseMaybeThrottle(request,
+ requestThrottleMs => createResponseCallback(new FeatureMapAndEpoch(
+ new FeatureMap(new util.HashMap()), 0), requestThrottleMs))
+ // case Failure(e) => requestHelper.handleError(request, e)
+ // }
+ }
+
+ private def handleVote(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+ handleRaftRequest(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
+ }
+
+ private def handleBeginQuorumEpoch(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+ handleRaftRequest(request, response => new BeginQuorumEpochResponse(response.asInstanceOf[BeginQuorumEpochResponseData]))
+ }
+
+ private def handleEndQuorumEpoch(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+ handleRaftRequest(request, response => new EndQuorumEpochResponse(response.asInstanceOf[EndQuorumEpochResponseData]))
+ }
+
+ private def handleDescribeQuorum(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, DESCRIBE)
+ handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
+ }
+
+ def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+ val alterIsrRequest = request.body[AlterIsrRequest]
+ val future = controller.alterIsr(alterIsrRequest.data())
+ future.whenComplete((result, exception) => {
+ val response = if (exception != null) {
+ alterIsrRequest.getErrorResponse(exception)
+ } else {
+ new AlterIsrResponse(result)
+ }
+ requestHelper.sendResponseExemptThrottle(request, response)
+ })
+ }
+
+ def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
+ val heartbeatRequest = request.body[BrokerHeartbeatRequest]
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+ controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit]((reply, e) => {
+ def createResponseCallback(requestThrottleMs: Int,
+ reply: BrokerHeartbeatReply,
+ e: Throwable): BrokerHeartbeatResponse = {
+ if (e != null) {
+ new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
+ setThrottleTimeMs(requestThrottleMs).
+ setErrorCode(Errors.forException(e).code()))
+ } else {
+ new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
+ setThrottleTimeMs(requestThrottleMs).
+ setErrorCode(Errors.NONE.code()).
+ setIsCaughtUp(reply.isCaughtUp()).
+ setIsFenced(reply.isFenced()).
+ setShouldShutDown(reply.shouldShutDown()))
+ }
+ }
+ requestHelper.sendResponseMaybeThrottle(request,
+ requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
+ })
+ }
+
+ def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
+ val decommissionRequest = request.body[UnregisterBrokerRequest]
+ authHelper.authorizeClusterOperation(request, ALTER)
+
+ controller.unregisterBroker(decommissionRequest.data().brokerId()).handle[Unit]((_, e) => {
+ def createResponseCallback(requestThrottleMs: Int,
+ e: Throwable): UnregisterBrokerResponse = {
+ if (e != null) {
+ new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
+ setThrottleTimeMs(requestThrottleMs).
+ setErrorCode(Errors.forException(e).code()))
+ } else {
+ new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
+ setThrottleTimeMs(requestThrottleMs))
+ }
+ }
+ requestHelper.sendResponseMaybeThrottle(request,
+ requestThrottleMs => createResponseCallback(requestThrottleMs, e))
+ })
+ }
+
+ def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
+ val registrationRequest = request.body[BrokerRegistrationRequest]
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+ controller.registerBroker(registrationRequest.data).handle[Unit]((reply, e) => {
+ def createResponseCallback(requestThrottleMs: Int,
+ reply: BrokerRegistrationReply,
+ e: Throwable): BrokerRegistrationResponse = {
+ if (e != null) {
+ new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
+ setThrottleTimeMs(requestThrottleMs).
+ setErrorCode(Errors.forException(e).code()))
+ } else {
+ new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
+ setThrottleTimeMs(requestThrottleMs).
+ setErrorCode(Errors.NONE.code()).
+ setBrokerEpoch(reply.epoch))
+ }
+ }
+ requestHelper.sendResponseMaybeThrottle(request,
+ requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
+ })
+ }
+
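+  // Common path for the quorum RPCs: forward the request body to the local
+  // RaftManager and complete asynchronously with either the built response or
+  // an error response, exempt from request throttling.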
+ private def handleRaftRequest(request: RequestChannel.Request,
+ buildResponse: ApiMessage => AbstractResponse): Unit = {
+ val requestBody = request.body[AbstractRequest]
+ val future = raftManager.handleRequest(request.header, requestBody.data, time.milliseconds())
+
+ future.whenComplete((responseData, exception) => {
+ val response = if (exception != null) {
+ requestBody.getErrorResponse(exception)
+ } else {
+ buildResponse(responseData)
+ }
+ requestHelper.sendResponseExemptThrottle(request, response)
+ })
+ }
+
+ def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+ val quotaRequest = request.body[AlterClientQuotasRequest]
+ authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+
+ controller.alterClientQuotas(quotaRequest.entries(), quotaRequest.validateOnly())
+ .whenComplete((results, exception) => {
+ if (exception != null) {
+ requestHelper.handleError(request, exception)
+ } else {
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ AlterClientQuotasResponse.fromQuotaEntities(results, requestThrottleMs))
+ }
+ })
+ }
+
+ def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
+ val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
+ authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
+ val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
+ alterConfigsRequest.data.resources.forEach { resource =>
+ val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType()), resource.resourceName())
+ val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
+ resource.configs.forEach { config =>
+ altersByName.put(config.name(), new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
+ AlterConfigOp.OpType.forId(config.configOperation()), config.value()))
+ }
+ configChanges.put(configResource, altersByName)
+ }
+ controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data().validateOnly())
+ .whenComplete((results, exception) => {
+ if (exception != null) {
+ requestHelper.handleError(request, exception)
+ } else {
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new IncrementalAlterConfigsResponse(requestThrottleMs, results))
+ }
+ })
+ }
+}
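
The quorum RPC handlers above (Vote, BeginQuorumEpoch, EndQuorumEpoch,
DescribeQuorum) all reduce to the same authorize-then-delegate shape via
handleRaftRequest. A further Raft API would be wired up the same way; a
minimal sketch, with FooResponse/FooResponseData as hypothetical names:

    private def handleFoo(request: RequestChannel.Request): Unit = {
      authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
      handleRaftRequest(request, response =>
        new FooResponse(response.asInstanceOf[FooResponseData]))
    }
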
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index b648e77..efcebb4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -1,10 +1,10 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -14,14 +14,194 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package kafka.server
+import java.util.concurrent.CompletableFuture
+import java.util
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.cluster.Broker.ServerInfo
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.raft.RaftManager
+import kafka.security.CredentialProvider
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{ClusterResource, Endpoint}
+import org.apache.kafka.controller.Controller
+import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
+import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.jdk.CollectionConverters._
+
/**
- * Stubbed implementation of the KIP-500 controller which is responsible
- * for managing the `@metadata` topic which is replicated through Raft.
+ * A KIP-500 Kafka controller.
*/
-class ControllerServer {
- def startup(): Unit = ???
- def shutdown(): Unit = ???
- def awaitShutdown(): Unit = ???
+class ControllerServer(
+ val metaProperties: MetaProperties,
+ val config: KafkaConfig,
+ val metaLogManager: MetaLogManager,
+ val raftManager: RaftManager[ApiMessageAndVersion],
+ val time: Time,
+ val metrics: Metrics,
+ val threadNamePrefix: Option[String],
+ val controllerQuorumVotersFuture: CompletableFuture[util.List[String]]
+ ) extends Logging with KafkaMetricsGroup {
+ import kafka.server.Server._
+
+ val lock = new ReentrantLock()
+ val awaitShutdownCond = lock.newCondition()
+ var status: ProcessStatus = SHUTDOWN
+
+ var linuxIoMetricsCollector: LinuxIoMetricsCollector = null
+ var authorizer: Option[Authorizer] = null
+ var tokenCache: DelegationTokenCache = null
+ var credentialProvider: CredentialProvider = null
+ var socketServer: SocketServer = null
+ val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
+ var controller: Controller = null
+ val supportedFeatures: Map[String, VersionRange] = Map()
+ var quotaManagers: QuotaManagers = null
+ var controllerApis: ControllerApis = null
+ var controllerApisHandlerPool: KafkaRequestHandlerPool = null
+
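+  // Lifecycle, as encoded by the transitions in startup() and shutdown():
+  //   SHUTDOWN -> STARTING -> STARTED -> SHUTTING_DOWN -> SHUTDOWN
+  // maybeChangeStatus applies a transition atomically and returns false when the
+  // current status is not `from`, so repeated startup/shutdown calls are no-ops.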
+ private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
+ lock.lock()
+ try {
+ if (status != from) return false
+ status = to
+ if (to == SHUTDOWN) awaitShutdownCond.signalAll()
+ } finally {
+ lock.unlock()
+ }
+ true
+ }
+
+ def clusterId: String = metaProperties.clusterId.toString
+
+ def startup(): Unit = {
+ if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
+ try {
+ info("Starting controller")
+
+ maybeChangeStatus(STARTING, STARTED)
+ // TODO: initialize the log dir(s)
+ this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
+
+ newGauge("ClusterId", () => clusterId)
+ newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+ linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
+ if (linuxIoMetricsCollector.usable()) {
+ newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+ newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
+ }
+
+ val javaListeners = config.controllerListeners.map(_.toJava).asJava
+ authorizer = config.authorizer
+ authorizer.foreach(_.configure(config.originals))
+
+ val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
+ case Some(authZ) =>
+          // It would be nice to remove some of the broker-specific assumptions from
+          // AuthorizerServerInfo, such as the assumption that there is an inter-broker
+          // listener, or that the node ID field is named brokerId.
+ val controllerAuthorizerInfo = ServerInfo(
+ new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0))
+ authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
+ ep -> cs.toCompletableFuture
+ }.toMap
+ case None =>
+ javaListeners.asScala.map {
+ ep => ep -> CompletableFuture.completedFuture[Void](null)
+ }.toMap
+ }
+
+ tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
+ credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
+ socketServer = new SocketServer(config,
+ metrics,
+ time,
+ credentialProvider,
+ allowControllerOnlyApis = true,
+ controllerSocketServer = true)
+ socketServer.startup(false, None, config.controllerListeners)
+ socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
+ config.controllerListeners.head.listenerName))
+
+ controller = null
+ quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
+ val controllerNodes =
+ RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
+ controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
+ authorizer,
+ quotaManagers,
+ time,
+ supportedFeatures,
+ controller,
+ raftManager,
+ config,
+ metaProperties,
+ controllerNodes.toSeq)
+ controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+ socketServer.dataPlaneRequestChannel,
+ controllerApis,
+ time,
+ config.numIoThreads,
+ s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
+ SocketServer.DataPlaneThreadPrefix)
+ socketServer.startProcessingRequests(authorizerFutures)
+ } catch {
+ case e: Throwable =>
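+        // Move to STARTED first so that the shutdown() call below can proceed
+        // (shutdown() only runs from the STARTED state).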
+ maybeChangeStatus(STARTING, STARTED)
+        fatal("Fatal error during controller startup. Preparing to shut down", e)
+ shutdown()
+ throw e
+ }
+ }
+
+ def shutdown(): Unit = {
+ if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
+ try {
+      info("Shutting down controller")
+ if (socketServer != null)
+ CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+ if (controller != null)
+ controller.beginShutdown()
+ if (socketServer != null)
+ CoreUtils.swallow(socketServer.shutdown(), this)
+ if (controllerApisHandlerPool != null)
+ CoreUtils.swallow(controllerApisHandlerPool.shutdown(), this)
+ if (quotaManagers != null)
+ CoreUtils.swallow(quotaManagers.shutdown(), this)
+ if (controller != null)
+ controller.close()
+ socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
+ } catch {
+ case e: Throwable =>
+ fatal("Fatal error during controller shutdown.", e)
+ throw e
+ } finally {
+ maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
+ }
+ }
+
+ def awaitShutdown(): Unit = {
+ lock.lock()
+ try {
+ while (true) {
+ if (status == SHUTDOWN) return
+ awaitShutdownCond.awaitUninterruptibly()
+ }
+ } finally {
+ lock.unlock()
+ }
+ }
}
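
The lifecycle methods compose as in this minimal sketch, where `server`
stands for any constructed ControllerServer:

    server.startup()        // SHUTDOWN -> STARTING -> STARTED
    server.shutdown()       // STARTED -> SHUTTING_DOWN -> SHUTDOWN
    server.awaitShutdown()  // blocks until the status reaches SHUTDOWN
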
diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
new file mode 100644
index 0000000..ec8871f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.net.{InetAddress, UnknownHostException}
+import java.nio.ByteBuffer
+
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.{InvalidRequestException, PrincipalDeserializationException, UnsupportedVersionException}
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+import scala.compat.java8.OptionConverters._
+
+object EnvelopeUtils {
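+  // Unwraps an EnvelopeRequest: parses the forwarded principal, client address
+  // and request header, rebuilds the request in the forwarded context, and
+  // hands the result to `handler`.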
+ def handleEnvelopeRequest(
+ request: RequestChannel.Request,
+ requestChannelMetrics: RequestChannel.Metrics,
+ handler: RequestChannel.Request => Unit): Unit = {
+ val envelope = request.body[EnvelopeRequest]
+ val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal)
+ val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress)
+
+ val forwardedRequestBuffer = envelope.requestData.duplicate()
+ val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
+
+ val forwardedApi = forwardedRequestHeader.apiKey
+ if (!forwardedApi.forwardable) {
+ throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
+ }
+
+ val forwardedContext = new RequestContext(
+ forwardedRequestHeader,
+ request.context.connectionId,
+ forwardedClientAddress,
+ forwardedPrincipal,
+ request.context.listenerName,
+ request.context.securityProtocol,
+ ClientInformation.EMPTY,
+ request.context.fromPrivilegedListener
+ )
+
+ val forwardedRequest = parseForwardedRequest(
+ request,
+ forwardedContext,
+ forwardedRequestBuffer,
+ requestChannelMetrics
+ )
+ handler(forwardedRequest)
+ }
+
+ private def parseForwardedClientAddress(
+ address: Array[Byte]
+ ): InetAddress = {
+ try {
+ InetAddress.getByAddress(address)
+ } catch {
+ case e: UnknownHostException =>
+ throw new InvalidRequestException("Failed to parse client address from envelope", e)
+ }
+ }
+
+ private def parseForwardedRequest(
+ envelope: RequestChannel.Request,
+ forwardedContext: RequestContext,
+ buffer: ByteBuffer,
+ requestChannelMetrics: RequestChannel.Metrics
+ ): RequestChannel.Request = {
+ try {
+ new RequestChannel.Request(
+ processor = envelope.processor,
+ context = forwardedContext,
+ startTimeNanos = envelope.startTimeNanos,
+ envelope.memoryPool,
+ buffer,
+ requestChannelMetrics,
+ Some(envelope)
+ )
+ } catch {
+ case e: InvalidRequestException =>
+        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
+        // This disambiguates parse failures of the forwarded request from
+        // structural errors in the envelope request itself (such as an invalid
+        // client address), which surface as INVALID_REQUEST.
+ throw new UnsupportedVersionException(s"Failed to parse forwarded request " +
+ s"with header ${forwardedContext.header}", e)
+ }
+ }
+
+ private def parseForwardedRequestHeader(
+ buffer: ByteBuffer
+ ): RequestHeader = {
+ try {
+ RequestHeader.parse(buffer)
+ } catch {
+ case e: InvalidRequestException =>
+        // We use UNSUPPORTED_VERSION if the embedded request header cannot be parsed.
+        // This disambiguates parse failures of the forwarded request from
+        // structural errors in the envelope request itself (such as an invalid
+        // client address), which surface as INVALID_REQUEST.
+ throw new UnsupportedVersionException("Failed to parse request header from envelope", e)
+ }
+ }
+
+ private def parseForwardedPrincipal(
+ envelopeContext: RequestContext,
+ principalBytes: Array[Byte]
+ ): KafkaPrincipal = {
+ envelopeContext.principalSerde.asScala match {
+ case Some(serde) =>
+ try {
+ serde.deserialize(principalBytes)
+ } catch {
+ case e: Exception =>
+ throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e)
+ }
+
+ case None =>
+ throw new PrincipalDeserializationException("Could not deserialize principal since " +
+ "no `KafkaPrincipalSerde` has been defined")
+ }
+ }
+}
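
A minimal usage sketch, assuming `requestChannel` and `handle` are members
of the enclosing request handler (the KafkaApis hunk below delegates exactly
like this):

    def handleEnvelope(request: RequestChannel.Request): Unit = {
      EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
    }
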
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c3195be..0545c74 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,6 @@
package kafka.server
import java.lang.{Long => JLong}
-import java.net.{InetAddress, UnknownHostException}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ConcurrentHashMap
@@ -64,7 +63,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseD [...]
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ClientInformation, ListenerName, Send}
+import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
@@ -1224,7 +1223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestThrottleMs,
brokers.flatMap(_.endpoints.get(request.context.listenerName.value())).toList.asJava,
clusterId,
- metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+ metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
@@ -3210,7 +3209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val brokers = metadataCache.getAliveBrokers
- val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
+ val controllerId = metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val data = new DescribeClusterResponseData()
@@ -3234,7 +3233,6 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleEnvelope(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
- val envelope = request.body[EnvelopeRequest]
// If forwarding is not yet enabled or this request has been received on an invalid endpoint,
// then we treat the request as unparsable and close the connection.
@@ -3258,101 +3256,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"Broker $brokerId is not the active controller"))
return
}
-
- val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal)
- val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress)
-
- val forwardedRequestBuffer = envelope.requestData.duplicate()
- val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
-
- val forwardedApi = forwardedRequestHeader.apiKey
- if (!forwardedApi.forwardable) {
- throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
- }
-
- val forwardedContext = new RequestContext(
- forwardedRequestHeader,
- request.context.connectionId,
- forwardedClientAddress,
- forwardedPrincipal,
- request.context.listenerName,
- request.context.securityProtocol,
- ClientInformation.EMPTY,
- request.context.fromPrivilegedListener
- )
-
- val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer)
- handle(forwardedRequest)
- }
-
- private def parseForwardedClientAddress(
- address: Array[Byte]
- ): InetAddress = {
- try {
- InetAddress.getByAddress(address)
- } catch {
- case e: UnknownHostException =>
- throw new InvalidRequestException("Failed to parse client address from envelope", e)
- }
- }
-
- private def parseForwardedRequest(
- envelope: RequestChannel.Request,
- forwardedContext: RequestContext,
- buffer: ByteBuffer
- ): RequestChannel.Request = {
- try {
- new RequestChannel.Request(
- processor = envelope.processor,
- context = forwardedContext,
- startTimeNanos = envelope.startTimeNanos,
- envelope.memoryPool,
- buffer,
- requestChannel.metrics,
- Some(envelope)
- )
- } catch {
- case e: InvalidRequestException =>
- // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
- // The purpose is to disambiguate structural errors in the envelope request
- // itself, such as an invalid client address.
- throw new UnsupportedVersionException(s"Failed to parse forwarded request " +
- s"with header ${forwardedContext.header}", e)
- }
- }
-
- private def parseForwardedRequestHeader(
- buffer: ByteBuffer
- ): RequestHeader = {
- try {
- RequestHeader.parse(buffer)
- } catch {
- case e: InvalidRequestException =>
- // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
- // The purpose is to disambiguate structural errors in the envelope request
- // itself, such as an invalid client address.
- throw new UnsupportedVersionException("Failed to parse request header from envelope", e)
- }
- }
-
- private def parseForwardedPrincipal(
- envelopeContext: RequestContext,
- principalBytes: Array[Byte]
- ): KafkaPrincipal = {
- envelopeContext.principalSerde.asScala match {
- case Some(serde) =>
- try {
- serde.deserialize(principalBytes)
- } catch {
- case e: Exception =>
- throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e)
- }
-
- case None =>
- throw new PrincipalDeserializationException("Could not deserialize principal since " +
- "no `KafkaPrincipalSerde` has been defined")
+ EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
}
- }
def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
val describeProducersRequest = request.body[DescribeProducersRequest]
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index d47283e..5fed236 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -68,6 +68,14 @@ object KafkaBroker {
case _ => //do nothing
}
}
+
+ /**
+ * The log message that we print when the broker has been successfully started.
+ * The ducktape system tests look for a line matching the regex 'Kafka\s*Server.*started'
+ * to know when the broker is started, so it is best not to change this message -- but if
+ * you do change it, be sure to make it match that regex or the system tests will fail.
+ */
+ val STARTED_MESSAGE = "Kafka Server started"
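+  // For illustration: "Kafka\\s*Server.*started".r.findFirstIn(STARTED_MESSAGE)
+  // is defined, so the default message satisfies the system-test pattern.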
}
trait KafkaBroker extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2fd04ae..e01bf60 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1025,7 +1025,7 @@ object KafkaConfig {
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
- private val configDef = {
+ private[server] val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
import ConfigDef.Type._
@@ -1893,14 +1893,25 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
validateValues()
private def validateValues(): Unit = {
- if(brokerIdGenerationEnable) {
- require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
+ if (requiresZookeeper) {
+ if (zkConnect == null) {
+ throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
+ }
+ if (brokerIdGenerationEnable) {
+ require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
+ } else {
+ require(brokerId >= 0, "broker.id must be greater than or equal to 0")
+ }
} else {
- require(brokerId >= 0, "broker.id must be equal or greater than 0")
+ // Raft-based metadata quorum
+ if (nodeId < 0) {
+ throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
+ s"when `process.roles` is defined (i.e. when using the self-managed quorum).")
+ }
}
- require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
- require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
- require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
+ require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
+ require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
+    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or greater than or equal to 1")
require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
@@ -1975,12 +1986,5 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
s" authentication responses from timing out")
-
- if (requiresZookeeper && zkConnect == null) {
- throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
- } else if (usesSelfManagedQuorum && nodeId < 0) {
- throw new ConfigException(s"Missing required configuration `${KafkaConfig.NodeIdProp}` which is required " +
- s"when `process.roles` is defined (i.e. when using the self-managed quorum).")
- }
}
}
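
To illustrate the reshaped validation (mirroring the KafkaConfigTest cases
added below): a Raft-based config must carry an explicit node.id, and the
broker.id range checks now apply only to the ZooKeeper path. A sketch:

    val props = new java.util.Properties()
    props.put(KafkaConfig.ProcessRolesProp, "broker") // self-managed quorum
    // new KafkaConfig(props)  would throw ConfigException: node.id is missing
    props.put(KafkaConfig.NodeIdProp, "2000")         // above reserved.broker.max.id
    new KafkaConfig(props)    // passes validateValues() in the Raft-based case
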
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 1a072c3..dc3fd16 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -17,6 +17,7 @@
package kafka.server
import java.io.File
+import java.util.concurrent.CompletableFuture
import kafka.common.{InconsistentNodeIdException, KafkaException}
import kafka.log.Log
@@ -26,7 +27,10 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.{AppInfoParser, Time}
-import org.apache.kafka.raft.internals.StringSerde
+import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
+
+import scala.collection.Seq
/**
* This class implements the KIP-500 server which relies on a self-managed
@@ -47,7 +51,7 @@ class KafkaRaftServer(
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
KafkaYammerMetrics.INSTANCE.configure(config.originals)
- private val (metaProps, _) = KafkaRaftServer.initializeLogDirs(config)
+ private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
private val metrics = Server.initializeMetrics(
config,
@@ -55,24 +59,38 @@ class KafkaRaftServer(
metaProps.clusterId.toString
)
- private val raftManager = new KafkaRaftManager(
+ private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(config.quorumVoters)
+
+ private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
config,
- new StringSerde,
+ new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
time,
metrics,
threadNamePrefix
)
+ private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
+
private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
- Some(new BrokerServer())
+ Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix,
+ offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES))
} else {
None
}
private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
- Some(new ControllerServer())
+ Some(new ControllerServer(
+ metaProps,
+ config,
+ metaLogShim,
+ raftManager,
+ time,
+ metrics,
+ threadNamePrefix,
+      controllerQuorumVotersFuture
+ ))
} else {
None
}
@@ -83,6 +101,7 @@ class KafkaRaftServer(
controller.foreach(_.startup())
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
+ info(KafkaBroker.STARTED_MESSAGE)
}
override def shutdown(): Unit = {
@@ -118,7 +137,7 @@ object KafkaRaftServer {
* be consistent across all log dirs) and the offline directories
*/
def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = {
- val logDirs = config.logDirs :+ config.metadataLogDir
+ val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
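
The switch from `:+` to a set union deduplicates the metadata log dir when
it coincides with one of the data dirs, so the same directory is not scanned
twice (illustrative values):

    val dataDirs = Seq("/disk1", "/disk2")
    val metadataLogDir = "/disk1"
    (dataDirs :+ metadataLogDir).size            // 3: "/disk1" listed twice
    (dataDirs.toSet + metadataLogDir).toSeq.size // 2: duplicate collapsed
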
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7ec7a29..df37026 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -138,7 +138,7 @@ class KafkaServer(
var kafkaScheduler: KafkaScheduler = null
- var metadataCache: MetadataCache = null
+ var metadataCache: ZkMetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
@@ -275,7 +275,8 @@ class KafkaServer(
time = time,
metrics = metrics,
threadNamePrefix = threadNamePrefix,
- brokerEpochSupplier = () => kafkaController.brokerEpoch
+ brokerEpochSupplier = () => kafkaController.brokerEpoch,
+ config.brokerId
)
} else {
AlterIsrManager(kafkaScheduler, time, zkClient)
@@ -332,8 +333,8 @@ class KafkaServer(
time,
metrics,
threadNamePrefix,
- adminManager,
- kafkaController,
+ Some(adminManager),
+ Some(kafkaController),
groupCoordinator,
transactionCoordinator,
enableForwarding
@@ -359,7 +360,7 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
- val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager)
+ val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala
index 00b029f..86390ea 100644
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ b/core/src/main/scala/kafka/server/MetadataSupport.scala
@@ -19,6 +19,7 @@ package kafka.server
import kafka.controller.KafkaController
import kafka.network.RequestChannel
+import kafka.server.metadata.RaftMetadataCache
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.requests.AbstractResponse
@@ -58,12 +59,15 @@ sealed trait MetadataSupport {
def maybeForward(request: RequestChannel.Request,
handler: RequestChannel.Request => Unit,
responseCallback: Option[AbstractResponse] => Unit): Unit
+
+ def controllerId: Option[Int]
}
case class ZkSupport(adminManager: ZkAdminManager,
controller: KafkaController,
zkClient: KafkaZkClient,
- forwardingManager: Option[ForwardingManager]) extends MetadataSupport {
+ forwardingManager: Option[ForwardingManager],
+ metadataCache: ZkMetadataCache) extends MetadataSupport {
val adminZkClient = new AdminZkClient(zkClient)
override def requireZkOrThrow(createException: => Exception): ZkSupport = this
@@ -83,9 +87,11 @@ case class ZkSupport(adminManager: ZkAdminManager,
case _ => handler(request)
}
}
+
+ override def controllerId: Option[Int] = metadataCache.getControllerId
}
-case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache) extends MetadataSupport {
override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
@@ -105,4 +111,14 @@ case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
handler(request) // will reject
}
}
+
+  override def controllerId: Option[Int] = {
+    // When running with a Raft-based metadata quorum, we return the ID of a
+    // randomly-chosen alive broker as the controller ID. Raft-based controllers
+    // are not directly accessible to clients; instead, clients send requests
+    // destined for the controller to any broker node, and the receiving broker
+    // automatically forwards them on the client's behalf to the active
+    // Raft-based controller, as per KIP-590.
+    metadataCache.currentImage().brokers.randomAliveBrokerId()
+  }
}
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index 9126114..1b5aa59 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,11 +16,15 @@
*/
package kafka.server
+import java.util.Collections
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.VersionRange
+
+import scala.jdk.CollectionConverters._
trait Server {
def startup(): Unit
@@ -91,4 +95,12 @@ object Server {
reporters
}
+ sealed trait ProcessStatus
+ case object SHUTDOWN extends ProcessStatus
+ case object STARTING extends ProcessStatus
+ case object STARTED extends ProcessStatus
+ case object SHUTTING_DOWN extends ProcessStatus
+
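+  // Empty for now: the KIP-500 servers do not yet advertise any supported
+  // feature ranges.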
+ val SUPPORTED_FEATURES = Collections.
+ unmodifiableMap[String, VersionRange](Map[String, VersionRange]().asJava)
}
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 0a83fd9..5db6f84 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -160,7 +160,7 @@ class TestRaftServer(
eventQueue.offer(HandleClaim(epoch))
}
- override def handleResign(): Unit = {
+ override def handleResign(epoch: Int): Unit = {
eventQueue.offer(HandleResign)
}
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index dc4dd06..9f9749b 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -96,8 +96,8 @@ class AutoTopicCreationManagerTest {
config,
metadataCache,
Some(brokerToController),
- adminManager,
- controller,
+ Some(adminManager),
+ Some(controller),
groupCoordinator,
transactionCoordinator)
@@ -125,8 +125,8 @@ class AutoTopicCreationManagerTest {
config,
metadataCache,
None,
- adminManager,
- controller,
+ Some(adminManager),
+ Some(controller),
groupCoordinator,
transactionCoordinator)
@@ -155,8 +155,8 @@ class AutoTopicCreationManagerTest {
config,
metadataCache,
Some(brokerToController),
- adminManager,
- controller,
+ Some(adminManager),
+ Some(controller),
groupCoordinator,
transactionCoordinator)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
new file mode 100644
index 0000000..fc0a38b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import java.net.InetAddress
+import java.util.Properties
+
+import kafka.network.RequestChannel
+import kafka.raft.RaftManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager}
+import kafka.utils.MockTime
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.BrokerRegistrationRequestData
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, BrokerRegistrationRequest, RequestContext, RequestHeader, RequestTestUtils}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.controller.Controller
+import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
+import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizationResult, Authorizer}
+import org.easymock.{Capture, EasyMock}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, Test}
+
+class ControllerApisTest {
+ // Mocks
+ private val nodeId = 1
+ private val brokerRack = "Rack1"
+ private val clientID = "Client1"
+ private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+ private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+ private val time = new MockTime
+ private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
+ private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+ private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager])
+ private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+ private val raftManager: RaftManager[ApiMessageAndVersion] = EasyMock.createNiceMock(classOf[RaftManager[ApiMessageAndVersion]])
+ private val quotas = QuotaManagers(
+ clientQuotaManager,
+ clientQuotaManager,
+ clientRequestQuotaManager,
+ clientControllerQuotaManager,
+ replicaQuotaManager,
+ replicaQuotaManager,
+ replicaQuotaManager,
+ None)
+ private val controller: Controller = EasyMock.createNiceMock(classOf[Controller])
+
+ private def createControllerApis(authorizer: Option[Authorizer],
+ supportedFeatures: Map[String, VersionRange] = Map.empty): ControllerApis = {
+ val props = new Properties()
+ props.put(KafkaConfig.NodeIdProp, nodeId)
+ props.put(KafkaConfig.ProcessRolesProp, "controller")
+ new ControllerApis(
+ requestChannel,
+ authorizer,
+ quotas,
+ time,
+ supportedFeatures,
+ controller,
+ raftManager,
+ new KafkaConfig(props),
+      // FIXME: Would make more sense to set controllerId here
+ MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
+ Seq.empty
+ )
+ }
+
+  /**
+   * Build a RequestChannel.Request from the given AbstractRequest.
+   *
+   * @param request      the AbstractRequest to wrap
+   * @param listenerName the listener on which the request is received
+   * @tparam T           the concrete type of the AbstractRequest
+   * @return a RequestChannel.Request carrying the serialized request and its parsed header
+   */
+ private def buildRequest[T <: AbstractRequest](request: AbstractRequest,
+ listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): RequestChannel.Request = {
+ val buffer = RequestTestUtils.serializeRequestWithHeader(
+ new RequestHeader(request.apiKey, request.version, clientID, 0), request)
+
+    // Parse the header from the buffer first so that the Request constructor
+    // can then read the request body from the same buffer.
+ val header = RequestHeader.parse(buffer)
+ val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+ listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+ new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
+ requestChannelMetrics)
+ }
+
+ @Test
+ def testBrokerRegistration(): Unit = {
+ val brokerRegistrationRequest = new BrokerRegistrationRequest.Builder(
+ new BrokerRegistrationRequestData()
+ .setBrokerId(nodeId)
+ .setRack(brokerRack)
+ ).build()
+
+ val request = buildRequest(brokerRegistrationRequest)
+
+ val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+ val authorizer = Some[Authorizer](EasyMock.createNiceMock(classOf[Authorizer]))
+    EasyMock.expect(authorizer.get.authorize(EasyMock.anyObject[AuthorizableRequestContext](), EasyMock.anyObject()))
+      .andAnswer(() => java.util.Collections.singletonList(AuthorizationResult.DENIED))
+ EasyMock.replay(requestChannel, authorizer.get)
+
+    val exception = assertThrows(classOf[ClusterAuthorizationException],
+      () => createControllerApis(authorizer = authorizer).handleBrokerRegistration(request))
+    assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, Errors.forException(exception))
+ }
+
+ @AfterEach
+ def tearDown(): Unit = {
+ quotas.shutdown()
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 88bf8ebc..5138bf6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -34,7 +34,7 @@ import kafka.log.AppendOrigin
import kafka.network.RequestChannel
import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse}
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ConfigRepository, CachedConfigRepository}
+import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, RaftMetadataCache}
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.NodeApiVersions
@@ -148,8 +148,23 @@ class KafkaApisTest {
else
None
+    val metadataSupport = if (raftSupport) {
+      // Tests that pass raftSupport = true must already have replaced the default
+      // ZkMetadataCache with a RaftMetadataCache instance.
+ metadataCache match {
+ case raftMetadataCache: RaftMetadataCache =>
+ RaftSupport(forwardingManager, raftMetadataCache)
+ case _ => throw new IllegalStateException("Test must set an instance of RaftMetadataCache")
+ }
+ } else {
+ metadataCache match {
+ case zkMetadataCache: ZkMetadataCache =>
+ ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache)
+ case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
+ }
+ }
new KafkaApis(requestChannel,
- if (raftSupport) RaftSupport(forwardingManager) else ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt),
+ metadataSupport,
replicaManager,
groupCoordinator,
txnCoordinator,
@@ -321,6 +336,7 @@ class KafkaApisTest {
EasyMock.expect(controller.isActive).andReturn(true)
+ EasyMock.expect(requestChannel.metrics).andReturn(EasyMock.niceMock(classOf[RequestChannel.Metrics]))
EasyMock.expect(requestChannel.updateErrorMetrics(ApiKeys.ENVELOPE, Map(Errors.INVALID_REQUEST -> 1)))
val capturedResponse = expectNoThrottling()
@@ -3460,101 +3476,121 @@ class KafkaApisTest {
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest)
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope)
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
}
@Test
def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
}
@Test
def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
+ metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d6c456b..6271105 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -33,10 +33,13 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import java.net.InetSocketAddress
import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
+import org.apache.kafka.common.Node
import org.junit.jupiter.api.function.Executable
+import scala.jdk.CollectionConverters._
+
class KafkaConfigTest {
@Test
@@ -1034,7 +1037,17 @@ class KafkaConfigTest {
}
@Test
- def testInvalidQuorumVotersConfig(): Unit = {
+ def testControllerQuorumVoterStringsToNodes(): Unit = {
+ assertThrows(classOf[ConfigException], () => RaftConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
+ assertEquals(Seq(new Node(3000, "example.com", 9093)),
+ RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093")).asScala.toSeq)
+ assertEquals(Seq(new Node(3000, "example.com", 9093),
+ new Node(3001, "example.com", 9094)),
+      RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093", "3001@example.com:9094")).asScala.toSeq)
+ }
+
+ @Test
+ def testInvalidQuorumVoterConfig(): Unit = {
assertInvalidQuorumVoters("1")
assertInvalidQuorumVoters("1@")
assertInvalidQuorumVoters("1:")
@@ -1046,6 +1059,7 @@ class KafkaConfigTest {
assertInvalidQuorumVoters("1@kafka1:9092,2@")
assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+ assertInvalidQuorumVoters("1@kafka1:9092:1@kafka2:9092")
}
private def assertInvalidQuorumVoters(value: String): Unit = {
@@ -1081,6 +1095,102 @@ class KafkaConfigTest {
}
@Test
+ def testAcceptsLargeNodeIdForRaftBasedCase(): Unit = {
+ // Generation of Broker IDs is not supported when using Raft-based controller quorums,
+ // so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
+ // and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
+ val largeBrokerId = 2000
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
+ assertTrue(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
+ // -1 is the default for both node.id and broker.id
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
+ // -1 is the default for both node.id and broker.id
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "controller")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
+ // -1 is the default for both node.id and broker.id
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testRejectsLargeNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
+ // Generation of Broker IDs is supported when using ZooKeeper-based controllers,
+ // so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
+ // and make sure it is not allowed with broker.id.generation.enable=true (true is the default)
+ val largeBrokerId = 2000
+ val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testAcceptsNegativeOneNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
+ // -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed
+ val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ assertTrue(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testRejectsNegativeTwoNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
+ // -1 implies "auto-generate" and should succeed, but -2 does not and should fail
+ val negativeTwoNodeId = -2
+ val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
+ props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testAcceptsLargeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
+ // Ensure a broker ID greater than reserved.broker.max.id, which defaults to 1000,
+ // is allowed with broker.id.generation.enable=false
+ val largeBrokerId = 2000
+ val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ assertTrue(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testRejectsNegativeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
+ // -1 is the default for both node.id and broker.id
+ val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 4cf7d1e..6166d73 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -66,7 +66,7 @@ class KafkaRaftServerTest {
private def invokeLoadMetaProperties(
metaProperties: MetaProperties,
configProperties: Properties
- ): (MetaProperties, Seq[String]) = {
+ ): (MetaProperties, collection.Seq[String]) = {
val tempLogDir = TestUtils.tempDirectory()
try {
writeMetaProperties(tempLogDir, metaProperties)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 887d53da..c71c058 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -34,6 +34,7 @@ import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
+import kafka.server.ZkMetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
@@ -105,7 +106,7 @@ public class MetadataRequestBenchmark {
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
- private MetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId);
+ private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId);
private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
@@ -173,7 +174,7 @@ public class MetadataRequestBenchmark {
kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault();
return new KafkaApis(requestChannel,
- new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty()),
+ new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache),
replicaManager,
groupCoordinator,
transactionCoordinator,
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2823186..d0d93c8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -243,6 +243,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
random);
this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());
+
+ // Seed the voter endpoints with the addresses statically configured in RaftConfig
+ Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
+ voterAddresses.entrySet().stream()
+ .filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
+ .forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
}
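
The constructor block above seeds the channel with any voter endpoints that were statically configured. In isolation the same parse-and-filter step looks like the sketch below; the voter strings are hypothetical, while parseVoterConnections, AddressSpec and InetAddressSpec are the RaftConfig pieces this patch relies on:

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.kafka.raft.RaftConfig;

    public class VoterEndpointSketch {
        public static void main(String[] args) {
            // Hypothetical controller.quorum.voters entries in id@host:port form.
            Map<Integer, RaftConfig.AddressSpec> voters = RaftConfig.parseVoterConnections(
                Arrays.asList("1@localhost:9093", "2@localhost:9094"));
            voters.forEach((id, spec) -> {
                // Only concrete addresses are pushed to the channel; any other
                // AddressSpec variant is skipped, mirroring the filter above.
                if (spec instanceof RaftConfig.InetAddressSpec) {
                    System.out.println(id + " -> " + ((RaftConfig.InetAddressSpec) spec).address);
                }
            });
        }
    }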
private void updateFollowerHighWatermark(
@@ -336,9 +342,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
- private void fireHandleResign() {
+ private void fireHandleResign(int epoch) {
for (ListenerContext listenerContext : listenerContexts) {
- listenerContext.fireHandleResign();
+ listenerContext.fireHandleResign(epoch);
}
}
@@ -370,6 +376,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
wakeup();
}
+ @Override
+ public LeaderAndEpoch leaderAndEpoch() {
+ return quorum.leaderAndEpoch();
+ }
+
private OffsetAndEpoch endOffset() {
return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
}
@@ -457,7 +468,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private void maybeResignLeadership() {
if (quorum.isLeader()) {
- fireHandleResign();
+ fireHandleResign(quorum.epoch());
}
if (accumulator != null) {
@@ -2357,8 +2368,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
- void fireHandleResign() {
- listener.handleResign();
+ void fireHandleResign(int epoch) {
+ listener.handleResign(epoch);
}
public synchronized void onClose(BatchReader<T> reader) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java b/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java
index b88241b..ee74b5b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java
+++ b/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java
@@ -37,6 +37,11 @@ public interface NetworkChannel extends Closeable {
*/
void send(RaftRequest.Outbound request);
+ /**
+ * Update connection information for the given id.
+ */
+ void updateEndpoint(int id, RaftConfig.InetAddressSpec address);
+
default void close() {}
}
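
A real NetworkChannel would typically back updateEndpoint with an id-to-address map that send later consults. A hypothetical map-backed sketch; newCorrelationId() is assumed from the full interface and is not part of this hunk:

    import java.net.InetSocketAddress;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.kafka.raft.NetworkChannel;
    import org.apache.kafka.raft.RaftConfig;
    import org.apache.kafka.raft.RaftRequest;

    public class MapBackedNetworkChannel implements NetworkChannel {
        private final Map<Integer, InetSocketAddress> endpoints = new ConcurrentHashMap<>();
        private final AtomicInteger correlationId = new AtomicInteger();

        @Override
        public int newCorrelationId() { // assumed abstract method of the interface
            return correlationId.getAndIncrement();
        }

        @Override
        public void send(RaftRequest.Outbound request) {
            // A real implementation would resolve the destination id against
            // `endpoints` and hand the request to the underlying network client.
        }

        @Override
        public void updateEndpoint(int id, RaftConfig.InetAddressSpec address) {
            endpoints.put(id, address.address);
        }
    }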
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 554ce61..e2bec0e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -57,15 +57,16 @@ public interface RaftClient<T> extends Closeable {
/**
* Invoked after a leader has stepped down. This callback may or may not
* fire before the next leader has been elected.
+ *
+ * @param epoch the epoch that the leader is resigning from
*/
- default void handleResign() {}
+ default void handleResign(int epoch) {}
}
/**
* Initialize the client.
* This should only be called once on startup.
*
- * @param raftConfig the Raft quorum configuration
* @throws IOException For any IO errors during initialization
*/
void initialize() throws IOException;
@@ -78,6 +79,12 @@ public interface RaftClient<T> extends Closeable {
void register(Listener<T> listener);
/**
+ * Return the current {@link LeaderAndEpoch}.
+ * @return the current {@link LeaderAndEpoch}
+ */
+ LeaderAndEpoch leaderAndEpoch();
+
+ /**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to
* the log and eventually committed. However, it is guaranteed that if any of the
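
With the epoch now passed through, a listener can tell a stale resignation apart from one matching the epoch it actually claimed. A minimal sketch of such a listener; the String record type and the bookkeeping are illustrative only:

    import java.util.OptionalInt;
    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.raft.RaftClient;

    public class EpochAwareListener implements RaftClient.Listener<String> {
        private OptionalInt claimedEpoch = OptionalInt.empty();

        @Override
        public void handleCommit(BatchReader<String> reader) {
            try {
                while (reader.hasNext()) {
                    reader.next(); // apply the committed batch to local state here
                }
            } finally {
                reader.close();
            }
        }

        @Override
        public void handleClaim(int epoch) {
            claimedEpoch = OptionalInt.of(epoch);
        }

        @Override
        public void handleResign(int epoch) {
            // Only drop leader state when the resignation matches the claimed
            // epoch; a resignation for an older epoch is stale and ignored.
            if (claimedEpoch.equals(OptionalInt.of(epoch))) {
                claimedEpoch = OptionalInt.empty();
            }
        }
    }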
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
index de40b35..13dd879 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -28,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* RaftConfig encapsulates configuration specific to the Raft quorum voter nodes.
@@ -233,6 +235,17 @@ public class RaftConfig {
return voterMap;
}
+ public static List<Node> quorumVoterStringsToNodes(List<String> voters) {
+ return parseVoterConnections(voters).entrySet().stream()
+ .filter(connection -> connection.getValue() instanceof InetAddressSpec)
+ .map(connection -> {
+ InetAddressSpec inetAddressSpec = InetAddressSpec.class.cast(connection.getValue());
+ return new Node(connection.getKey(), inetAddressSpec.address.getHostName(),
+ inetAddressSpec.address.getPort());
+ })
+ .collect(Collectors.toList());
+ }
+
public static class ControllerQuorumVotersValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
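
The new quorumVoterStringsToNodes helper gives callers plain Node objects for the resolvable voters, for example when building a controller node list. A short usage sketch with hypothetical voter strings:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.raft.RaftConfig;

    public class VoterNodesSketch {
        public static void main(String[] args) {
            List<Node> nodes = RaftConfig.quorumVoterStringsToNodes(
                Arrays.asList("3001@controller1:9593", "3002@controller2:9593"));
            // Each resolvable id@host:port entry becomes a Node.
            nodes.forEach(node ->
                System.out.println(node.id() + " @ " + node.host() + ":" + node.port()));
        }
    }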
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 47dae5d..3db4d73 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -96,7 +96,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
}
@Override
- public synchronized void handleResign() {
+ public synchronized void handleResign(int epoch) {
log.debug("Counter uncommitted value reset after resigning leadership");
this.uncommitted = -1;
this.claimedEpoch = Optional.empty();
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
new file mode 100644
index 0000000..bf88e7d
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
+ * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
+ * directly.
+ */
+public class MetaLogRaftShim implements MetaLogManager {
+ private final RaftClient<ApiMessageAndVersion> client;
+ private final int nodeId;
+
+ public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
+ this.client = client;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void initialize() {
+ // NO-OP - The RaftClient is initialized externally
+ }
+
+ @Override
+ public void register(MetaLogListener listener) {
+ client.register(new ListenerShim(listener));
+ }
+
+ @Override
+ public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+ return client.scheduleAppend((int) epoch, batch);
+ }
+
+ @Override
+ public void renounce(long epoch) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MetaLogLeader leader() {
+ LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
+ return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
+ }
+
+ @Override
+ public int nodeId() {
+ return nodeId;
+ }
+
+ private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
+ private final MetaLogListener listener;
+
+ private ListenerShim(MetaLogListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+ try {
+ // TODO: The `BatchReader` might need to read from disk if this is
+ // not a leader. We want to move this IO to the state machine so that
+ // it does not block Raft replication
+ while (reader.hasNext()) {
+ BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+ List<ApiMessage> records = batch.records().stream()
+ .map(ApiMessageAndVersion::message)
+ .collect(Collectors.toList());
+ listener.handleCommits(batch.lastOffset(), records);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Override
+ public void handleClaim(int epoch) {
+ listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
+ }
+
+ @Override
+ public void handleResign(int epoch) {
+ listener.handleRenounce(epoch);
+ }
+
+ @Override
+ public String toString() {
+ return "ListenerShim(" +
+ "listener=" + listener +
+ ')';
+ }
+ }
+
+}
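
Wiring the shim is a matter of wrapping an already-initialized RaftClient and registering a MetaLogListener. A hedged sketch: it assumes handleCommits, handleNewLeader and handleRenounce are the only MetaLogListener methods that need overriding (any others are assumed to have defaults), and raftClient/nodeId are supplied by the caller:

    import java.util.List;
    import org.apache.kafka.common.protocol.ApiMessage;
    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.metalog.MetaLogLeader;
    import org.apache.kafka.metalog.MetaLogListener;
    import org.apache.kafka.metalog.MetaLogManager;
    import org.apache.kafka.raft.RaftClient;
    import org.apache.kafka.raft.metadata.MetaLogRaftShim;

    public class MetaLogShimWiring {
        public static MetaLogManager wire(RaftClient<ApiMessageAndVersion> raftClient, int nodeId) {
            MetaLogRaftShim metaLog = new MetaLogRaftShim(raftClient, nodeId);
            metaLog.initialize(); // no-op here: the RaftClient is initialized externally
            metaLog.register(new MetaLogListener() {
                @Override
                public void handleCommits(long lastOffset, List<ApiMessage> records) {
                    // Apply the committed metadata records up to lastOffset.
                }

                @Override
                public void handleNewLeader(MetaLogLeader leader) {
                    // This node claimed leadership (the shim only reports local claims).
                }

                @Override
                public void handleRenounce(long epoch) {
                    // Leadership for the given epoch was resigned.
                }
            });
            return metaLog;
        }
    }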
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
index 7a5b385..2a97931 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -56,6 +56,11 @@ public class MockNetworkChannel implements NetworkChannel {
sendQueue.add(request);
}
+ @Override
+ public void updateEndpoint(int id, RaftConfig.InetAddressSpec address) {
+ // empty
+ }
+
public List<RaftRequest.Outbound> drainSendQueue() {
return drainSentRequests(Optional.empty());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index efe7c95..9d19b86 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -975,7 +975,7 @@ public final class RaftClientTestContext {
}
@Override
- public void handleResign() {
+ public void handleResign(int epoch) {
this.currentClaimedEpoch = OptionalInt.empty();
}
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 2222c16..42243cf 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -22,7 +22,7 @@ NODE_ID = "node.id"
FIRST_BROKER_PORT = 9092
FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
FIRST_CONTROLLER_ID = 3001
-CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
+CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"