You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/04/19 23:15:16 UTC

[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171934758


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -16,71 +16,79 @@
  */
 package kafka.zk
 
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminManager}
 import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkTopicMigrationClient}
 import kafka.zookeeper._
 import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
 import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
 
 import java.util
 import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationClient {
+
   val MaxBatchSize = 100
-}
 
-/**
- * Migration client in KRaft controller responsible for handling communication to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something usable by the caller.
- */
-class ZkMigrationClient(
-  zkClient: KafkaZkClient,
-  zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+  def apply(
+    zkClient: KafkaZkClient,
+    zkConfigEncoder: PasswordEncoder
+  ): ZkMigrationClient = {
+    val topicClient = new ZkTopicMigrationClient(zkClient)
+    val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+    val aclClient = new ZkAclMigrationClient(zkClient)
+    new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+  }
 
   /**
    * Wrap a function such that any KeeperExceptions is captured and converted to a MigrationClientException.
    * Any authentication related exception is converted to a MigrationClientAuthException which may be treated
    * differently by the caller.
    */
   @throws(classOf[MigrationClientException])
-  private def wrapZkException[T](fn: => T): T = {
+  def wrapZkException[T](fn: => T): T = {
     try {
       fn
     } catch {
-      case e @ (_: MigrationClientException | _: MigrationClientAuthException) => throw e
-      case e @ (_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>
+      case e@(_: MigrationClientException | _: MigrationClientAuthException) => throw e

Review Comment:
   are these whitespace changes needed?



##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -16,71 +16,79 @@
  */
 package kafka.zk
 
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminManager}
 import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkTopicMigrationClient}
 import kafka.zookeeper._
 import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
 import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
 
 import java.util
 import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationClient {
+
   val MaxBatchSize = 100
-}
 
-/**
- * Migration client in KRaft controller responsible for handling communication to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something usable by the caller.
- */
-class ZkMigrationClient(
-  zkClient: KafkaZkClient,
-  zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+  def apply(
+    zkClient: KafkaZkClient,
+    zkConfigEncoder: PasswordEncoder
+  ): ZkMigrationClient = {
+    val topicClient = new ZkTopicMigrationClient(zkClient)
+    val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+    val aclClient = new ZkAclMigrationClient(zkClient)
+    new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+  }
 
   /**
    * Wrap a function such that any KeeperExceptions is captured and converted to a MigrationClientException.
    * Any authentication related exception is converted to a MigrationClientAuthException which may be treated
    * differently by the caller.
    */
   @throws(classOf[MigrationClientException])
-  private def wrapZkException[T](fn: => T): T = {
+  def wrapZkException[T](fn: => T): T = {
     try {
       fn
     } catch {
-      case e @ (_: MigrationClientException | _: MigrationClientAuthException) => throw e
-      case e @ (_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>
+      case e@(_: MigrationClientException | _: MigrationClientAuthException) => throw e
+      case e@(_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>

Review Comment:
   is this whitespace change needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org