You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/26 17:51:37 UTC

[kafka] branch 3.5 updated: KAFKA-15009: Handle new ACLs in KRaft snapshot during migration (#13741)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.5 by this push:
     new 82dae82de3d KAFKA-15009: Handle new ACLs in KRaft snapshot during migration (#13741)
82dae82de3d is described below

commit 82dae82de3df78b5cf954cf0e8748fad232943c9
Author: Akhilesh C <ak...@users.noreply.github.com>
AuthorDate: Tue May 23 07:43:02 2023 -0700

    KAFKA-15009: Handle new ACLs in KRaft snapshot during migration (#13741)
    
    When loading a snapshot during dual-write mode, we were missing the logic to detect new ACLs that
    had been added on the KRaft side. This patch adds support for finding those new ACLs as well as tests
    to verify the correct behavior.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../zk/migration/ZkAclMigrationClientTest.scala    | 86 ++++++++++++++++++++--
 .../metadata/migration/KRaftMigrationZkWriter.java |  9 +++
 2 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
index 6df29f651a2..2e5e58348a0 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
@@ -16,19 +16,21 @@
  */
 package kafka.zk.migration
 
-import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
 import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
 import kafka.utils.TestUtils
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.metadata.AccessControlEntryRecord
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
-import java.util.UUID
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -73,9 +75,9 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
 
   @Test
   def testAclsMigrateAndDualWrite(): Unit = {
-    val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + UUID.randomUUID(), PatternType.LITERAL)
-    val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + UUID.randomUUID(), PatternType.LITERAL)
-    val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-", PatternType.PREFIXED)
+    val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + Uuid.randomUuid(), PatternType.LITERAL)
+    val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.LITERAL)
+    val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.PREFIXED)
     val username = "alice"
     val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
@@ -115,4 +117,78 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
       authorizer.close()
     }
   }
+
+
+  @Test
+  def testAclsChangesInSnapshot(): Unit = {
+    // Create some ACLs in Zookeeper.
+    val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + Uuid.randomUuid(), PatternType.LITERAL)
+    val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.LITERAL)
+    val resource3 = new ResourcePattern(ResourceType.TOPIC, "baz-" + Uuid.randomUuid(), PatternType.LITERAL)
+    val username1 = "alice"
+    val username2 = "blah"
+    val principal1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username1)
+    val principal2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username2)
+    val acl1Resource1 = new AclEntry(new AccessControlEntry(principal1.toString, WildcardHost, AclOperation.WRITE, AclPermissionType.ALLOW))
+    val acl1Resource2 = new AclEntry(new AccessControlEntry(principal2.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW))
+
+    zkClient.createAclPaths()
+    zkClient.createAclsForResourceIfNotExists(resource1, Set(acl1Resource1))
+    zkClient.createAclsForResourceIfNotExists(resource2, Set(acl1Resource2))
+
+    // Create a metadata image such that ACLs for one resource are update, one resource is deleted
+    // one new resource is created in Zookeeper.
+
+    // Create an ACL for a new resource.
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val acl1Resource3 = new AccessControlEntryRecord()
+      .setId(Uuid.randomUuid())
+      .setHost("192.168.10.1")
+      .setOperation(AclOperation.READ.code())
+      .setPrincipal(WildcardPrincipalString)
+      .setPermissionType(AclPermissionType.ALLOW.code())
+      .setPatternType(resource3.patternType().code())
+      .setResourceName(resource3.name())
+      .setResourceType(resource3.resourceType().code()
+      )
+    delta.replay(acl1Resource3)
+
+    // Change an ACL for existing resource.
+    val acl2Resource1 = new AccessControlEntryRecord()
+      .setId(Uuid.randomUuid())
+      .setHost("192.168.15.1")
+      .setOperation(AclOperation.WRITE.code())
+      .setPrincipal(principal1.toString)
+      .setPermissionType(AclPermissionType.ALLOW.code())
+      .setPatternType(resource1.patternType().code())
+      .setResourceName(resource1.name())
+      .setResourceType(resource1.resourceType().code()
+      )
+    delta.replay(acl2Resource1)
+
+    // Do not add anything for resource 2 in the delta.
+    val image = delta.apply(MetadataProvenance.EMPTY)
+
+    // load snapshot to Zookeeper.
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient,
+      (_, operation) => { migrationState = operation.apply(migrationState) })
+    kraftMigrationZkWriter.handleLoadSnapshot(image)
+
+    // Verify the new ACLs in Zookeeper.
+    val resource1AclsInZk = zkClient.getVersionedAclsForResource(resource1).acls
+    assertEquals(1, resource1AclsInZk.size)
+    assertEquals(
+      new AccessControlEntry(acl2Resource1.principal(), acl2Resource1.host(),
+        AclOperation.fromCode(acl2Resource1.operation()),
+        AclPermissionType.fromCode(acl2Resource1.permissionType())),
+      resource1AclsInZk.head.ace)
+    val resource2AclsInZk = zkClient.getVersionedAclsForResource(resource2).acls
+    assertTrue(resource2AclsInZk.isEmpty)
+    val resource3AclsInZk = zkClient.getVersionedAclsForResource(resource3).acls
+    assertEquals(
+      new AccessControlEntry(acl1Resource3.principal(), acl1Resource3.host(),
+        AclOperation.fromCode(acl1Resource3.operation()),
+        AclPermissionType.fromCode(acl1Resource3.permissionType())),
+      resource3AclsInZk.head.ace)
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index ce673c74dc1..fc4d335b36e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -351,9 +351,11 @@ public class KRaftMigrationZkWriter {
             );
         });
 
+        Set<ResourcePattern> newResources = new HashSet<>(allAclsInSnapshot.keySet());
         Set<ResourcePattern> resourcesToDelete = new HashSet<>();
         Map<ResourcePattern, Set<AccessControlEntry>> changedResources = new HashMap<>();
         migrationClient.aclClient().iterateAcls((resourcePattern, accessControlEntries) -> {
+            newResources.remove(resourcePattern);
             if (!allAclsInSnapshot.containsKey(resourcePattern)) {
                 resourcesToDelete.add(resourcePattern);
             } else {
@@ -364,6 +366,13 @@ public class KRaftMigrationZkWriter {
             }
         });
 
+        newResources.forEach(resourcePattern -> {
+            Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern);
+            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
+            operationConsumer.accept(name, migrationState ->
+                migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
+        });
+
         resourcesToDelete.forEach(deletedResource -> {
             String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot";
             operationConsumer.accept(name, migrationState ->