You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2024/02/07 20:55:08 UTC

(kafka) branch trunk updated: MINOR: Fix some MetadataDelta handling issues during ZK migration (#15327)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c000b1fae2b MINOR: Fix some MetadataDelta handling issues during ZK migration (#15327)
c000b1fae2b is described below

commit c000b1fae2bd7d4b76713a53508f128a13431ab6
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Feb 7 15:54:59 2024 -0500

    MINOR: Fix some MetadataDelta handling issues during ZK migration (#15327)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../zk/migration/ZkAclMigrationClientTest.scala    | 162 ++++++++++++++++++++-
 .../zk/migration/ZkConfigMigrationClientTest.scala |   4 +-
 .../kafka/zk/migration/ZkMigrationClientTest.scala |   2 +-
 .../java/org/apache/kafka/image/AclsDelta.java     |  13 --
 .../metadata/migration/KRaftMigrationDriver.java   |   5 +-
 .../metadata/migration/KRaftMigrationZkWriter.java |  70 +++++----
 .../migration/KRaftMigrationZkWriterTest.java      |   6 +-
 7 files changed, 206 insertions(+), 56 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
index 0107b9fa954..bb4d3e646c7 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
@@ -21,14 +21,14 @@ import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString
 import kafka.utils.TestUtils
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.acl._
-import org.apache.kafka.common.metadata.AccessControlEntryRecord
+import org.apache.kafka.common.metadata.{AccessControlEntryRecord, RemoveAccessControlEntryRecord}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
 import org.junit.jupiter.api.Test
 
 import scala.collection.mutable
@@ -169,7 +169,7 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
     val image = delta.apply(MetadataProvenance.EMPTY)
 
     // load snapshot to Zookeeper.
-    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient)
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, fail(_))
     kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => { migrationState = operation.apply(migrationState) })
 
     // Verify the new ACLs in Zookeeper.
@@ -189,4 +189,160 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
         AclPermissionType.fromCode(acl1Resource3.permissionType())),
       resource3AclsInZk.head.ace)
   }
