You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/01 13:56:32 UTC

[kafka] branch trunk updated: KAFKA-14859: SCRAM ZK to KRaft migration with dual write (#13628)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e29942347ac KAFKA-14859: SCRAM ZK to KRaft migration with dual write (#13628)
e29942347ac is described below

commit e29942347acc70aa85d47e84e2021f9c24cd7c80
Author: Proven Provenzano <93...@users.noreply.github.com>
AuthorDate: Mon May 1 09:56:04 2023 -0400

    KAFKA-14859: SCRAM ZK to KRaft migration with dual write (#13628)
    
    Handle SCRAM records stored in ZK when migrating from ZK to KRaft.
    
    This includes writing SCRAM records back to ZK while in dual-write mode, where metadata updates are written to both the KRaft metadata log and to ZK. This allows a rollback of the migration to include SCRAM metadata changes.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 checkstyle/import-control-metadata.xml             |  1 +
 .../main/scala/kafka/zk/ZkMigrationClient.scala    | 18 ++++++
 .../unit/kafka/zk/ZkMigrationClientTest.scala      | 38 ++++++++++++
 .../metadata/migration/KRaftMigrationDriver.java   | 70 +++++++++++++++++++---
 .../kafka/metadata/migration/MigrationClient.java  |  1 +
 .../migration/KRaftMigrationDriverTest.java        |  4 +-
 6 files changed, 124 insertions(+), 8 deletions(-)

diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
index 8ec45c5941a..92c3225cd73 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -131,6 +131,7 @@
         <allow pkg="org.apache.kafka.common.message" />
         <allow pkg="org.apache.kafka.common.metadata" />
         <allow pkg="org.apache.kafka.common.protocol" />
+        <allow pkg="org.apache.kafka.common.quota" />
         <allow pkg="org.apache.kafka.common.record" />
         <allow pkg="org.apache.kafka.common.resource" />
         <allow pkg="org.apache.kafka.common.requests" />
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 5d2ba5c7a17..54e0170a71f 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminM
 import kafka.utils.{Logging, PasswordEncoder}
 import kafka.zk.TopicZNode.TopicIdReplicaAssignment
 import kafka.zookeeper._
+import org.apache.kafka.clients.admin.ScramMechanism
 import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.ControllerMovedException
@@ -32,6 +33,7 @@ import org.apache.kafka.common.metadata._
 import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
@@ -211,6 +213,20 @@ class ZkMigrationClient(
       adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
         val entity = new EntityData().setEntityType(entityType).setEntityName(name)
         val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
+          val propertyValue = props.getProperty(mechanism.mechanismName)
+          if (propertyValue != null) {
+            val scramCredentials =  ScramCredentialUtils.credentialFromString(propertyValue)
+            batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord()
+              .setName(name)
+              .setMechanism(mechanism.`type`)
+              .setSalt(scramCredentials.salt)
+              .setStoredKey(scramCredentials.storedKey)
+              .setServerKey(scramCredentials.serverKey)
+              .setIterations(scramCredentials.iterations), 0.toShort))
+            props.remove(mechanism.mechanismName)
+          }
+        }
         ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
           batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
             .setEntity(List(entity).asJava)
