Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/16 21:05:21 UTC
[kafka] 02/11: KAFKA-14427 ZK client support for migrations (#12946)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4b03c8c4c3dd5a0c32a11efabf9318883b0483ee
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Dec 8 13:14:01 2022 -0500
KAFKA-14427 ZK client support for migrations (#12946)
This patch adds support for reading and writing ZooKeeper metadata during a KIP-866 migration.
For reading metadata from ZK, methods from KafkaZkClient and ZkData are reused to ensure we are decoding the JSON consistently.
For writing metadata, we use a new multi-op transaction that ensures only a single controller is writing to ZK. This is similar to the existing multi-op transaction that KafkaController uses, but it also includes a check on the new "/migration" ZNode. The transaction consists of three operations:
* CheckOp on /controller_epoch
* SetDataOp on /migration with zkVersion
* CreateOp/SetDataOp/DeleteOp (the actual operation being applied)
In the case of a batch of operations (such as topic creation), only the final MultiOp has a SetDataOp on /migration while the other requests use a CheckOp (similar to /controller_epoch).
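To make the shape of these transactions concrete, here is a minimal sketch of the final multi-op in a batch, reusing the request and op types added by this patch (kafka.zk / kafka.zookeeper). The topic name, the configData bytes, and the state value (the current ZkMigrationLeadershipState) are illustrative placeholders, not part of the patch:

    // Sketch only: the last write in a batch carries the SetDataOp checkpoint on /migration.
    // "my-topic", configData, and state are hypothetical; earlier writes in the same batch
    // would use CheckOp(MigrationZNode.path, ...) in place of the SetDataOp.
    val checkControllerEpoch = CheckOp(ControllerEpochZNode.path, state.controllerZkVersion())
    val checkpointMigration = SetDataOp(MigrationZNode.path, MigrationZNode.encode(state), state.migrationZkVersion())
    val actualWrite = SetDataOp(ConfigEntityZNode.path(ConfigType.Topic, "my-topic"), configData, ZkVersion.MatchAnyVersion)
    val request = MultiRequest(Seq(checkControllerEpoch, checkpointMigration, actualWrite))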
Reviewers: Colin Patrick McCabe <cm...@apache.org>, dengziming <de...@gmail.com>
---
checkstyle/import-control.xml | 1 +
.../main/scala/kafka/server/ZkAdminManager.scala | 27 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 333 ++++++++++++++--
core/src/main/scala/kafka/zk/ZkData.scala | 9 +-
.../main/scala/kafka/zk/ZkMigrationClient.scala | 441 +++++++++++++++++++++
.../kafka/zk/ZkMigrationIntegrationTest.scala | 105 +++++
.../unit/kafka/server/ZkAdminManagerTest.scala | 28 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 51 ++-
.../unit/kafka/zk/ZkMigrationClientTest.scala | 353 +++++++++++++++++
.../org/apache/kafka/image/ClientQuotasImage.java | 3 +-
.../kafka/metadata/migration/MigrationClient.java | 91 +++++
.../migration/ZkMigrationLeadershipState.java | 49 ++-
12 files changed, 1423 insertions(+), 68 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bd05521964e..7a62b671f84 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -227,6 +227,7 @@
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.authorizer" />
+ <allow pkg="org.apache.kafka.metadata.migration" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 634ce6b097c..1a22024d723 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -54,6 +54,19 @@ import org.apache.kafka.common.utils.Sanitizer
import scala.collection.{Map, mutable, _}
import scala.jdk.CollectionConverters._
+object ZkAdminManager {
+ def clientQuotaPropsToDoubleMap(props: Map[String, String]): Map[String, Double] = {
+ props.map { case (key, value) =>
+ val doubleValue = try value.toDouble catch {
+ case _: NumberFormatException =>
+ throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
+ }
+ key -> doubleValue
+ }
+ }
+}
+
+
class ZkAdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: MetadataCache,
@@ -636,16 +649,6 @@ class ZkAdminManager(val config: KafkaConfig,
private def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("")
- private def fromProps(props: Map[String, String]): Map[String, Double] = {
- props.map { case (key, value) =>
- val doubleValue = try value.toDouble catch {
- case _: NumberFormatException =>
- throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
- }
- key -> doubleValue
- }
- }
-
def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
@@ -706,7 +709,7 @@ class ZkAdminManager(val config: KafkaConfig,
(userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) =>
val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isClientOrUserQuotaConfig(key) }
if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
- Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
+ Some(userClientIdToEntity(u, c) -> ZkAdminManager.clientQuotaPropsToDoubleMap(quotaProps))
else
None
}.toMap
@@ -732,7 +735,7 @@ class ZkAdminManager(val config: KafkaConfig,
ipEntries.flatMap { case (ip, props) =>
val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) }
if (ipQuotaProps.nonEmpty)
- Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps))
+ Some(ipToQuotaEntity(ip) -> ZkAdminManager.clientQuotaPropsToDoubleMap(ipQuotaProps))
else
None
}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 747673d37db..12f4bfb2c3e 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
-import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
+import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
+import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
import scala.collection.{Map, Seq, mutable}
@@ -156,6 +157,83 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
tryCreateControllerZNodeAndIncrementEpoch()
}
+ /**
+ * Registers a given KRaft controller in ZooKeeper as the active controller. Unlike the ZK equivalent of this method,
+ * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
+ * leadership during a KRaft leadership failover.
+ *
+ * This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during
+ * the migration.
+ *
+ * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
+ * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method,
+ * the conditional update on /controller_epoch fails, which causes the whole multi-op transaction to fail.
+ *
+ * @param kraftControllerId ID of the KRaft controller node
+ * @param kraftControllerEpoch Epoch of the KRaft controller node
+ * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
+ */
+ def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {
+ val timestamp = time.milliseconds()
+ val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
+ val controllerOpt = getControllerId
+ val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO Remove this after KAFKA-14436
+ curEpochOpt match {
+ case None =>
+ throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
+ s"since there is no ZK controller epoch present.")
+ case Some((curEpoch: Int, curEpochZk: Int)) =>
+ if (curEpoch >= controllerEpochToStore) {
+ // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch than ZK
+ throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
+ s"in ZK since its epoch ${controllerEpochToStore} is not higher than the current ZK epoch ${curEpoch}.")
+ }
+
+ val response = if (controllerOpt.isDefined) {
+ info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
+ s"controller with epoch $controllerEpochToStore. The previous controller was ${controllerOpt.get}.")
+ retryRequestUntilConnected(
+ MultiRequest(Seq(
+ SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
+ DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
+ CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
+ defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
+ )
+ } else {
+ info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " +
+ s"controller with epoch $controllerEpochToStore. There was no active controller.")
+ retryRequestUntilConnected(
+ MultiRequest(Seq(
+ SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
+ CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
+ defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
+ )
+ }
+
+ val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with epoch " +
+ s"$controllerEpochToStore. KRaft controller was not registered."
+ response.resultCode match {
+ case Code.OK =>
+ info(s"Successfully registered KRaft controller $kraftControllerId with epoch $controllerEpochToStore")
+ // First op is always SetData on /controller_epoch
+ val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult]
+ Some(setDataResult.getStat.getVersion)
+ case Code.BADVERSION =>
+ info(s"The controller epoch changed $failureSuffix")
+ None
+ case Code.NONODE =>
+ info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix")
+ None
+ case Code.NODEEXISTS =>
+ info(s"The ephemeral node at ${ControllerZNode.path} was created by another controller $failureSuffix")
+ None
+ case code =>
+ error(s"ZooKeeper had an error $failureSuffix")
+ throw KeeperException.create(code)
+ }
+ }
+ }
+
private def maybeCreateControllerEpochZNode(): (Int, Int) = {
createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
case Code.OK =>
@@ -340,6 +418,24 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
+ def getEntitiesConfigs(rootEntityType: String, sanitizedEntityNames: Set[String]): Map[String, Properties] = {
+ val getDataRequests: Seq[GetDataRequest] = sanitizedEntityNames.map { entityName =>
+ GetDataRequest(ConfigEntityZNode.path(rootEntityType, entityName), Some(entityName))
+ }.toSeq
+
+ val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+ getDataResponses.map { response =>
+ val entityName = response.ctx.get.asInstanceOf[String]
+ response.resultCode match {
+ case Code.OK =>
+ entityName -> ConfigEntityZNode.decode(response.data)
+ case Code.NONODE =>
+ entityName -> new Properties()
+ case _ => throw response.resultException.get
+ }
+ }.toMap
+ }
+
/**
* Sets or creates the entity znode path with the given configs depending
* on whether it already exists or not.
@@ -1554,6 +1650,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
+ def getOrCreateMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val getDataRequest = GetDataRequest(MigrationZNode.path)
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+ getDataResponse.resultCode match {
+ case Code.OK =>
+ MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
+ case Code.NONODE =>
+ createInitialMigrationState(initialState)
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
+
+ def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val createRequest = CreateRequest(
+ MigrationZNode.path,
+ MigrationZNode.encode(initialState),
+ defaultAcls(MigrationZNode.path),
+ CreateMode.PERSISTENT)
+ val response = retryRequestUntilConnected(createRequest)
+ response.maybeThrow()
+ initialState.withMigrationZkVersion(0)
+ }
+
+ def updateMigrationState(migrationState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val req = SetDataRequest(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+ val resp = retryRequestUntilConnected(req)
+ resp.maybeThrow()
+ migrationState.withMigrationZkVersion(resp.stat.getVersion)
+ }
+
/**
* Return the ACLs of the node of the given path
* @param path the given path for the node
@@ -1772,6 +1898,137 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
+ /**
+ * Safely performs a sequence of writes to ZooKeeper as part of a KRaft migration. For each request in {@code requests}, we
+ * wrap the operation in a multi-op transaction that includes a check op on /controller_epoch and /migration. This ensures
+ * that another KRaft controller or another ZK controller has not unexpectedly taken leadership.
+ *
+ * In cases of KRaft failover during a migration, it is possible that a write is attempted before the old KRaft controller
+ * receives the new leader information. In this case, the check op on /migration acts as a guard against multiple writers.
+ *
+ * The multi-op for the last request in {@code requests} is used to update the /migration node with the latest migration
+ * state. This effectively checkpoints the progress of the migration in ZK relative to the metadata log.
+ *
+ * Each multi-op request is atomic. The overall sequence of multi-op requests is not atomic and we may fail during any
+ * of them. When the KRaft controller recovers the migration state, it will re-apply all of the writes needed to update
+ * the ZK state with the latest KRaft state. In the case of Create or Delete operations, these will fail if applied
+ * twice, so we need to ignore NodeExists and NoNode failures for those cases.
+ *
+ * @param requests A sequence of ZK requests. Only Create, Delete, and SetData are supported.
+ * @param migrationState The current migration state. This is written out as part of the final multi-op request.
+ * @return The new version of /migration ZNode and the sequence of responses for the given requests.
+ */
+ def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
+ migrationState: ZkMigrationLeadershipState): (Int, Seq[Req#Response]) = {
+
+ if (requests.isEmpty) {
+ return (migrationState.migrationZkVersion(), Seq.empty)
+ }
+
+ def wrapMigrationRequest(request: Req, lastRequestInBatch: Boolean): MultiRequest = {
+ // Wrap a single request with the multi-op transactional request.
+ val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.controllerZkVersion())
+ val migrationOp = if (lastRequestInBatch) {
+ SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+ } else {
+ CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
+ }
+
+ request match {
+ case CreateRequest(path, data, acl, createMode, ctx) =>
+ MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx)
+ case DeleteRequest(path, version, ctx) =>
+ MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
+ case SetDataRequest(path, data, version, ctx) =>
+ MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx)
+ case _ => throw new IllegalStateException(s"$request does not need controller epoch check")
+ }
+ }
+
+ def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = {
+ // Handle just the operation that updated /migration ZNode
+ val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match {
+ case CheckOp(path, version) => (path, None, version)
+ case SetDataOp(path, data, version) => (path, Some(data), version)
+ case _ => throw new IllegalStateException("Unexpected result on /migration znode")
+ }
+
+ migrationResult match {
+ case _: CheckResult => version
+ case setDataResult: SetDataResult => setDataResult.getStat.getVersion
+ case errorResult: ErrorResult =>
+ if (path.equals(MigrationZNode.path)) {
+ val errorCode = Code.get(errorResult.getErr)
+ if (errorCode == Code.BADVERSION) {
+ data match {
+ case Some(value) =>
+ val failedPayload = MigrationZNode.decode(value, version, -1)
+ throw new RuntimeException(
+ s"Conditional update on KRaft Migration ZNode failed. Expected zkVersion = ${version}. The failed " +
+ s"write was: ${failedPayload}. This indicates that another KRaft controller is making writes to ZooKeeper.")
+ case None =>
+ throw new RuntimeException(s"Check op on KRaft Migration ZNode failed. Expected zkVersion = ${version}. " +
+ s"This indicates that another KRaft controller is making writes to ZooKeeper.")
+ }
+ } else if (errorCode == Code.OK) {
+ // This means the Check or SetData op would have been ok, but failed because of another operation in this multi-op
+ version
+ } else {
+ throw KeeperException.create(errorCode, path)
+ }
+ } else {
+ throw new RuntimeException(s"Got migration result for incorrect path $path")
+ }
+ case _ => throw new RuntimeException(
+ s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}")
+ }
+ }
+
+ def unwrapMigrationResponse(response: AsyncResponse, lastRequestInBatch: Boolean): (AsyncResponse, Int) = {
+ response match {
+ case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
+ zkOpResults match {
+ case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: CheckOp, migrationResult), zkOpResult) =>
+ // Matches all requests except for the last one (CheckOp on /migration)
+ if (lastRequestInBatch) {
+ throw new IllegalStateException("Should not see a Check operation on /migration in the last request.")
+ }
+ handleUnwrappedCheckOp(checkOp, checkOpResult)
+ val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
+ (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
+ case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: SetDataOp, migrationResult), zkOpResult) =>
+ // Matches the last request in a batch (SetDataOp on /migration)
+ if (!lastRequestInBatch) {
+ throw new IllegalStateException("Should only see a SetData operation on /migration in the last request.")
+ }
+ handleUnwrappedCheckOp(checkOp, checkOpResult)
+ val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
+ (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
+ case null => throw KeeperException.create(resultCode)
+ case _ => throw new IllegalStateException(
+ s"Cannot unwrap $response because it does not contain the expected operations for a migration operation.")
+ }
+ case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse")
+ }
+ }
+
+ migrationState.controllerZkVersion() match {
+ case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(
+ s"Expected a controller epoch zkVersion when making migration writes, not -1.")
+ case version if version >= 0 =>
+ logger.trace(s"Performing ${requests.size} migration update(s) with migrationState=$migrationState")
+ val wrappedRequests = requests.map(req => wrapMigrationRequest(req, req == requests.last))
+ val results = retryRequestsUntilConnected(wrappedRequests)
+ val unwrappedResults = results.map(resp => unwrapMigrationResponse(resp, resp == results.last))
+ val migrationZkVersion = unwrappedResults.last._2
+ // Return the new version of /migration and the sequence of responses to the original requests
+ (migrationZkVersion, unwrappedResults.map(_._1.asInstanceOf[Req#Response]))
+ case invalidVersion =>
+ throw new IllegalArgumentException(
+ s"Expected controller epoch zkVersion $invalidVersion should be non-negative or equal to ${ZkVersion.MatchAnyVersion}")
+ }
+ }
+
private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
val responses = new mutable.ArrayBuffer[Req#Response]
@@ -1997,6 +2254,45 @@ object KafkaZkClient {
}
}
+ private def handleUnwrappedCheckOp(checkOp: CheckOp, checkOpResult: OpResult): Unit = {
+ checkOpResult match {
+ case errorResult: ErrorResult =>
+ if (checkOp.path.equals(ControllerEpochZNode.path)) {
+ val errorCode = Code.get(errorResult.getErr)
+ if (errorCode == Code.BADVERSION)
+ // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails
+ throw new ControllerMovedException(s"Controller epoch zkVersion check fails. Expected zkVersion = ${checkOp.version}")
+ else if (errorCode != Code.OK)
+ throw KeeperException.create(errorCode, checkOp.path)
+ }
+ case _ =>
+ }
+ }
+
+ private def handleUnwrappedZkOp(zkOpResult: ZkOpResult,
+ resultCode: Code,
+ ctx: Option[Any],
+ responseMetadata: ResponseMetadata): AsyncResponse = {
+ val rawOpResult = zkOpResult.rawOpResult
+ zkOpResult.zkOp match {
+ case createOp: CreateOp =>
+ val name = rawOpResult match {
+ case c: CreateResult => c.getPath
+ case _ => null
+ }
+ CreateResponse(resultCode, createOp.path, ctx, name, responseMetadata)
+ case deleteOp: DeleteOp =>
+ DeleteResponse(resultCode, deleteOp.path, ctx, responseMetadata)
+ case setDataOp: SetDataOp =>
+ val stat = rawOpResult match {
+ case s: SetDataResult => s.getStat
+ case _ => null
+ }
+ SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
+ case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
+ }
+ }
+
// A helper function to transform a MultiResponse with the check on
// controller epoch znode zkVersion back into a regular response.
// ControllerMovedException will be thrown if the controller epoch
@@ -2006,37 +2302,10 @@ object KafkaZkClient {
response match {
case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
zkOpResults match {
+ // In normal ZK writes, we just have a MultiOp with a CheckOp and the actual operation we're performing
case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), zkOpResult) =>
- checkOpResult match {
- case errorResult: ErrorResult =>
- if (checkOp.path.equals(ControllerEpochZNode.path)) {
- val errorCode = Code.get(errorResult.getErr)
- if (errorCode == Code.BADVERSION)
- // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails
- throw new ControllerMovedException(s"Controller epoch zkVersion check fails. Expected zkVersion = ${checkOp.version}")
- else if (errorCode != Code.OK)
- throw KeeperException.create(errorCode, checkOp.path)
- }
- case _ =>
- }
- val rawOpResult = zkOpResult.rawOpResult
- zkOpResult.zkOp match {
- case createOp: CreateOp =>
- val name = rawOpResult match {
- case c: CreateResult => c.getPath
- case _ => null
- }
- CreateResponse(resultCode, createOp.path, ctx, name, responseMetadata)
- case deleteOp: DeleteOp =>
- DeleteResponse(resultCode, deleteOp.path, ctx, responseMetadata)
- case setDataOp: SetDataOp =>
- val stat = rawOpResult match {
- case s: SetDataResult => s.getStat
- case _ => null
- }
- SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
- case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
- }
+ handleUnwrappedCheckOp(checkOp, checkOpResult)
+ handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata)
case null => throw KeeperException.create(resultCode)
case _ => throw new IllegalStateException(s"Cannot unwrap $response because the first zookeeper op is not check op in original MultiRequest")
}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 6bd3b19d7dc..84b767d4d3b 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -1048,7 +1048,14 @@ object MigrationZNode {
val controllerEpoch = js("kraft_controller_epoch").to[Int]
val metadataOffset = js("kraft_metadata_offset").to[Long]
val metadataEpoch = js("kraft_metadata_epoch").to[Int]
- Some(new ZkMigrationLeadershipState(controllerId, controllerEpoch, metadataOffset, metadataEpoch, modifyTimeMs, zkVersion, -2))
+ Some(new ZkMigrationLeadershipState(
+ controllerId,
+ controllerEpoch,
+ metadataOffset,
+ metadataEpoch,
+ modifyTimeMs,
+ zkVersion,
+ ZkVersion.UnknownVersion))
}.getOrElse(throw new KafkaException(s"Failed to parse the migration json $jsonDataAsString"))
}
}
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
new file mode 100644
index 00000000000..77f46b9c794
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import java.util
+import java.util.Properties
+import java.util.function.Consumer
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+
+class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Logging {
+
+ override def getOrCreateMigrationRecoveryState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ zkClient.createTopLevelPaths()
+ zkClient.getOrCreateMigrationState(initialState)
+ }
+
+ override def setMigrationRecoveryState(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ zkClient.updateMigrationState(state)
+ }
+
+ override def claimControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(
+ state.kraftControllerId(), state.kraftControllerEpoch())
+ if (epochZkVersionOpt.isDefined) {
+ state.withControllerZkVersion(epochZkVersionOpt.get)
+ } else {
+ state.withControllerZkVersion(-1)
+ }
+ }
+
+ override def releaseControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ try {
+ zkClient.deleteController(state.controllerZkVersion())
+ state.withControllerZkVersion(-1)
+ } catch {
+ case _: ControllerMovedException =>
+ // If the controller moved, no need to release
+ state.withControllerZkVersion(-1)
+ case t: Throwable =>
+ throw new RuntimeException("Could not release controller leadership due to underlying error", t)
+ }
+ }
+
+ def migrateTopics(metadataVersion: MetadataVersion,
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+ brokerIdConsumer: Consumer[Integer]): Unit = {
+ val topics = zkClient.getAllTopicsInCluster()
+ val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+ val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+ replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
+ val partitions = partitionAssignments.keys.toSeq
+ val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+ val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+ topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+ .setName(topic)
+ .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+ partitionAssignments.foreach { case (topicPartition, replicaAssignment) =>
+ replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+ replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+ val replicaList = replicaAssignment.replicas.map(Integer.valueOf).asJava
+ val record = new PartitionRecord()
+ .setTopicId(topicIdOpt.get)
+ .setPartitionId(topicPartition.partition)
+ .setReplicas(replicaList)
+ .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+ .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+ leaderIsrAndControllerEpochs.get(topicPartition) match {
+ case Some(leaderIsrAndEpoch) => record
+ .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+ .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+ .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+ .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value())
+ case None =>
+ warn(s"Could not find partition state in ZK for $topicPartition. Initializing this partition " +
+ s"with ISR={$replicaList} and leaderEpoch=0.")
+ record
+ .setIsr(replicaList)
+ .setLeader(replicaList.get(0))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+ }
+ topicBatch.add(new ApiMessageAndVersion(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+ }
+
+ val props = topicConfigs(topic)
+ props.forEach { case (key: Object, value: Object) =>
+ topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.TOPIC.id)
+ .setResourceName(topic)
+ .setName(key.toString)
+ .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+ }
+ recordConsumer.accept(topicBatch)
+ }
+ }
+
+ def migrateBrokerConfigs(metadataVersion: MetadataVersion,
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+ val batch = new util.ArrayList[ApiMessageAndVersion]()
+ zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+ val brokerResource = if (broker == ConfigEntityName.Default) {
+ ""
+ } else {
+ broker
+ }
+ props.forEach { case (key: Object, value: Object) =>
+ batch.add(new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.BROKER.id)
+ .setResourceName(brokerResource)
+ .setName(key.toString)
+ .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+ }
+ }
+ if (!batch.isEmpty) {
+ recordConsumer.accept(batch)
+ }
+ }
+
+ def migrateClientQuotas(metadataVersion: MetadataVersion,
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ val adminZkClient = new AdminZkClient(zkClient)
+
+ def migrateEntityType(entityType: String): Unit = {
+ adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
+ val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+ val batch = new util.ArrayList[ApiMessageAndVersion]()
+ ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+ batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+ .setEntity(List(entity).asJava)
+ .setKey(key)
+ .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+ }
+ recordConsumer.accept(batch)
+ }
+ }
+
+ migrateEntityType(ConfigType.User)
+ migrateEntityType(ConfigType.Client)
+ adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+ // Taken from ZkAdminManager
+ val components = name.split("/")
+ if (components.size != 3 || components(1) != "clients")
+ throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+ val entity = List(
+ new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+ new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+ )
+
+ val batch = new util.ArrayList[ApiMessageAndVersion]()
+ ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+ batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+ .setEntity(entity.asJava)
+ .setKey(key)
+ .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+ }
+ recordConsumer.accept(batch)
+ }
+
+ migrateEntityType(ConfigType.Ip)
+ }
+
+ def migrateProducerId(metadataVersion: MetadataVersion,
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+ dataOpt match {
+ case Some(data) =>
+ val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+ recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
+ .setBrokerEpoch(-1)
+ .setBrokerId(producerIdBlock.assignedBrokerId)
+ .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+ case None => // Nothing to migrate
+ }
+ }
+
+ override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
+ brokerIdConsumer: Consumer[Integer]): Unit = {
+ migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
+ migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
+ migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
+ migrateProducerId(MetadataVersion.latest(), batchConsumer)
+ }
+
+ override def readBrokerIds(): util.Set[Integer] = {
+ zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+ }
+
+ override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = {
+ val topics = zkClient.getAllTopicsInCluster()
+ val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+ val brokersWithAssignments = new util.HashSet[Integer]()
+ replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
+ assignments.values.foreach { assignment =>
+ assignment.replicas.foreach { brokerId => brokersWithAssignments.add(brokerId) }
+ }
+ }
+ brokersWithAssignments
+ }
+
+ override def createTopic(topicName: String,
+ topicId: Uuid,
+ partitions: util.Map[Integer, PartitionRegistration],
+ state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val assignments = partitions.asScala.map { case (partitionId, partition) =>
+ new TopicPartition(topicName, partitionId) ->
+ ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+ }
+
+ val createTopicZNode = {
+ val path = TopicZNode.path(topicName)
+ CreateRequest(
+ path,
+ TopicZNode.encode(Some(topicId), assignments),
+ zkClient.defaultAcls(path),
+ CreateMode.PERSISTENT)
+ }
+ val createPartitionsZNode = {
+ val path = TopicPartitionsZNode.path(topicName)
+ CreateRequest(
+ path,
+ null,
+ zkClient.defaultAcls(path),
+ CreateMode.PERSISTENT)
+ }
+
+ val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+ val topicPartition = new TopicPartition(topicName, partitionId)
+ Seq(
+ createTopicPartition(topicPartition),
+ createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+ )
+ }
+
+ val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+ val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+ state.withMigrationZkVersion(migrationZkVersion)
+ }
+
+ private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
+ val path = TopicPartitionZNode.path(topicPartition)
+ CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+ }
+
+ private def partitionStatePathAndData(topicPartition: TopicPartition,
+ partitionRegistration: PartitionRegistration,
+ controllerEpoch: Int): (String, Array[Byte]) = {
+ val path = TopicPartitionStateZNode.path(topicPartition)
+ val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+ partitionRegistration.leader,
+ partitionRegistration.leaderEpoch,
+ partitionRegistration.isr.toList,
+ partitionRegistration.leaderRecoveryState,
+ partitionRegistration.partitionEpoch), controllerEpoch))
+ (path, data)
+ }
+
+ private def createTopicPartitionState(topicPartition: TopicPartition,
+ partitionRegistration: PartitionRegistration,
+ controllerEpoch: Int): CreateRequest = {
+ val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
+ CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+ }
+
+ private def updateTopicPartitionState(topicPartition: TopicPartition,
+ partitionRegistration: PartitionRegistration,
+ controllerEpoch: Int): SetDataRequest = {
+ val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
+ SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+ }
+
+ override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+ state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+ partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+ val topicPartition = new TopicPartition(topicName, partitionId)
+ Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+ }
+ }
+ if (requests.isEmpty) {
+ state
+ } else {
+ val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
+ state.withMigrationZkVersion(migrationZkVersion)
+ }
+ }
+
+ // Try to update an entity config and the migration state. If NoNode is encountered, it probably means we
+ // need to recursively create the parent ZNode. In this case, return None.
+ def tryWriteEntityConfig(entityType: String,
+ path: String,
+ props: Properties,
+ create: Boolean,
+ state: ZkMigrationLeadershipState): Option[ZkMigrationLeadershipState] = {
+ val configData = ConfigEntityZNode.encode(props)
+
+ val requests = if (create) {
+ Seq(CreateRequest(ConfigEntityZNode.path(entityType, path), configData, zkClient.defaultAcls(path), CreateMode.PERSISTENT))
+ } else {
+ Seq(SetDataRequest(ConfigEntityZNode.path(entityType, path), configData, ZkVersion.MatchAnyVersion))
+ }
+ val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+ if (!create && responses.head.resultCode.equals(Code.NONODE)) {
+ // Not fatal. Just means we need to Create this node instead of SetData
+ None
+ } else if (responses.head.resultCode.equals(Code.OK)) {
+ Some(state.withMigrationZkVersion(migrationZkVersion))
+ } else {
+ throw KeeperException.create(responses.head.resultCode, path)
+ }
+ }
+
+ def writeClientQuotas(entity: ClientQuotaEntity,
+ quotas: util.Map[String, Double],
+ state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val entityMap = entity.entries().asScala
+ val hasUser = entityMap.contains(ConfigType.User)
+ val hasClient = entityMap.contains(ConfigType.Client)
+ val hasIp = entityMap.contains(ConfigType.Ip)
+ val props = new Properties()
+ // We store client quota values as strings in the ZK JSON
+ quotas.forEach { case (key, value) => props.put(key, value.toString) }
+ val (configType, path) = if (hasUser && !hasClient) {
+ (Some(ConfigType.User), Some(entityMap(ConfigType.User)))
+ } else if (hasUser && hasClient) {
+ (Some(ConfigType.User), Some(s"${entityMap(ConfigType.User)}/clients/${entityMap(ConfigType.Client)}"))
+ } else if (hasClient) {
+ (Some(ConfigType.Client), Some(entityMap(ConfigType.Client)))
+ } else if (hasIp) {
+ (Some(ConfigType.Ip), Some(entityMap(ConfigType.Ip)))
+ } else {
+ (None, None)
+ }
+
+ if (path.isEmpty) {
+ error(s"Skipping unknown client quota entity $entity")
+ return state
+ }
+
+ // Try to write the client quota configs once with create=false, and again with create=true if the first operation fails
+ tryWriteEntityConfig(configType.get, path.get, props, create=false, state) match {
+ case Some(newState) =>
+ newState
+ case None =>
+ // If we didn't update the migration state, we failed to write the client quota. Try again
+ // after recursively creating its parent znodes
+ val createPath = if (hasUser && hasClient) {
+ s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ConfigType.User)}/clients"
+ } else {
+ ConfigEntityTypeZNode.path(configType.get)
+ }
+ zkClient.createRecursive(createPath, throwIfPathExists=false)
+ debug(s"Recursively creating ZNode $createPath and attempting to write $entity quotas a second time.")
+
+ tryWriteEntityConfig(configType.get, path.get, props, create=true, state) match {
+ case Some(newStateSecondTry) => newStateSecondTry
+ case None => throw new RuntimeException(
+ s"Could not write client quotas for $entity on second attempt when using Create instead of SetData")
+ }
+ }
+ }
+
+ def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(
+ new ProducerIdsBlock(-1, nextProducerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
+
+ val request = SetDataRequest(ProducerIdBlockZNode.path, newProducerIdBlockData, ZkVersion.MatchAnyVersion)
+ val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+ state.withMigrationZkVersion(migrationZkVersion)
+ }
+
+ def writeConfigs(resource: ConfigResource,
+ configs: util.Map[String, String],
+ state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ val configType = resource.`type`() match {
+ case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+ case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
+ case _ => None
+ }
+
+ val configName = resource.name()
+ if (configType.isDefined) {
+ val props = new Properties()
+ configs.forEach { case (key, value) => props.put(key, value) }
+ tryWriteEntityConfig(configType.get, configName, props, create=false, state) match {
+ case Some(newState) =>
+ newState
+ case None =>
+ val createPath = ConfigEntityTypeZNode.path(configType.get)
+ debug(s"Recursively creating ZNode $createPath and attempting to write $resource configs a second time.")
+ zkClient.createRecursive(createPath, throwIfPathExists=false)
+
+ tryWriteEntityConfig(configType.get, configName, props, create=true, state) match {
+ case Some(newStateSecondTry) => newStateSecondTry
+ case None => throw new RuntimeException(
+ s"Could not write ${configType.get} configs on second attempt when using Create instead of SetData.")
+ }
+ }
+ } else {
+ debug(s"Not updating ZK for $resource since it is not a Broker or Topic entity.")
+ state
+ }
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
new file mode 100644
index 00000000000..54b1156ccb1
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ZkMigrationIntegrationTest {
+
+ class MetadataDeltaVerifier {
+ val metadataDelta = new MetadataDelta(MetadataImage.EMPTY)
+ var offset = 0
+ def accept(batch: java.util.List[ApiMessageAndVersion]): Unit = {
+ batch.forEach(message => {
+ metadataDelta.replay(offset, 0, message.message())
+ offset += 1
+ })
+ }
+
+ def verify(verifier: MetadataImage => Unit): Unit = {
+ val image = metadataDelta.apply()
+ verifier.apply(image)
+ }
+ }
+
+ @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+ def testMigrate(clusterInstance: ClusterInstance): Unit = {
+ val admin = clusterInstance.createAdminClient()
+ val newTopics = new util.ArrayList[NewTopic]()
+ newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort)
+ .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
+ newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
+ newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+ val createTopicResult = admin.createTopics(newTopics)
+ createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+ val quotas = new util.ArrayList[ClientQuotaAlteration]()
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map("user" -> "user1").asJava),
+ List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+ List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
+ List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
+ admin.alterClientQuotas(quotas)
+
+ val zkClient = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+ val migrationClient = new ZkMigrationClient(zkClient)
+ var migrationState = migrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY)
+ migrationState = migrationState.withNewKRaftController(3000, 42)
+ migrationState = migrationClient.claimControllerLeadership(migrationState)
+
+ val brokers = new java.util.HashSet[Integer]()
+ val verifier = new MetadataDeltaVerifier()
+ migrationClient.readAllMetadata(batch => verifier.accept(batch), brokerId => brokers.add(brokerId))
+ assertEquals(Seq(0, 1, 2), brokers.asScala.toSeq)
+
+ verifier.verify { image =>
+ assertNotNull(image.topics().getTopic("test-topic-1"))
+ assertEquals(2, image.topics().getTopic("test-topic-1").partitions().size())
+
+ assertNotNull(image.topics().getTopic("test-topic-2"))
+ assertEquals(1, image.topics().getTopic("test-topic-2").partitions().size())
+
+ assertNotNull(image.topics().getTopic("test-topic-3"))
+ assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
+
+ val clientQuotas = image.clientQuotas().entities()
+ assertEquals(3, clientQuotas.size())
+ }
+
+ migrationState = migrationClient.releaseControllerLeadership(migrationState)
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala b/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala
index 2057fda2c52..4fe64622d79 100644
--- a/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala
@@ -18,7 +18,6 @@
package kafka.server
import java.util.Properties
-
import kafka.server.metadata.ZkConfigRepository
import kafka.utils.TestUtils
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -28,10 +27,7 @@ import org.apache.kafka.common.message.DescribeConfigsRequestData
import org.apache.kafka.common.message.DescribeConfigsResponseData
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertNotNull
-import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertThrows}
import org.mockito.Mockito.{mock, when}
import scala.jdk.CollectionConverters._
@@ -54,6 +50,28 @@ class ZkAdminManagerTest {
new ConfigHelper(metadataCache, KafkaConfig.fromProps(props), new ZkConfigRepository(new AdminZkClient(zkClient)))
}
+ @Test
+ def testClientQuotasToProps(): Unit = {
+ val emptyProps = ZkAdminManager.clientQuotaPropsToDoubleMap(Map.empty)
+ assertEquals(0, emptyProps.size)
+
+ val oneProp = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "1234"))
+ assertEquals(1, oneProp.size)
+ assertEquals(1234.0, oneProp("foo"))
+
+ // This is probably not desired, but kept for compatibility with existing usages
+ val emptyKey = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("" -> "-42.1"))
+ assertEquals(1, emptyKey.size)
+ assertEquals(-42.1, emptyKey(""))
+
+ val manyProps = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "1234", "bar" -> "0", "spam" -> "-1234.56"))
+ assertEquals(3, manyProps.size)
+
+ assertThrows(classOf[NullPointerException], () => ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> null)))
+ assertThrows(classOf[IllegalStateException], () => ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "bar")))
+ assertThrows(classOf[IllegalStateException], () => ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "")))
+ }
+
@Test
def testDescribeConfigsWithNullConfigurationKeys(): Unit = {
when(zkClient.getEntityConfigs(ConfigType.Topic, topic)).thenReturn(TestUtils.createBrokerConfig(brokerId, "zk"))
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 7b7ddfbc56f..11eae3386f8 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,7 +19,6 @@ package kafka.zk
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Collections, Properties}
-
import kafka.api.LeaderAndIsr
import kafka.cluster.{Broker, EndPoint}
import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
@@ -43,9 +42,10 @@ import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException}
-import org.apache.zookeeper.ZooDefs
+import org.apache.zookeeper.{CreateMode, ZooDefs}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.Stat
@@ -1373,6 +1373,53 @@ class KafkaZkClientTest extends QuorumTestHarness {
} finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
}
+ @Test
+ def testFailToUpdateMigrationZNode(): Unit = {
+ val (_, stat) = zkClient.getControllerEpoch.get
+ var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+ migrationState = zkClient.getOrCreateMigrationState(migrationState)
+ assertEquals(0, migrationState.migrationZkVersion())
+
+ // A batch of migration writes to make. The last one will fail, causing the migration znode to not be updated
+ val requests_bad = Seq(
+ CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ CreateRequest("/foo/bar", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ CreateRequest("/foo/bar/spam", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ )
+
+ migrationState = migrationState.withControllerZkVersion(stat.getVersion)
+ zkClient.retryMigrationRequestsUntilConnected(requests_bad, migrationState) match {
+ case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
+ assertEquals(0, zkVersion)
+ assert(requests.take(3).forall(resp => resp.resultCode.equals(Code.OK)))
+ assertEquals(Code.NODEEXISTS, requests.last.resultCode)
+ case _ => fail()
+ }
+
+ // Check state again
+ val loadedState = zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY)
+ assertEquals(0, loadedState.migrationZkVersion())
+
+ // Resend the same requests, with the last one succeeding this time. This will result in NODEEXISTS, but
+ // should still update the migration state
+ val requests_good = Seq(
+ CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ CreateRequest("/foo/bar", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ CreateRequest("/foo/bar/spam", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ CreateRequest("/foo/bar/eggs", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+ )
+
+ migrationState = migrationState.withControllerZkVersion(stat.getVersion)
+ zkClient.retryMigrationRequestsUntilConnected(requests_good, migrationState) match {
+ case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
+ assertEquals(1, zkVersion)
+ assert(requests.take(3).forall(resp => resp.resultCode.equals(Code.NODEEXISTS)))
+ assertEquals(Code.OK, requests.last.resultCode)
+ case _ => fail()
+ }
+ }
+
class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
extends KafkaZkClient(zooKeeperClient, isSecure, time) {
// Overwriting this method from the parent class to force the client to re-register the Broker.
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
new file mode 100644
index 00000000000..7fae24f650e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.coordinator.transaction.ProducerIdManager
+import kafka.server.{ConfigType, QuorumTestHarness, ZkAdminManager}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+
+import java.util.Properties
+import scala.collection.Map
+import scala.jdk.CollectionConverters._
+
+/**
+ * ZooKeeper integration tests that verify the interoperability of KafkaZkClient and ZkMigrationClient.
+ */
+class ZkMigrationClientTest extends QuorumTestHarness {
+
+ private var migrationClient: ZkMigrationClient = _
+
+ private var migrationState: ZkMigrationLeadershipState = _
+
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
+ zkClient.createControllerEpochRaw(1)
+
+ migrationClient = new ZkMigrationClient(zkClient)
+ migrationState = initialMigrationState
+ migrationState = migrationClient.getOrCreateMigrationRecoveryState(migrationState)
+ }
+
+ private def initialMigrationState: ZkMigrationLeadershipState = {
+ val (_, stat) = zkClient.getControllerEpoch.get
+ new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+ }
+
+ @Test
+ def testMigrateEmptyZk(): Unit = {
+ val brokers = new java.util.ArrayList[Integer]()
+ val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+
+ migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+ assertEquals(0, brokers.size())
+ assertEquals(0, batches.size())
+ }
+
+ @Test
+ def testEmptyWrite(): Unit = {
+ val (zkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(), migrationState)
+ assertEquals(migrationState.migrationZkVersion(), zkVersion)
+ assertTrue(responses.isEmpty)
+ }
+
+ @Test
+ def testUpdateExistingPartitions(): Unit = {
+ // Create a topic and partition state in ZK like KafkaController would
+ val assignment = Map(
+ new TopicPartition("test", 0) -> List(0, 1, 2),
+ new TopicPartition("test", 1) -> List(1, 2, 3)
+ )
+ zkClient.createTopicAssignment("test", Some(Uuid.randomUuid()), assignment)
+
+ val leaderAndIsrs = Map(
+ new TopicPartition("test", 0) -> LeaderIsrAndControllerEpoch(
+ new LeaderAndIsr(0, 5, List(0, 1, 2), LeaderRecoveryState.RECOVERED, -1), 1),
+ new TopicPartition("test", 1) -> LeaderIsrAndControllerEpoch(
+ new LeaderAndIsr(1, 5, List(1, 2, 3), LeaderRecoveryState.RECOVERED, -1), 1)
+ )
+ zkClient.createTopicPartitionStatesRaw(leaderAndIsrs, 0)
+
+ // Now verify that we can update it with migration client
+ assertEquals(0, migrationState.migrationZkVersion())
+
+ val partitions = Map(
+ 0 -> new PartitionRegistration(Array(0, 1, 2), Array(1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 6, -1),
+ 1 -> new PartitionRegistration(Array(1, 2, 3), Array(3), Array(), Array(), 3, LeaderRecoveryState.RECOVERED, 7, -1)
+ ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+ migrationState = migrationClient.updateTopicPartitions(Map("test" -> partitions).asJava, migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ // Read back with Zk client
+ val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
+ assertEquals(1, partition0.leader)
+ assertEquals(6, partition0.leaderEpoch)
+ assertEquals(List(1, 2), partition0.isr)
+
+ val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
+ assertEquals(3, partition1.leader)
+ assertEquals(7, partition1.leaderEpoch)
+ assertEquals(List(3), partition1.isr)
+ }
+
+ @Test
+ def testCreateNewPartitions(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+
+ val partitions = Map(
+ 0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+ 1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+ ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+ migrationState = migrationClient.createTopic("test", Uuid.randomUuid(), partitions, migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ // Read back with Zk client
+ val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
+ assertEquals(0, partition0.leader)
+ assertEquals(0, partition0.leaderEpoch)
+ assertEquals(List(0, 1, 2), partition0.isr)
+
+ val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
+ assertEquals(1, partition1.leader)
+ assertEquals(0, partition1.leaderEpoch)
+ assertEquals(List(1, 2, 3), partition1.isr)
+ }
+
+ // Write Client Quotas using ZkMigrationClient and read them back using AdminZkClient
+ private def writeClientQuotaAndVerify(migrationClient: ZkMigrationClient,
+ adminZkClient: AdminZkClient,
+ migrationState: ZkMigrationLeadershipState,
+ entity: Map[String, String],
+ quotas: Map[String, Double],
+ zkEntityType: String,
+ zkEntityName: String): ZkMigrationLeadershipState = {
+ val nextMigrationState = migrationClient.writeClientQuotas(
+ new ClientQuotaEntity(entity.asJava),
+ quotas.asJava,
+ migrationState)
+ val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
+ adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
+ assertEquals(quotas, newProps)
+ nextMigrationState
+ }
+
+ @Test
+ def testWriteExistingClientQuotas(): Unit = {
+ val props = new Properties()
+ props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
+ adminZkClient.changeConfigs(ConfigType.User, "user1", props)
+ adminZkClient.changeConfigs(ConfigType.User, "user1/clients/clientA", props)
+
+ assertEquals(0, migrationState.migrationZkVersion())
+ migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+ Map(ConfigType.User -> "user1"),
+ Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
+ ConfigType.User, "user1")
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+ Map(ConfigType.User -> "user1"),
+ Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
+ ConfigType.User, "user1")
+ assertEquals(2, migrationState.migrationZkVersion())
+
+ migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+ Map(ConfigType.User -> "user1"),
+ Map.empty,
+ ConfigType.User, "user1")
+ assertEquals(3, migrationState.migrationZkVersion())
+
+ migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+ Map(ConfigType.User -> "user1"),
+ Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+ ConfigType.User, "user1")
+ assertEquals(4, migrationState.migrationZkVersion())
+ }
+
+ @Test
+ def testWriteNewClientQuotas(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+ migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+ Map(ConfigType.User -> "user2"),
+ Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+ ConfigType.User, "user2")
+
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+ Map(ConfigType.User -> "user2", ConfigType.Client -> "clientA"),
+ Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
+ ConfigType.User, "user2/clients/clientA")
+
+ assertEquals(2, migrationState.migrationZkVersion())
+ }
+
+ @Test
+ def testClaimAbsentController(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+ migrationState = migrationClient.claimControllerLeadership(migrationState)
+ assertEquals(1, migrationState.controllerZkVersion())
+ }
+
+ @Test
+ def testExistingKRaftControllerClaim(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+ migrationState = migrationClient.claimControllerLeadership(migrationState)
+ assertEquals(1, migrationState.controllerZkVersion())
+
+ // We don't require a KRaft controller to release the controller in ZK before another KRaft controller
+ // can claim it. This is because KRaft leadership comes from Raft and we are just synchronizing it to ZK.
+ var otherNodeState = new ZkMigrationLeadershipState(3001, 43, 100, 42, Time.SYSTEM.milliseconds(), -1, -1)
+ otherNodeState = migrationClient.claimControllerLeadership(otherNodeState)
+ assertEquals(2, otherNodeState.controllerZkVersion())
+ assertEquals(3001, otherNodeState.kraftControllerId())
+ assertEquals(43, otherNodeState.kraftControllerEpoch())
+ }
+
+ @Test
+ def testNonIncreasingKRaftEpoch(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+
+ migrationState = migrationClient.claimControllerLeadership(migrationState)
+ assertEquals(1, migrationState.controllerZkVersion())
+
+ migrationState = migrationState.withNewKRaftController(3000, 40)
+ val t1 = assertThrows(classOf[IllegalStateException], () => migrationClient.claimControllerLeadership(migrationState))
+ assertEquals("Cannot register KRaft controller 3000 as the active controller in ZK since its epoch 10000040 is not higher than the current ZK epoch 10000042.", t1.getMessage)
+
+ migrationState = migrationState.withNewKRaftController(3000, 42)
+ val t2 = assertThrows(classOf[IllegalStateException], () => migrationClient.claimControllerLeadership(migrationState))
+ assertEquals("Cannot register KRaft controller 3000 as the active controller in ZK since its epoch 10000042 is not higher than the current ZK epoch 10000042.", t2.getMessage)
+ }
+
+ @Test
+ def testClaimAndReleaseExistingController(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+
+ val (epoch, zkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(100)
+ assertEquals(epoch, 2)
+ assertEquals(zkVersion, 1)
+
+ migrationState = migrationClient.claimControllerLeadership(migrationState)
+ assertEquals(2, migrationState.controllerZkVersion())
+ zkClient.getControllerEpoch match {
+ case Some((kraftEpoch, stat)) =>
+ assertEquals(10000042, kraftEpoch)
+ assertEquals(2, stat.getVersion)
+ case None => fail()
+ }
+ assertEquals(3000, zkClient.getControllerId.get)
+ assertThrows(classOf[ControllerMovedException], () => zkClient.registerControllerAndIncrementControllerEpoch(100))
+
+ migrationState = migrationClient.releaseControllerLeadership(migrationState)
+ val (epoch1, zkVersion1) = zkClient.registerControllerAndIncrementControllerEpoch(100)
+ assertEquals(epoch1, 10000043)
+ assertEquals(zkVersion1, 3)
+ }
+
+ @Test
+ def testReadAndWriteProducerId(): Unit = {
+ def generateNextProducerIdWithZkAndRead(): Long = {
+ // Generate a producer ID in ZK
+ val manager = ProducerIdManager.zk(1, zkClient)
+ manager.generateProducerId()
+
+ val records = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+ migrationClient.migrateProducerId(MetadataVersion.latest(), batch => records.add(batch))
+ assertEquals(1, records.size())
+ assertEquals(1, records.get(0).size())
+
+ val record = records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
+ record.nextProducerId()
+ }
+
+ // Initialize with ZK ProducerIdManager
+ assertEquals(0, generateNextProducerIdWithZkAndRead())
+
+ // Update next producer ID via migration client
+ migrationState = migrationClient.writeProducerId(6000, migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ // Switch back to ZK, it should provision the next block
+ assertEquals(7000, generateNextProducerIdWithZkAndRead())
+ }
+
+ @Test
+ def testMigrateTopicConfigs(): Unit = {
+ val props = new Properties()
+ props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
+ props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
+ adminZkClient.createTopicWithAssignment("test", props, Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0), 2 -> Seq(2, 0, 1)), usesTopicId = true)
+
+ val brokers = new java.util.ArrayList[Integer]()
+ val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+ migrationClient.migrateTopics(MetadataVersion.latest(), batch => batches.add(batch), brokerId => brokers.add(brokerId))
+ assertEquals(1, batches.size())
+ val configs = batches.get(0)
+ .asScala
+ .map {_.message()}
+ .filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
+ .map {_.asInstanceOf[ConfigRecord]}
+ .toSeq
+ assertEquals(2, configs.size)
+ assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
+ assertEquals("60000", configs.head.value())
+ assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
+ assertEquals("300000", configs.last.value())
+ }
+
+ @Test
+ def testWriteNewTopicConfigs(): Unit = {
+ migrationState = migrationClient.writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
+ java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+ assertEquals(1, newProps.size())
+ assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+ }
+
+ @Test
+ def testWriteExistingTopicConfigs(): Unit = {
+ val props = new Properties()
+ props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
+ props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, "test", props)
+
+ migrationState = migrationClient.writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
+ java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+ assertEquals(1, newProps.size())
+ assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
index bbdc1bf9779..3eb3e3c4e95 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -62,7 +62,8 @@ public final class ClientQuotasImage {
return entities.isEmpty();
}
- Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+ // Visible for testing
+ public Map<ClientQuotaEntity, ClientQuotaImage> entities() {
return entities;
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
new file mode 100644
index 00000000000..5eccbc70625
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * Methods for interacting with ZooKeeper during a KIP-866 migration. The migration leadership state is stored under
+ * the /migration ZNode. All write operations to ZK during the migration are performed as a multi-op transaction that
+ * also updates the state of /migration.
+ */
+public interface MigrationClient {
+
+ /**
+ * Read or initialize the migration leadership state in ZK. If the ZNode is absent, the given {@code initialState}
+ * will be written and subsequently returned with the zkVersion of the node. If the ZNode is present, it will be
+ * read and returned.
+ * @param initialState An initial, empty state to write to ZooKeeper for the migration state.
+ * @return The existing migration state, or the initial state given.
+ */
+ ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState);
+
+ /**
+ * Overwrite the migration state in ZK. This is done as a conditional update using
+ * {@link ZkMigrationLeadershipState#migrationZkVersion()}. If the conditional update fails, an exception is thrown.
+ * @param state The migration state to persist
+ * @return The persisted migration state.
+ */
+ ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state);
+
+ /**
+ * Attempt to claim controller leadership of the cluster in ZooKeeper. This involves overwriting the /controller
+ * and /controller_epoch ZNodes. The epoch given by {@code state} must be greater than the current epoch in ZooKeeper.
+ *
+ * @param state The current migration leadership state
+ * @return An updated migration leadership state including the version of /controller_epoch ZNode, if the
+ * leadership claim was successful. Otherwise, return the previous state unmodified.
+ */
+ ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state);
+
+ /**
+ * Release an existing claim on the cluster leadership in ZooKeeper. This involves deleting the /controller ZNode
+ * so that another controller can claim leadership.
+ *
+ * @param state The current migration leadership state.
+ * @return An updated migration leadership state with controllerZkVersion = 1, or raise an exception if the
+ * ZooKeeper delete fails.
+ */
+ ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state);
+
+ ZkMigrationLeadershipState createTopic(
+ String topicName,
+ Uuid topicId,
+ Map<Integer, PartitionRegistration> topicPartitions,
+ ZkMigrationLeadershipState state
+ );
+
+ ZkMigrationLeadershipState updateTopicPartitions(
+ Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
+ ZkMigrationLeadershipState state
+ );
+
+ void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
+
+ Set<Integer> readBrokerIds();
+
+ Set<Integer> readBrokerIdsFromTopicAssignments();
+}
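
The multi-op write described in the MigrationClient javadoc above can be pictured roughly as follows. This is an illustrative sketch only, not part of the patch: it uses the plain ZooKeeper client API, and the paths, versions, and payload are hypothetical placeholders. The real batching and retry logic lives in KafkaZkClient.retryMigrationRequestsUntilConnected.

    import scala.jdk.CollectionConverters._
    import org.apache.zookeeper.{CreateMode, Op, ZooDefs, ZooKeeper}

    // Sketch: one conditional migration write, with hypothetical versions and payload.
    def sketchMigrationWrite(zk: ZooKeeper,
                             controllerEpochZkVersion: Int,
                             migrationPayload: Array[Byte],
                             migrationZkVersion: Int): Unit = {
      val ops = Seq(
        // Abort the whole transaction if /controller_epoch changed (another controller took over)
        Op.check("/controller_epoch", controllerEpochZkVersion),
        // Conditionally bump /migration so two concurrent writers cannot both succeed
        Op.setData("/migration", migrationPayload, migrationZkVersion),
        // The actual metadata write being applied (here, a hypothetical create)
        Op.create("/config/topics/example", Array.emptyByteArray,
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
      )
      zk.multi(ops.asJava) // all-or-nothing: either every op applies or none do
    }

testFailToUpdateMigrationZNode above demonstrates this at the batch level: when the final request in a batch fails, the earlier requests still apply, but the /migration zkVersion is not bumped.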
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
index c54cf7d0e4f..b6217ee80c2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
@@ -53,16 +53,22 @@ public class ZkMigrationLeadershipState {
this.controllerZkVersion = controllerZkVersion;
}
- public ZkMigrationLeadershipState withZkVersion(int zkVersion) {
+ public ZkMigrationLeadershipState withMigrationZkVersion(int zkVersion) {
return new ZkMigrationLeadershipState(
- this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.controllerZkVersion);
+ this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
+ this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.controllerZkVersion);
+ }
+
+ public ZkMigrationLeadershipState withControllerZkVersion(int zkVersion) {
+ return new ZkMigrationLeadershipState(
+ this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
+ this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkVersion);
}
public ZkMigrationLeadershipState withNewKRaftController(int controllerId, int controllerEpoch) {
return new ZkMigrationLeadershipState(
- controllerId, controllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.controllerZkVersion);
+ controllerId, controllerEpoch, this.kraftMetadataOffset,
+ this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.controllerZkVersion);
}
public int kraftControllerId() {
@@ -100,14 +106,14 @@ public class ZkMigrationLeadershipState {
@Override
public String toString() {
return "ZkMigrationLeadershipState{" +
- "kraftControllerId=" + kraftControllerId +
- ", kraftControllerEpoch=" + kraftControllerEpoch +
- ", kraftMetadataOffset=" + kraftMetadataOffset +
- ", kraftMetadataEpoch=" + kraftMetadataEpoch +
- ", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
- ", migrationZkVersion=" + migrationZkVersion +
- ", controllerZkVersion=" + controllerZkVersion +
- '}';
+ "kraftControllerId=" + kraftControllerId +
+ ", kraftControllerEpoch=" + kraftControllerEpoch +
+ ", kraftMetadataOffset=" + kraftMetadataOffset +
+ ", kraftMetadataEpoch=" + kraftMetadataEpoch +
+ ", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
+ ", migrationZkVersion=" + migrationZkVersion +
+ ", controllerZkVersion=" + controllerZkVersion +
+ '}';
}
@Override
@@ -115,11 +121,24 @@ public class ZkMigrationLeadershipState {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ZkMigrationLeadershipState that = (ZkMigrationLeadershipState) o;
- return kraftControllerId == that.kraftControllerId && kraftControllerEpoch == that.kraftControllerEpoch && kraftMetadataOffset == that.kraftMetadataOffset && kraftMetadataEpoch == that.kraftMetadataEpoch && lastUpdatedTimeMs == that.lastUpdatedTimeMs && migrationZkVersion == that.migrationZkVersion && controllerZkVersion == that.controllerZkVersion;
+ return kraftControllerId == that.kraftControllerId
+ && kraftControllerEpoch == that.kraftControllerEpoch
+ && kraftMetadataOffset == that.kraftMetadataOffset
+ && kraftMetadataEpoch == that.kraftMetadataEpoch
+ && lastUpdatedTimeMs == that.lastUpdatedTimeMs
+ && migrationZkVersion == that.migrationZkVersion
+ && controllerZkVersion == that.controllerZkVersion;
}
@Override
public int hashCode() {
- return Objects.hash(kraftControllerId, kraftControllerEpoch, kraftMetadataOffset, kraftMetadataEpoch, lastUpdatedTimeMs, migrationZkVersion, controllerZkVersion);
+ return Objects.hash(
+ kraftControllerId,
+ kraftControllerEpoch,
+ kraftMetadataOffset,
+ kraftMetadataEpoch,
+ lastUpdatedTimeMs,
+ migrationZkVersion,
+ controllerZkVersion);
}
}
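
As a rough end-to-end illustration of how the pieces added in this patch fit together, a caller driving a migration might combine the MigrationClient methods as in the sketch below. This is not part of the patch; the client instance and the partition values are hypothetical placeholders.

    import java.util.Collections
    import org.apache.kafka.common.Uuid
    import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
    import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}

    def sketchMigrateOneTopic(client: MigrationClient,
                              initial: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
      // Recover (or initialize) the /migration state, then claim /controller and /controller_epoch
      var state = client.getOrCreateMigrationRecoveryState(initial)
      state = client.claimControllerLeadership(state)

      // Each write is applied together with an update of the /migration zkVersion
      val partitions = Collections.singletonMap(Integer.valueOf(0),
        new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(),
          0, LeaderRecoveryState.RECOVERED, 0, -1))
      state = client.createTopic("example", Uuid.randomUuid(), partitions, state)

      // Delete /controller so another controller can claim leadership later
      client.releaseControllerLeadership(state)
    }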