+
+  def user(user: String): String = {
+    new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user).toString
+  }
+
+  def acl(resourceName: String,
+          resourceType: ResourceType,
+          resourcePattern: PatternType,
+          principal: String,
+          host: String = "*",
+          operation: AclOperation = AclOperation.READ,
+          permissionType: AclPermissionType = AclPermissionType.ALLOW
+  ): AccessControlEntryRecord = {
+    new AccessControlEntryRecord()
+      .setId(Uuid.randomUuid())
+      .setHost(host)
+      .setOperation(operation.code())
+      .setPrincipal(principal)
+      .setPermissionType(permissionType.code())
+      .setPatternType(resourcePattern.code())
+      .setResourceName(resourceName)
+      .setResourceType(resourceType.code())
+  }
+
+  @Test
+  def testDeleteOneAclOfMany(): Unit = {
+    zkClient.createAclPaths()
+    val topicName = "topic-" + Uuid.randomUuid()
+    val resource = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL)
+
+    // Create a delta with three LITERAL ACLs (alice, bob, carol) on the same topic
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val acl1 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("alice"))
+    val acl2 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("bob"))
+    val acl3 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
+    delta.replay(acl1)
+    delta.replay(acl2)
+    delta.replay(acl3)
+    val image = delta.apply(MetadataProvenance.EMPTY)
+
+    // Sync the image to ZK as a snapshot, collecting any writer error messages in errorLogs
+    val errorLogs = mutable.Buffer[String]()
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, errorLogs.append)
+    kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+
+    // Verify all 3 ACLs are in ZK
+    val aclsInZk = zkClient.getVersionedAclsForResource(resource).acls
+    assertEquals(3, aclsInZk.size)
+
+    // Delete one of the ACLs (acl3) in a new delta based on the first image
+    val delta2 = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+    delta2.replay(new RemoveAccessControlEntryRecord().setId(acl3.id()))
+    val image2 = delta2.apply(MetadataProvenance.EMPTY)
+    kraftMigrationZkWriter.handleDelta(image, image2, delta2, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+
+    // Verify the other 2 ACLs are still in ZK and that no errors were reported
+    val aclsInZk2 = zkClient.getVersionedAclsForResource(resource).acls
+    assertEquals(2, aclsInZk2.size)
+    assertEquals(0, errorLogs.size)
+
+    // Add another ACL to the same delta, which still contains the removal of acl3
+    val acl4 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
+    delta2.replay(acl4)
+    val image3 = delta2.apply(MetadataProvenance.EMPTY)
+
+    // This is a contrived error case. In practice, we will never pass the same image as prev and current.
+    // The point of this is to exercise the case of a deleted ACL missing from the prev image.
+    kraftMigrationZkWriter.handleDelta(image3, image3, delta2, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+
+    val aclsInZk3 = zkClient.getVersionedAclsForResource(resource).acls
+    assertEquals(3, aclsInZk3.size)
+    assertEquals(1, errorLogs.size)
+    assertEquals(s"Cannot delete ACL ${acl3.id()} from ZK since it is missing from previous AclImage", errorLogs.head)
+  }
+
+  @Test
+  def testAclUpdateAndDelete(): Unit = {
+    zkClient.createAclPaths()
+    val errorLogs = mutable.Buffer[String]()
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, errorLogs.append)
+
+    val topicName = "topic-" + Uuid.randomUuid()
+    val otherName = "other-" + Uuid.randomUuid()
+    val literalResource = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL)
+    val prefixedResource = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.PREFIXED)
+    val otherResource = new ResourcePattern(ResourceType.TOPIC, otherName, PatternType.LITERAL)
+
+    // Create four LITERAL ACLs on the topic for the first delta
+    val acl1 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("alice"))
+    val acl2 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("bob"))
+    val acl3 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
+    val acl4 = acl(topicName, ResourceType.TOPIC, PatternType.LITERAL, user("dave"))
+
+    val delta1 = new MetadataDelta(MetadataImage.EMPTY)
+    delta1.replay(acl1)
+    delta1.replay(acl2)
+    delta1.replay(acl3)
+    delta1.replay(acl4)
+
+    val image1 = delta1.apply(MetadataProvenance.EMPTY)
+    kraftMigrationZkWriter.handleDelta(MetadataImage.EMPTY, image1, delta1, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+    assertEquals(4, zkClient.getVersionedAclsForResource(literalResource).acls.size)
+    assertEquals(0, zkClient.getVersionedAclsForResource(prefixedResource).acls.size)
+    assertEquals(0, zkClient.getVersionedAclsForResource(otherResource).acls.size)
+    assertEquals(0, errorLogs.size)
+
+    val acl5 = acl(topicName, ResourceType.TOPIC, PatternType.PREFIXED, user("alice"))
+    val acl6 = acl(topicName, ResourceType.TOPIC, PatternType.PREFIXED, user("bob"))
+    val acl7 = acl(otherName, ResourceType.TOPIC, PatternType.LITERAL, user("carol"))
+    val acl8 = acl(otherName, ResourceType.TOPIC, PatternType.LITERAL, user("dave"))
+
+    // Add two prefixed and two "other" ACLs, delete one of the literal ACLs (acl1)
+    val delta2 = new MetadataDelta.Builder().setImage(image1).build()
+    delta2.replay(acl5)
+    delta2.replay(acl6)
+    delta2.replay(acl7)
+    delta2.replay(acl8)
+    delta2.replay(new RemoveAccessControlEntryRecord().setId(acl1.id()))
+
+    val image2 = delta2.apply(MetadataProvenance.EMPTY)
+    kraftMigrationZkWriter.handleDelta(image1, image2, delta2, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+    assertEquals(3, zkClient.getVersionedAclsForResource(literalResource).acls.size)
+    assertEquals(2, zkClient.getVersionedAclsForResource(prefixedResource).acls.size)
+    assertEquals(2, zkClient.getVersionedAclsForResource(otherResource).acls.size)
+    assertEquals(0, errorLogs.size)
+
+    // Re-add acl1 and delete acl2 for the literal resource, remove both prefixed ACLs, add another "other" ACL
+    val acl9 = acl(otherName, ResourceType.TOPIC, PatternType.LITERAL, user("eve"))
+    val delta3 = new MetadataDelta.Builder().setImage(image2).build()
+    delta3.replay(acl1)
+    delta3.replay(new RemoveAccessControlEntryRecord().setId(acl2.id()))
+    delta3.replay(new RemoveAccessControlEntryRecord().setId(acl5.id()))
+    delta3.replay(new RemoveAccessControlEntryRecord().setId(acl6.id()))
+    delta3.replay(acl9)
+
+    val image3 = delta3.apply(MetadataProvenance.EMPTY)
+    kraftMigrationZkWriter.handleDelta(image2, image3, delta3, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+    assertEquals(3, zkClient.getVersionedAclsForResource(literalResource).acls.size)
+    assertEquals(0, zkClient.getVersionedAclsForResource(prefixedResource).acls.size)
+    assertEquals(3, zkClient.getVersionedAclsForResource(otherResource).acls.size)
+    assertEquals(0, errorLogs.size)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index 16b15386bac..2aff7edf713 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.config.ConfigType
 import org.apache.kafka.server.util.MockRandom
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
 import org.junit.jupiter.api.Test
 
 import java.util
@@ -326,7 +326,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
     val image = delta.apply(MetadataProvenance.EMPTY)
 
     // load snapshot to Zookeeper.
-    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient)
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, fail(_))
     kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => {
       migrationState = operation.apply(migrationState)
     })
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index 9b71bd6b0e9..345e2b05d86 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -317,7 +317,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
 
   @Test
   def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
