You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/04/19 23:15:16 UTC
[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171934758
##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -16,71 +16,79 @@
*/
package kafka.zk
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminManager}
import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkTopicMigrationClient}
import kafka.zookeeper._
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
import java.util
import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationClient {
+
val MaxBatchSize = 100
-}
-/**
- * Migration client in KRaft controller responsible for handling communication to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something usable by the caller.
- */
-class ZkMigrationClient(
- zkClient: KafkaZkClient,
- zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+ def apply(
+ zkClient: KafkaZkClient,
+ zkConfigEncoder: PasswordEncoder
+ ): ZkMigrationClient = {
+ val topicClient = new ZkTopicMigrationClient(zkClient)
+ val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+ val aclClient = new ZkAclMigrationClient(zkClient)
+ new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+ }
/**
* Wrap a function such that any KeeperExceptions is captured and converted to a MigrationClientException.
* Any authentication related exception is converted to a MigrationClientAuthException which may be treated
* differently by the caller.
*/
@throws(classOf[MigrationClientException])
- private def wrapZkException[T](fn: => T): T = {
+ def wrapZkException[T](fn: => T): T = {
try {
fn
} catch {
- case e @ (_: MigrationClientException | _: MigrationClientAuthException) => throw e
- case e @ (_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>
+ case e@(_: MigrationClientException | _: MigrationClientAuthException) => throw e
Review Comment:
Are these whitespace changes (`case e @ (...)` → `case e@(...)`) needed?
##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -16,71 +16,79 @@
*/
package kafka.zk
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminManager}
import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkTopicMigrationClient}
import kafka.zookeeper._
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
import java.util
import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationClient {
+
val MaxBatchSize = 100
-}
-/**
- * Migration client in KRaft controller responsible for handling communication to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something usable by the caller.
- */
-class ZkMigrationClient(
- zkClient: KafkaZkClient,
- zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+ def apply(
+ zkClient: KafkaZkClient,
+ zkConfigEncoder: PasswordEncoder
+ ): ZkMigrationClient = {
+ val topicClient = new ZkTopicMigrationClient(zkClient)
+ val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+ val aclClient = new ZkAclMigrationClient(zkClient)
+ new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+ }
/**
* Wrap a function such that any KeeperExceptions is captured and converted to a MigrationClientException.
* Any authentication related exception is converted to a MigrationClientAuthException which may be treated
* differently by the caller.
*/
@throws(classOf[MigrationClientException])
- private def wrapZkException[T](fn: => T): T = {
+ def wrapZkException[T](fn: => T): T = {
try {
fn
} catch {
- case e @ (_: MigrationClientException | _: MigrationClientAuthException) => throw e
- case e @ (_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>
+ case e@(_: MigrationClientException | _: MigrationClientAuthException) => throw e
+ case e@(_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>
Review Comment:
Is this whitespace change (`case e @ (...)` → `case e@(...)`) needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org