@@ -470,6 +486,7 @@ class ZkMigrationClient(
   override def writeClientQuotas(
     entity: util.Map[String, String],
     quotas: util.Map[String, java.lang.Double],
+    scram: util.Map[String, String],
     state: ZkMigrationLeadershipState
   ): ZkMigrationLeadershipState = wrapZkException {
     val entityMap = entity.asScala
@@ -478,6 +495,7 @@ class ZkMigrationClient(
     val hasIp = entityMap.contains(ClientQuotaEntity.IP)
     val props = new Properties()
     // We store client quota values as strings in the ZK JSON
+    scram.forEach { case (key, value) => props.put(key, value.toString) }
     quotas.forEach { case (key, value) => props.put(key, value.toString) }
     val (configType, path) = if (hasUser && !hasClient) {
       (Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index 1787791448c..8d65004b6e4 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -32,11 +32,14 @@ import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.metadata.{AccessControlEntryRecord, ConfigRecord, MetadataRecordType, ProducerIdsRecord}
 import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
+import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.util.MockRandom
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 
@@ -217,11 +220,13 @@ class ZkMigrationClientTest extends QuorumTestHarness {
                                         migrationState: ZkMigrationLeadershipState,
                                         entity: Map[String, String],
                                         quotas: Map[String, java.lang.Double],
+                                        scram: Map[String, String],
                                         zkEntityType: String,
                                         zkEntityName: String): ZkMigrationLeadershipState = {
     val nextMigrationState = migrationClient.writeClientQuotas(
       entity.asJava,
       quotas.asJava,
+      scram.asJava,
       migrationState)
     val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
       adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
@@ -241,24 +246,28 @@ class ZkMigrationClientTest extends QuorumTestHarness {
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
       Map(ClientQuotaEntity.USER -> "user1"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
+      Map.empty,
       ConfigType.User, "user1")
     assertEquals(1, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
       Map(ClientQuotaEntity.USER -> "user1"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
+      Map.empty,
       ConfigType.User, "user1")
     assertEquals(2, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
       Map(ClientQuotaEntity.USER -> "user1"),
       Map.empty,
+      Map.empty,
       ConfigType.User, "user1")
     assertEquals(3, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
       Map(ClientQuotaEntity.USER -> "user1"),
       Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+      Map.empty,
       ConfigType.User, "user1")
     assertEquals(4, migrationState.migrationZkVersion())
   }
@@ -269,6 +278,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
       Map(ClientQuotaEntity.USER -> "user2"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+      Map.empty,
       ConfigType.User, "user2")
 
     assertEquals(1, migrationState.migrationZkVersion())
@@ -276,11 +286,39 @@ class ZkMigrationClientTest extends QuorumTestHarness {
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
       Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
+      Map.empty,
       ConfigType.User, "user2/clients/clientA")
 
     assertEquals(2, migrationState.migrationZkVersion())
   }
 
+  @Test
+  def testScram(): Unit = {
+    val random = new MockRandom()
+    def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
+        val buf = new Array[Byte](length)
+        random.nextBytes(buf)
+        buf
+    }
+    val scramCredential = new ScramCredential(
+        randomBuffer(random, 1024),
+        randomBuffer(random, 1024),
+        randomBuffer(random, 1024),
+        4096)
+
+    val props = new Properties()
+    props.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(scramCredential))
+    adminZkClient.changeConfigs(ConfigType.User, "alice", props)
+
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+
+    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(0, brokers.size())
+    assertEquals(1, batches.size())
+    assertEquals(1, batches.get(0).size)
+  }
+
   @Test
   def testClaimAbsentController(): Unit = {
     assertEquals(0, migrationState.migrationZkVersion())
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 44a67b222c8..5a02bbf6c0d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -19,9 +19,11 @@ package org.apache.kafka.metadata.migration;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
 import org.apache.kafka.controller.QuorumFeatures;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -31,6 +33,7 @@ import org.apache.kafka.image.loader.LoaderManifestType;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.ScramCredentialData;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.LeaderAndEpoch;
@@ -646,19 +649,57 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + clientQuotaEntity, migrationState -> 
+                                    zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Update user scram and quota data for each user with changes in either.
+                    users.forEach(userName -> {
+                        Map<String, String> userScramMap = getScramCredentialStringsForUser(userName);
+                        ClientQuotaEntity clientQuotaEntity = new
+                            ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
+                        if (image.clientQuotas() == null) {
+                            Map<String, Double> quotaMap = new HashMap<String, Double>();
+                            apply("Updating client quota " + clientQuotaEntity, migrationState ->
+                                zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
+                        } else {
+                            Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                            apply("Updating client quota " + clientQuotaEntity, migrationState ->
+                                zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
+                        }
                     });
                 }
 
@@ -712,6 +753,21 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             completionHandler.accept(null);
         }
 
+        private Map<String, String> getScramCredentialStringsForUser(String userName) {
+            Map<String, String> userScramCredentialStrings = new HashMap<String, String>();
+            if (image.scram() != null) {
+                image.scram().mechanisms().forEach((scramMechanism, scramMechanismMap) -> {
+                    ScramCredentialData scramCredentialData = scramMechanismMap.get(userName);
+                    if (scramCredentialData != null) {
+                        userScramCredentialStrings.put(scramMechanism.mechanismName(),
+                            ScramCredentialUtils.credentialToString(
+                                scramCredentialData.toCredential(scramMechanism)));
+                    }
+                });
+            }
+            return userScramCredentialStrings;
+        }
+
         private void addStandardAclToMap(Map<ResourcePattern, List<AccessControlEntry>> aclMap, StandardAcl acl) {
             ResourcePattern resource = new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
             aclMap.computeIfAbsent(resource, __ -> new ArrayList<>()).add(
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 3af3cd10959..8ed1cd2e7ae 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -96,6 +96,7 @@ public interface MigrationClient {
     ZkMigrationLeadershipState writeClientQuotas(
         Map<String, String> clientQuotaEntity,
         Map<String, Double> quotas,
+        Map<String, String> scram,
         ZkMigrationLeadershipState state
     );
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 4bf58f99cc2..d4e7db6dd91 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -183,6 +183,7 @@ public class KRaftMigrationDriverTest {
         public ZkMigrationLeadershipState writeClientQuotas(
             Map<String, String> clientQuotaEntity,
             Map<String, Double> quotas,
+            Map<String, String> scram,
             ZkMigrationLeadershipState state
         ) {
             this.state = state;
@@ -533,4 +534,5 @@ public class KRaftMigrationDriverTest {
                 "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
         }
     }
-}
\ No newline at end of file
+}
+