-    val kraftWriter = new KRaftMigrationZkWriter(migrationClient)
+    val kraftWriter = new KRaftMigrationZkWriter(migrationClient, fail(_))
 
     // Add add some topics and broker configs and create new image.
     val topicName = "testTopic"
diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
index 15e9a69c193..3a38d52aca3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
@@ -25,12 +25,10 @@ import org.apache.kafka.metadata.authorizer.StandardAclWithId;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 
@@ -40,7 +38,6 @@ import java.util.stream.Collectors;
 public final class AclsDelta {
     private final AclsImage image;
     private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>();
-    private final Set<StandardAcl> deleted = new HashSet<>();
 
     public AclsDelta(AclsImage image) {
         this.image = image;
@@ -56,15 +53,6 @@ public final class AclsDelta {
         return changes;
     }
 
-    /**
-     * Return a Set of the ACLs which were deleted in this delta. This is used by the ZK migration components.
-     *
-     * @return Set of deleted ACLs
-     */
-    public Set<StandardAcl> deleted() {
-        return deleted;
-    }
-
     void finishSnapshot() {
         for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
             if (!changes.containsKey(entry.getKey())) {
@@ -93,7 +81,6 @@ public final class AclsDelta {
     public void replay(RemoveAccessControlEntryRecord record) {
         if (image.acls().containsKey(record.id())) {
             changes.put(record.id(), Optional.empty());
-            deleted.add(image.acls().get(record.id()));
         } else if (changes.containsKey(record.id())) {
             changes.remove(record.id());
             // No need to track a ACL that was added and deleted within the same delta
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index f33f0790e0a..4c796f9eade 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -134,7 +134,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.time = time;
         LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
         this.controllerMetrics = controllerMetrics;
-        this.log = logContext.logger(KRaftMigrationDriver.class);
+        Logger log = logContext.logger(KRaftMigrationDriver.class);
+        this.log = log;
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
         this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
@@ -144,7 +145,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.initialZkLoadHandler = initialZkLoadHandler;
         this.faultHandler = faultHandler;
         this.quorumFeatures = quorumFeatures;
-        this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient);
+        this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient, log::error);
         this.recordRedactor = new RecordRedactor(configSchema);
         this.minBatchSize = minBatchSize;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index 6c82c9cb9cc..8a7e148d579 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -60,8 +60,8 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 public class KRaftMigrationZkWriter {
 
@@ -82,11 +82,14 @@ public class KRaftMigrationZkWriter {
 
 
     private final MigrationClient migrationClient;
+    private final Consumer<String> errorLogger;
 
     public KRaftMigrationZkWriter(
-        MigrationClient migrationClient
+        MigrationClient migrationClient,
+        Consumer<String> errorLogger
     ) {
         this.migrationClient = migrationClient;
+        this.errorLogger = errorLogger;
     }
 
     public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) {
@@ -122,7 +125,7 @@ public class KRaftMigrationZkWriter {
             updated = true;
         }
         if (delta.aclsDelta() != null) {
-            handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
+            handleAclsDelta(previousImage.acls(), image.acls(), delta.aclsDelta(), operationConsumer);
             updated = true;
         }
         if (delta.delegationTokenDelta() != null) {
@@ -593,6 +596,7 @@ public class KRaftMigrationZkWriter {
         });
 
         newResources.forEach(resourcePattern -> {
+            // newResources is generated from allAclsInSnapshot, and we don't remove from that map, so this unguarded .get() is safe
             Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern);
             String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
             operationConsumer.accept(UPDATE_ACL, name, migrationState ->
@@ -612,43 +616,45 @@ public class KRaftMigrationZkWriter {
         });
     }
 
-    void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
-        // Compute the resource patterns that were changed
-        Set<ResourcePattern> resourcesWithChangedAcls = delta.changes().values()
-            .stream()
-            .filter(Optional::isPresent)
-            .map(Optional::get)
-            .map(this::resourcePatternFromAcl)
-            .collect(Collectors.toSet());
-
-        Set<ResourcePattern> resourcesWithDeletedAcls = delta.deleted()
-            .stream()
-            .map(this::resourcePatternFromAcl)
-            .collect(Collectors.toSet());
-
+    void handleAclsDelta(AclsImage prevImage, AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
         // Need to collect all ACLs for any changed resource pattern
         Map<ResourcePattern, List<AccessControlEntry>> aclsToWrite = new HashMap<>();
-        image.acls().forEach((uuid, standardAcl) -> {
-            ResourcePattern resourcePattern = resourcePatternFromAcl(standardAcl);
-            boolean removed = resourcesWithDeletedAcls.remove(resourcePattern);
-            // If a resource pattern is present in the delta as a changed or deleted acl, need to include it
-            if (resourcesWithChangedAcls.contains(resourcePattern) || removed) {
-                aclsToWrite.computeIfAbsent(resourcePattern, __ -> new ArrayList<>()).add(
-                    new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType())
-                );
+        delta.changes().forEach((aclId, aclChange) -> {
+            if (aclChange.isPresent()) {
+                ResourcePattern resourcePattern = resourcePatternFromAcl(aclChange.get());
+                aclsToWrite.put(resourcePattern, new ArrayList<>());
+            } else {
+                // We need to look in the previous image to get the deleted ACL's resource pattern
+                StandardAcl deletedAcl = prevImage.acls().get(aclId);
+                if (deletedAcl == null) {
+                    errorLogger.accept("Cannot delete ACL " + aclId + " from ZK since it is missing from previous AclImage");
+                } else {
+                    ResourcePattern resourcePattern = resourcePatternFromAcl(deletedAcl);
+                    aclsToWrite.put(resourcePattern, new ArrayList<>());
+                }
             }
         });
 
-        resourcesWithDeletedAcls.forEach(deletedResource -> {
-            String name = "Deleting resource " + deletedResource + " which has no more ACLs";
-            operationConsumer.accept(DELETE_ACL, name, migrationState ->
-                migrationClient.aclClient().deleteResource(deletedResource, migrationState));
+        // Iterate through the new image to collect any ACLs for these changed resources
+        image.acls().forEach((uuid, standardAcl) -> {
+            ResourcePattern resourcePattern = resourcePatternFromAcl(standardAcl);
+            List<AccessControlEntry> entries = aclsToWrite.get(resourcePattern);
+            if (entries != null) {
+                entries.add(new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType()));
+            }
         });
 
+        // If there are no more ACLs for a resource, delete it. Otherwise, update it with the new set of ACLs
         aclsToWrite.forEach((resourcePattern, accessControlEntries) -> {
-            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
-            operationConsumer.accept(UPDATE_ACL, name, migrationState ->
-                migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
+            if (accessControlEntries.isEmpty()) {
+                String name = "Deleting resource " + resourcePattern + " which has no more ACLs";
+                operationConsumer.accept(DELETE_ACL, name, migrationState ->
+                    migrationClient.aclClient().deleteResource(resourcePattern, migrationState));
+            } else {
+                String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
+                operationConsumer.accept(UPDATE_ACL, name, migrationState ->
+                    migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
+            }
         });
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
index e823d6bdaca..0e109f6b9c7 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
@@ -80,7 +80,7 @@ public class KRaftMigrationZkWriterTest {
             .setConfigMigrationClient(configClient)
             .build();
 
-        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient, __ -> { });
 
         MetadataImage image = new MetadataImage(
             MetadataProvenance.EMPTY,
@@ -120,7 +120,7 @@ public class KRaftMigrationZkWriterTest {
             .setAclMigrationClient(aclClient)
             .build();
 
-        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient, __ -> { });
 
         MetadataImage image = new MetadataImage(
             MetadataProvenance.EMPTY,
@@ -179,7 +179,7 @@ public class KRaftMigrationZkWriterTest {
             .setAclMigrationClient(aclClient)
             .build();
 
-        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient, __ -> { });
 
         MetadataImage image = new MetadataImage(
             MetadataProvenance.EMPTY,