You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/02/10 22:41:53 UTC
[kafka] branch trunk updated: MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient (#13226)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b5a9b9d67e7 MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient (#13226)
b5a9b9d67e7 is described below
commit b5a9b9d67e76761b8e8d62a77cf09ece0cc0b717
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Feb 10 14:41:43 2023 -0800
MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient (#13226)
Do not use HIGHEST_SUPPORTED_VERSION in ZkMigrationClient because
it will do the wrong thing when more MV options are added in the future.
Reviewers: David Arthur <mu...@gmail.com>
---
.../main/scala/kafka/zk/ZkMigrationClient.scala | 40 ++++++++++------------
.../unit/kafka/zk/ZkMigrationClientTest.scala | 6 ++--
2 files changed, 22 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 61b8b2afb51..e13609d7fb5 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.{CreateMode, KeeperException}
@@ -77,9 +77,10 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
}
- def migrateTopics(metadataVersion: MetadataVersion,
- recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
- brokerIdConsumer: Consumer[Integer]): Unit = {
+ def migrateTopics(
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+ brokerIdConsumer: Consumer[Integer]
+ ): Unit = {
val topics = zkClient.getAllTopicsInCluster()
val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
@@ -89,7 +90,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
.setName(topic)
- .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+ .setTopicId(topicIdOpt.get), 0.toShort))
partitionAssignments.foreach { case (topicPartition, replicaAssignment) =>
replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
@@ -118,7 +119,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
.setPartitionEpoch(0)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
}
- topicBatch.add(new ApiMessageAndVersion(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+ topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
}
val props = topicConfigs(topic)
@@ -127,14 +128,13 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
.setResourceType(ConfigResource.Type.TOPIC.id)
.setResourceName(topic)
.setName(key.toString)
- .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+ .setValue(value.toString), 0.toShort))
}
recordConsumer.accept(topicBatch)
}
}
- def migrateBrokerConfigs(metadataVersion: MetadataVersion,
- recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ def migrateBrokerConfigs(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
val batch = new util.ArrayList[ApiMessageAndVersion]()
zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
@@ -148,7 +148,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
.setResourceType(ConfigResource.Type.BROKER.id)
.setResourceName(brokerResource)
.setName(key.toString)
- .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+ .setValue(value.toString), 0.toShort))
}
}
if (!batch.isEmpty) {
@@ -156,8 +156,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
}
- def migrateClientQuotas(metadataVersion: MetadataVersion,
- recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ def migrateClientQuotas(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
val adminZkClient = new AdminZkClient(zkClient)
def migrateEntityType(entityType: String): Unit = {
@@ -168,7 +167,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
.setEntity(List(entity).asJava)
.setKey(key)
- .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+ .setValue(value), 0.toShort))
}
recordConsumer.accept(batch)
}
@@ -191,7 +190,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
.setEntity(entity.asJava)
.setKey(key)
- .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+ .setValue(value), 0.toShort))
}
recordConsumer.accept(batch)
}
@@ -199,8 +198,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
migrateEntityType(ConfigType.Ip)
}
- def migrateProducerId(metadataVersion: MetadataVersion,
- recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ def migrateProducerId(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
dataOpt match {
case Some(data) =>
@@ -208,17 +206,17 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
.setBrokerEpoch(-1)
.setBrokerId(producerIdBlock.assignedBrokerId)
- .setNextProducerId(producerIdBlock.firstProducerId()), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+ .setNextProducerId(producerIdBlock.firstProducerId()), 0.toShort)).asJava)
case None => // Nothing to migrate
}
}
override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]): Unit = {
- migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
- migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
- migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
- migrateProducerId(MetadataVersion.latest(), batchConsumer)
+ migrateTopics(batchConsumer, brokerIdConsumer)
+ migrateBrokerConfigs(batchConsumer)
+ migrateClientQuotas(batchConsumer)
+ migrateProducerId(batchConsumer)
}
override def readBrokerIds(): util.Set[Integer] = {
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index 8694f10c14a..77e133a7039 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
@@ -308,7 +308,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
manager.generateProducerId()
val records = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
- migrationClient.migrateProducerId(MetadataVersion.latest(), batch => records.add(batch))
+ migrationClient.migrateProducerId(batch => records.add(batch))
assertEquals(1, records.size())
assertEquals(1, records.get(0).size())
@@ -336,7 +336,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
val brokers = new java.util.ArrayList[Integer]()
val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
- migrationClient.migrateTopics(MetadataVersion.latest(), batch => batches.add(batch), brokerId => brokers.add(brokerId))
+ migrationClient.migrateTopics(batch => batches.add(batch), brokerId => brokers.add(brokerId))
assertEquals(1, batches.size())
val configs = batches.get(0)
.asScala