You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/31 16:52:46 UTC
[kafka] branch 3.5 updated: KAFKA-15004: Fix configuration dual-write during migration (#13767)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.5 by this push:
new ddff9d5ecc3 KAFKA-15004: Fix configuration dual-write during migration (#13767)
ddff9d5ecc3 is described below
commit ddff9d5ecc32b6f3e8a4ad4505544c5e6284c683
Author: David Arthur <mu...@gmail.com>
AuthorDate: Sat May 27 17:20:44 2023 -0400
KAFKA-15004: Fix configuration dual-write during migration (#13767)
This patch fixes several small bugs with configuration dual-write during migration.
* Topic configs were not written back to ZK while handling a snapshot.
* New broker/topic configs in KRaft that did not exist in ZK were not written to ZK.
* Sensitive configs were not encoded before being written to ZooKeeper.
* Topic configs are now handled in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot.
Added tests to ensure we no longer have the above-mentioned issues.
Co-authored-by: Akhilesh Chaganti <ak...@users.noreply.github.com>
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../main/scala/kafka/zk/ZkMigrationClient.scala | 72 ++++++------
.../zk/migration/ZkConfigMigrationClient.scala | 32 +++++-
.../zk/migration/ZkTopicMigrationClient.scala | 7 --
.../zk/migration/ZkConfigMigrationClientTest.scala | 18 +++
.../kafka/zk/migration/ZkMigrationClientTest.scala | 125 +++++++++++++++++++--
.../zk/migration/ZkMigrationTestHarness.scala | 2 +
.../metadata/migration/ConfigMigrationClient.java | 5 +
.../metadata/migration/KRaftMigrationDriver.java | 2 +-
.../metadata/migration/KRaftMigrationZkWriter.java | 48 ++++++--
.../metadata/migration/TopicMigrationClient.java | 7 +-
.../migration/CapturingConfigMigrationClient.java | 11 ++
11 files changed, 257 insertions(+), 72 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index f3e21e72844..e94f435d71b 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -37,7 +37,6 @@ import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
import java.{lang, util}
-import java.util.Properties
import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -145,44 +144,47 @@ class ZkMigrationClient(
topicClient.iterateTopics(
util.EnumSet.allOf(classOf[TopicVisitorInterest]),
new TopicVisitor() {
- override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
- if (!topicBatch.isEmpty) {
- recordConsumer.accept(topicBatch)
- topicBatch = new util.ArrayList[ApiMessageAndVersion]()
- }
+ override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
+ if (!topicBatch.isEmpty) {
+ recordConsumer.accept(topicBatch)
+ topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+ }
- topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
- .setName(topicName)
- .setTopicId(topicId), 0.toShort))
- }
+ topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+ .setName(topicName)
+ .setTopicId(topicId), 0.toShort))
- override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
- val record = new PartitionRecord()
- .setTopicId(topicIdPartition.topicId())
- .setPartitionId(topicIdPartition.partition())
- .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
- .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
- .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
- .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
- .setLeader(partitionRegistration.leader)
- .setLeaderEpoch(partitionRegistration.leaderEpoch)
- .setPartitionEpoch(partitionRegistration.partitionEpoch)
- .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
- partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
- partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
- topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
- }
+ // This breaks the abstraction a bit, but the topic configs belong in the topic batch
+ // when migrating topics and the logic for reading configs lives elsewhere
+ configClient.readTopicConfigs(topicName, (topicConfigs: util.Map[String, String]) => {
+ topicConfigs.forEach((key: Any, value: Any) => {
+ topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.TOPIC.id)
+ .setResourceName(topicName)
+ .setName(key.toString)
+ .setValue(value.toString), 0.toShort))
+ })
+ })
+ }
- override def visitConfigs(topicName: String, topicProps: Properties): Unit = {
- topicProps.forEach((key: Any, value: Any) => {
- topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
- .setResourceType(ConfigResource.Type.TOPIC.id)
- .setResourceName(topicName)
- .setName(key.toString)
- .setValue(value.toString), 0.toShort))
- })
+ override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
+ val record = new PartitionRecord()
+ .setTopicId(topicIdPartition.topicId())
+ .setPartitionId(topicIdPartition.partition())
+ .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
+ .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
+ .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
+ .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
+ .setLeader(partitionRegistration.leader)
+ .setLeaderEpoch(partitionRegistration.leaderEpoch)
+ .setPartitionEpoch(partitionRegistration.partitionEpoch)
+ .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
+ partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
+ partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
+ topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
+ }
}
- })
+ )
if (!topicBatch.isEmpty) {
recordConsumer.accept(topicBatch)
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index dbcc1d99b93..55fb048e686 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -23,6 +23,7 @@ import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
import kafka.zk._
import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
@@ -35,7 +36,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException}
import java.{lang, util}
import java.util.Properties
-import java.util.function.BiConsumer
+import java.util.function.{BiConsumer, Consumer}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -145,6 +146,28 @@ class ZkConfigMigrationClient(
}
}
+ override def iterateTopicConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
+ val topicEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Topic)
+ topicEntities.foreach { topic =>
+ readTopicConfigs(topic, props => configConsumer.accept(topic, props))
+ }
+ }
+
+ override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = {
+ val topicResource = fromZkEntityName(topicName)
+ val props = zkClient.getEntityConfigs(ConfigType.Topic, topicResource)
+ val decodedProps = props.asScala.map { case (key, value) =>
+ if (DynamicBrokerConfig.isPasswordConfig(key))
+ key -> passwordEncoder.decode(value).value
+ else
+ key -> value
+ }.toMap.asJava
+
+ logAndRethrow(this, s"Error in topic config consumer. Topic was $topicResource.") {
+ configConsumer.accept(decodedProps)
+ }
+ }
+
override def writeConfigs(
configResource: ConfigResource,
configMap: util.Map[String, String],
@@ -159,7 +182,12 @@ class ZkConfigMigrationClient(
val configName = toZkEntityName(configResource.name())
if (configType.isDefined) {
val props = new Properties()
- configMap.forEach { case (key, value) => props.put(key, value) }
+ configMap.forEach { case (key, value) =>
+ if (DynamicBrokerConfig.isPasswordConfig(key)) {
+ props.put(key, passwordEncoder.encode(new Password(value)))
+ } else
+ props.put(key, value)
+ }
tryWriteEntityConfig(configType.get, configName, props, create = false, state) match {
case Some(newState) =>
newState
diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
index a51b7c808c2..37ceef13300 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
@@ -48,7 +48,6 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
}
val topics = zkClient.getAllTopicsInCluster()
- val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
@@ -91,12 +90,6 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
}
}
}
- if (interests.contains(TopicVisitorInterest.CONFIGS)) {
- val props = topicConfigs(topic)
- logAndRethrow(this, s"Error in topic config consumer. Topic was $topic.") {
- visitor.visitConfigs(topic, props)
- }
- }
}
}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index 7313c321fe3..b9d86be25c4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.server.util.MockRandom
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
+import java.util
import java.util.Properties
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -68,6 +69,23 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
}
})
+ // Update the sensitive config value from the config client and check that the value
+ // persisted in Zookeeper is encrypted.
+ val newProps = new util.HashMap[String, String]()
+ newProps.put(KafkaConfig.DefaultReplicationFactorProp, "2") // normal config
+ newProps.put(KafkaConfig.SslKeystorePasswordProp, NEW_SECRET) // sensitive config
+ migrationState = migrationClient.configClient().writeConfigs(
+ new ConfigResource(ConfigResource.Type.BROKER, "1"), newProps, migrationState)
+ val actualPropsInZk = zkClient.getEntityConfigs(ConfigType.Broker, "1")
+ assertEquals(2, actualPropsInZk.size())
+ actualPropsInZk.forEach { case (key, value) =>
+ if (key == KafkaConfig.SslKeystorePasswordProp) {
+ assertEquals(NEW_SECRET, encoder.decode(value.toString).value)
+ } else {
+ assertEquals(newProps.get(key), value)
+ }
+ }
+
migrationState = migrationClient.configClient().deleteConfigs(
new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState)
assertEquals(0, zkClient.getEntityConfigs(ConfigType.Broker, "1").size())
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index 7d7dae9f893..22447ee0e17 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -17,14 +17,15 @@
package kafka.zk.migration
import kafka.api.LeaderAndIsr
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.coordinator.transaction.ProducerIdManager
-import kafka.zk.migration.ZkMigrationTestHarness
-import org.apache.kafka.common.config.TopicConfig
+import kafka.server.{ConfigType, KafkaConfig}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
+import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, PartitionRecord, ProducerIdsRecord, TopicRecord}
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.migration.{KRaftMigrationZkWriter, ZkMigrationLeadershipState}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
@@ -252,11 +253,115 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
.map {_.message() }
.filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
.map { _.asInstanceOf[ConfigRecord] }
- .toSeq
+ .map { record => record.name() -> record.value()}
+ .toMap
assertEquals(2, configs.size)
- assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
- assertEquals("60000", configs.head.value())
- assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
- assertEquals("300000", configs.last.value())
+ assertTrue(configs.contains(TopicConfig.FLUSH_MS_CONFIG))
+ assertEquals("60000", configs(TopicConfig.FLUSH_MS_CONFIG))
+ assertTrue(configs.contains(TopicConfig.RETENTION_MS_CONFIG))
+ assertEquals("300000", configs(TopicConfig.RETENTION_MS_CONFIG))
+ }
+
+ @Test
+ def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
+ val kraftWriter = new KRaftMigrationZkWriter(migrationClient, (_, operation) => {
+ migrationState = operation.apply(migrationState)
+ })
+
+ // Add some topics and broker configs and create a new image.
+ val topicName = "testTopic"
+ val partition = 0
+ val tp = new TopicPartition(topicName, partition)
+ val leaderPartition = 1
+ val leaderEpoch = 100
+ val partitionEpoch = 10
+ val brokerId = "1"
+ val replicas = List(1, 2, 3).map(int2Integer).asJava
+ val topicId = Uuid.randomUuid()
+ val props = new Properties()
+ props.put(KafkaConfig.DefaultReplicationFactorProp, "1") // normal config
+ props.put(KafkaConfig.SslKeystorePasswordProp, SECRET) // sensitive config
+
+ // // Leave Zk in an incomplete state.
+ // zkClient.createTopicAssignment(topicName, Some(topicId), Map(tp -> Seq(1)))
+
+ val delta = new MetadataDelta(MetadataImage.EMPTY)
+ delta.replay(new TopicRecord()
+ .setTopicId(topicId)
+ .setName(topicName)
+ )
+ delta.replay(new PartitionRecord()
+ .setTopicId(topicId)
+ .setIsr(replicas)
+ .setLeader(leaderPartition)
+ .setReplicas(replicas)
+ .setAddingReplicas(List.empty.asJava)
+ .setRemovingReplicas(List.empty.asJava)
+ .setLeaderEpoch(leaderEpoch)
+ .setPartitionEpoch(partitionEpoch)
+ .setPartitionId(partition)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+ )
+ // Use same props for the broker and topic.
+ props.asScala.foreach { case (key, value) =>
+ delta.replay(new ConfigRecord()
+ .setName(key)
+ .setValue(value)
+ .setResourceName(topicName)
+ .setResourceType(ConfigResource.Type.TOPIC.id())
+ )
+ delta.replay(new ConfigRecord()
+ .setName(key)
+ .setValue(value)
+ .setResourceName(brokerId)
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ )
+ }
+ val image = delta.apply(MetadataProvenance.EMPTY)
+
+ // Handle migration using the generated snapshot.
+ kraftWriter.handleSnapshot(image)
+
+ // Verify topic state.
+ val topicIdReplicaAssignment =
+ zkClient.getReplicaAssignmentAndTopicIdForTopics(Set(topicName))
+ assertEquals(1, topicIdReplicaAssignment.size)
+ topicIdReplicaAssignment.foreach { assignment =>
+ assertEquals(topicName, assignment.topic)
+ assertEquals(Some(topicId), assignment.topicId)
+ assertEquals(Map(tp -> ReplicaAssignment(replicas.asScala.map(Integer2int).toSeq)),
+ assignment.assignment)
+ }
+
+ // Verify the topic partition states.
+ val topicPartitionState = zkClient.getTopicPartitionState(tp)
+ assertTrue(topicPartitionState.isDefined)
+ topicPartitionState.foreach { state =>
+ assertEquals(leaderPartition, state.leaderAndIsr.leader)
+ assertEquals(leaderEpoch, state.leaderAndIsr.leaderEpoch)
+ assertEquals(LeaderRecoveryState.RECOVERED, state.leaderAndIsr.leaderRecoveryState)
+ assertEquals(replicas.asScala.map(Integer2int).toList, state.leaderAndIsr.isr)
+ }
+
+ // Verify the broker and topic configs (including sensitive configs).
+ val brokerProps = zkClient.getEntityConfigs(ConfigType.Broker, brokerId)
+ val topicProps = zkClient.getEntityConfigs(ConfigType.Topic, topicName)
+ assertEquals(2, brokerProps.size())
+
+ brokerProps.asScala.foreach { case (key, value) =>
+ if (key == KafkaConfig.SslKeystorePasswordProp) {
+ assertEquals(SECRET, encoder.decode(value).value)
+ } else {
+ assertEquals(props.getProperty(key), value)
+ }
+ }
+
+ topicProps.asScala.foreach { case (key, value) =>
+ if (key == KafkaConfig.SslKeystorePasswordProp) {
+ assertEquals(SECRET, encoder.decode(value).value)
+ } else {
+ assertEquals(props.getProperty(key), value)
+ }
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
index 321903ed9ce..d04798542ea 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -36,6 +36,8 @@ class ZkMigrationTestHarness extends QuorumTestHarness {
val SECRET = "secret"
+ val NEW_SECRET = "newSecret"
+
val encoder: PasswordEncoder = {
val encoderProps = new Properties()
encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
index 9a7b486ff11..57f63fb3cce 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
public interface ConfigMigrationClient {
@@ -38,6 +39,10 @@ public interface ConfigMigrationClient {
void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer);
+ void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer);
+
+ void readTopicConfigs(String topicName, Consumer<Map<String, String>> configConsumer);
+
ZkMigrationLeadershipState writeConfigs(
ConfigResource configResource,
Map<String, String> configMap,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 213d86ca4e8..3706c8d3617 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -154,7 +154,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
- // Once we've recovered the migration state from ZK, install this class as a metadata published
+ // Once we've recovered the migration state from ZK, install this class as a metadata publisher
// by calling the initialZkLoadHandler.
initialZkLoadHandler.accept(this);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index fc4d335b36e..bec062f5c98 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -98,7 +98,8 @@ public class KRaftMigrationZkWriter {
/**
* Handle a snapshot of the topic metadata. This requires scanning through all the topics and partitions
- * in ZooKeeper to determine what has changed.
+ * in ZooKeeper to determine what has changed. Topic configs are not handled here since they exist in the
+ * ConfigurationsImage.
*/
void handleTopicsSnapshot(TopicsImage topicsImage) {
Map<Uuid, String> deletedTopics = new HashMap<>();
@@ -194,23 +195,48 @@ public class KRaftMigrationZkWriter {
}
void handleConfigsSnapshot(ConfigurationsImage configsImage) {
- Set<ConfigResource> brokersToUpdate = new HashSet<>();
+ Set<ConfigResource> newResources = new HashSet<>();
+ configsImage.resourceData().keySet().forEach(resource -> {
+ if (EnumSet.of(ConfigResource.Type.BROKER, ConfigResource.Type.TOPIC).contains(resource.type())) {
+ newResources.add(resource);
+ } else {
+ throw new RuntimeException("Unknown config resource type " + resource.type());
+ }
+ });
+ Set<ConfigResource> resourcesToUpdate = new HashSet<>();
+ BiConsumer<ConfigResource, Map<String, String>> processConfigsForResource = (ConfigResource resource, Map<String, String> configs) -> {
+ newResources.remove(resource);
+ Map<String, String> kraftProps = configsImage.configMapForResource(resource);
+ if (!kraftProps.equals(configs)) {
+ resourcesToUpdate.add(resource);
+ }
+ };
+
migrationClient.configClient().iterateBrokerConfigs((broker, configs) -> {
ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, broker);
- Map<String, String> kraftProps = configsImage.configMapForResource(brokerResource);
- if (!kraftProps.equals(configs)) {
- brokersToUpdate.add(brokerResource);
+ processConfigsForResource.accept(brokerResource, configs);
+ });
+ migrationClient.configClient().iterateTopicConfigs((topic, configs) -> {
+ ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+ processConfigsForResource.accept(topicResource, configs);
+ });
+
+ newResources.forEach(resource -> {
+ Map<String, String> props = configsImage.configMapForResource(resource);
+ if (!props.isEmpty()) {
+ operationConsumer.accept("Create configs for " + resource.type().name() + " " + resource.name(),
+ migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState));
}
});
- brokersToUpdate.forEach(brokerResource -> {
- Map<String, String> props = configsImage.configMapForResource(brokerResource);
+ resourcesToUpdate.forEach(resource -> {
+ Map<String, String> props = configsImage.configMapForResource(resource);
if (props.isEmpty()) {
- operationConsumer.accept("Delete configs for broker " + brokerResource.name(), migrationState ->
- migrationClient.configClient().deleteConfigs(brokerResource, migrationState));
+ operationConsumer.accept("Delete configs for " + resource.type().name() + " " + resource.name(),
+ migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState));
} else {
- operationConsumer.accept("Update configs for broker " + brokerResource.name(), migrationState ->
- migrationClient.configClient().writeConfigs(brokerResource, props, migrationState));
+ operationConsumer.accept("Update configs for " + resource.type().name() + " " + resource.name(),
+ migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState));
}
});
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
index e373d066f2c..e2fac2f1e5c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
@@ -24,23 +24,18 @@ import org.apache.kafka.metadata.PartitionRegistration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
public interface TopicMigrationClient {
enum TopicVisitorInterest {
TOPICS,
- PARTITIONS,
- CONFIGS
+ PARTITIONS
}
interface TopicVisitor {
void visitTopic(String topicName, Uuid topicId, Map<Integer, List<Integer>> assignments);
default void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
- }
- default void visitConfigs(String topicName, Properties topicProps) {
-
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
index 19ed12a381b..6c1393dbc3e 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
public class CapturingConfigMigrationClient implements ConfigMigrationClient {
public List<ConfigResource> deletedResources = new ArrayList<>();
@@ -44,6 +45,16 @@ public class CapturingConfigMigrationClient implements ConfigMigrationClient {
}
+ @Override
+ public void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+
+ }
+
+ @Override
+ public void readTopicConfigs(String topicName, Consumer<Map<String, String>> configConsumer) {
+
+ }
+
@Override
public ZkMigrationLeadershipState writeConfigs(ConfigResource configResource, Map<String, String> configMap, ZkMigrationLeadershipState state) {
writtenConfigs.put(configResource, configMap);