You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "pprovenzano (via GitHub)" <gi...@apache.org> on 2023/04/24 02:23:44 UTC

[GitHub] [kafka] pprovenzano opened a new pull request, #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

pprovenzano opened a new pull request, #13628:
URL: https://github.com/apache/kafka/pull/13628

   Handle migrating SCRAM records in ZK when migrating from ZK to KRaft.
   
   This includes handling writing back SCRAM records to ZK while in dual write mode where metadata updates are written to both the KRaft metadata log and to ZK. This allows for rollback of migration to include SCRAM metadata changes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179299560


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -211,6 +213,20 @@ class ZkMigrationClient(
       adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
         val entity = new EntityData().setEntityType(entityType).setEntityName(name)
         val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
+          val propertyValue = props.getProperty(mechanism.mechanismName)
+          if (propertyValue != null) {

Review Comment:
   Should we log something if we get a null value here?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + clientQuotaEntity, migrationState -> 
+                                    zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with changes in either.

Review Comment:
   typo: Update user



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + clientQuotaEntity, migrationState -> 
+                                    zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with changes in either.
+                    users.forEach(userName -> {
+                        Map<String, String> userscramMap = getScramCredentialStringsForUser(userName);

Review Comment:
   nit: userScramMap



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {

Review Comment:
   Just so I understand -- we exclude entities like `/user/userA/client/clientX` because we don't store SCRAM credentials for user+client entities? We only store SCRAM for plain user entities?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah merged pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah merged PR #13628:
URL: https://github.com/apache/kafka/pull/13628


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] pprovenzano commented on a diff in pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "pprovenzano (via GitHub)" <gi...@apache.org>.
pprovenzano commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179574301


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + clientQuotaEntity, migrationState -> 
+                                    zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with changes in either.
+                    users.forEach(userName -> {
+                        Map<String, String> userscramMap = getScramCredentialStringsForUser(userName);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] pprovenzano commented on a diff in pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "pprovenzano (via GitHub)" <gi...@apache.org>.
pprovenzano commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179529283


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -211,6 +213,20 @@ class ZkMigrationClient(
       adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
         val entity = new EntityData().setEntityType(entityType).setEntityName(name)
         val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
+          val propertyValue = props.getProperty(mechanism.mechanismName)
+          if (propertyValue != null) {

Review Comment:
   No. It just means that there are no scram records for the user.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] pprovenzano commented on a diff in pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "pprovenzano (via GitHub)" <gi...@apache.org>.
pprovenzano commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179574482


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + clientQuotaEntity, migrationState -> 
+                                    zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with changes in either.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on PR #13628:
URL: https://github.com/apache/kafka/pull/13628#issuecomment-1529733999

   Two test failures look unrelated 
   
   * kafka.api.PlaintextConsumerTest.testSubsequentPatternSubscription()
   * org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault()
   
   JDK 8 and Scala 2.12 without any failures


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] pprovenzano commented on a diff in pull request #13628: KAFKA-14859: SCRAM ZK to KRaft migration with dual write

Posted by "pprovenzano (via GitHub)" <gi...@apache.org>.
pprovenzano commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179530134


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {

Review Comment:
   That is my understanding. SCRAM records are only applied to principals and are not client specific.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org