You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/02/10 22:41:53 UTC

[kafka] branch trunk updated: MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient (#13226)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b5a9b9d67e7 MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient (#13226)
b5a9b9d67e7 is described below

commit b5a9b9d67e76761b8e8d62a77cf09ece0cc0b717
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Feb 10 14:41:43 2023 -0800

    MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient (#13226)
    
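    Do not use HIGHEST_SUPPORTED_VERSION for record versions in
    ZkMigrationClient because it will do the wrong thing when more
    MetadataVersion (MV) options are added in the future. The migrated
    records are written with an explicit version of 0 instead.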
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    | 40 ++++++++++------------
 .../unit/kafka/zk/ZkMigrationClientTest.scala      |  6 ++--
 2 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 61b8b2afb51..e13609d7fb5 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
@@ -77,9 +77,10 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     }
   }
 
-  def migrateTopics(metadataVersion: MetadataVersion,
-                    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
-                    brokerIdConsumer: Consumer[Integer]): Unit = {
+  def migrateTopics(
+    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+    brokerIdConsumer: Consumer[Integer]
+  ): Unit = {
     val topics = zkClient.getAllTopicsInCluster()
     val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
     val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
@@ -89,7 +90,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
       val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
       topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
         .setName(topic)
-        .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+        .setTopicId(topicIdOpt.get), 0.toShort))
 
       partitionAssignments.foreach { case (topicPartition, replicaAssignment) =>
         replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
@@ -118,7 +119,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
               .setPartitionEpoch(0)
               .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
         }
-        topicBatch.add(new ApiMessageAndVersion(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+        topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
       }
 
       val props = topicConfigs(topic)
@@ -127,14 +128,13 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
           .setResourceType(ConfigResource.Type.TOPIC.id)
           .setResourceName(topic)
           .setName(key.toString)
-          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+          .setValue(value.toString), 0.toShort))
       }
       recordConsumer.accept(topicBatch)
     }
   }
 
-  def migrateBrokerConfigs(metadataVersion: MetadataVersion,
-                           recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+  def migrateBrokerConfigs(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
     val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
     val batch = new util.ArrayList[ApiMessageAndVersion]()
     zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
@@ -148,7 +148,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
           .setResourceType(ConfigResource.Type.BROKER.id)
           .setResourceName(brokerResource)
           .setName(key.toString)
-          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+          .setValue(value.toString), 0.toShort))
       }
     }
     if (!batch.isEmpty) {
@@ -156,8 +156,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     }
   }
 
-  def migrateClientQuotas(metadataVersion: MetadataVersion,
-                          recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+  def migrateClientQuotas(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
     val adminZkClient = new AdminZkClient(zkClient)
 
     def migrateEntityType(entityType: String): Unit = {
@@ -168,7 +167,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
           batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
             .setEntity(List(entity).asJava)
             .setKey(key)
-            .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+            .setValue(value), 0.toShort))
         }
         recordConsumer.accept(batch)
       }
@@ -191,7 +190,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
         batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
           .setEntity(entity.asJava)
           .setKey(key)
-          .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+          .setValue(value), 0.toShort))
       }
       recordConsumer.accept(batch)
     }
@@ -199,8 +198,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     migrateEntityType(ConfigType.Ip)
   }
 
-  def migrateProducerId(metadataVersion: MetadataVersion,
-                        recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+  def migrateProducerId(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
     val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
     dataOpt match {
       case Some(data) =>
@@ -208,17 +206,17 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
         recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
           .setBrokerEpoch(-1)
           .setBrokerId(producerIdBlock.assignedBrokerId)
-          .setNextProducerId(producerIdBlock.firstProducerId()), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+          .setNextProducerId(producerIdBlock.firstProducerId()), 0.toShort)).asJava)
       case None => // Nothing to migrate
     }
   }
 
   override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
                                brokerIdConsumer: Consumer[Integer]): Unit = {
-    migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
-    migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
-    migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
-    migrateProducerId(MetadataVersion.latest(), batchConsumer)
+    migrateTopics(batchConsumer, brokerIdConsumer)
+    migrateBrokerConfigs(batchConsumer)
+    migrateClientQuotas(batchConsumer)
+    migrateProducerId(batchConsumer)
   }
 
   override def readBrokerIds(): util.Set[Integer] = {
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index 8694f10c14a..77e133a7039 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
@@ -308,7 +308,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
       manager.generateProducerId()
 
       val records = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-      migrationClient.migrateProducerId(MetadataVersion.latest(), batch => records.add(batch))
+      migrationClient.migrateProducerId(batch => records.add(batch))
       assertEquals(1, records.size())
       assertEquals(1, records.get(0).size())
 
@@ -336,7 +336,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
     val brokers = new java.util.ArrayList[Integer]()
     val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-    migrationClient.migrateTopics(MetadataVersion.latest(), batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    migrationClient.migrateTopics(batch => batches.add(batch), brokerId => brokers.add(brokerId))
     assertEquals(1, batches.size())
     val configs = batches.get(0)
       .asScala