Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/07 22:33:17 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11649: KAFKA-13646: Implement KIP-801: KRaft authorizer

hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r800941876



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -689,7 +689,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleCreatePartitions(request: RequestChannel.Request): Unit = {
     val future = createPartitions(request.body[CreatePartitionsRequest].data,
       authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
-      names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+      names => authHelper.filterByAuthorized(request.context, ALTER, TOPIC, names)(n => n))

Review comment:
       I was going to suggest pulling this fix out so that we could backport it more easily, but I guess it doesn't matter since KRaft never had an authorizer before.

##########
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##########
@@ -173,8 +174,12 @@ class ControllerServer(
         setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator()).
-        build()
+        setConfigurationValidator(new ControllerConfigurationValidator())
+      authorizer match {

Review comment:
       nit: can be simplified. Something like this:
   ```scala
   authorizer.foreach(controllerBuilder.setAuthorizer)
   ```

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -209,6 +212,33 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
         clientQuotaMetadataManager.update(clientQuotasDelta)
       }
 
+      // Apply changes to ACLs. This needs to be handled carefully because while we are
+      // applying these changes, the Authorizer is continuing to return authorization
+      // results in other threads. We never want to expose an invalid state. For example,
+      // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
+      // we want to apply those changes in that order, not the reverse order! Otherwise
+      // there could be a window during which incorrect authorization results are returned.
+      Option(delta.aclsDelta()).foreach( aclsDelta =>
+        _authorizer match {
+          case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta()) {
+            // If the delta resulted from a snapshot load, we want to apply the new changes
+            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
+            // first snapshot load, it will also complete the futures returned by
+            // Authorizer#start (which we wait for before processing RPCs).
+            authorizer.loadSnapshot(newImage.acls().acls())
+          } else {
+            // Because the changes map is a LinkedHashMap, the deltas will be returned in
+            // the order they were performed.
+            aclsDelta.changes().entrySet().forEach(e =>
+              if (e.getValue().isPresent()) {
+                authorizer.addAcl(e.getKey(), e.getValue().get())
+              } else {
+                authorizer.removeAcl(e.getKey())
+              })
+          }
+          case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.

Review comment:
       Should we log some kind of warning in this case? It may be a misconfiguration, I guess. Or maybe the user has changed authorizers?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * An interface for Authorizers which store state in the __cluster_metadata log.
+ *
+ * These methods must all be thread-safe.
+ */
+public interface ClusterMetadataAuthorizer extends Authorizer {
+    /**
+     * Set the mutator object which should be used for creating and deleting ACLs.
+     */
+    void setAclMutator(AclMutator aclMutator);
+
+    /**
+     * Get the mutator object which should be used for creating and deleting ACLs.
+     *
+     * @throws org.apache.kafka.common.errors.NotControllerException

Review comment:
       To clarify, when there is a controller change, do we expect `AclMutator` to raise NotController if it has been set?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1433,6 +1481,17 @@ public void processBatchEndOffset(long offset) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<List<AclCreateResult>> createAcls(List<AclBinding> aclBindings) {
+        return appendWriteEvent("createAcls", () -> aclControlManager.createAcls(aclBindings));
+    }
+
+    @Override
+    public CompletableFuture<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filters) {
+        return appendWriteEvent("deleteAcls", () -> aclControlManager.deleteAcls(filters));
+    }
+
+

Review comment:
       nit: extra line

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * The standard authorizer which is used in KRaft-based clusters if no other authorizer is
+ * configured.
+ */
+public class StandardAuthorizer implements ClusterMetadataAuthorizer {
+    public final static String SUPER_USERS_CONFIG = "super.users";
+
+    public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = "allow.everyone.if.no.acl.found";
+
+    /**
+     * A future which is completed once we have loaded a snapshot.
+     * TODO: complete this when we reach the high water mark.
+     */
+    private final CompletableFuture<Void> initialLoadFuture = CompletableFuture.completedFuture(null);
+
+    /**
+     * The current data. Can be read without a lock. Must be written while holding the object lock.

Review comment:
       Can you explain why the lock is necessary? It doesn't seem like we mutate the existing `StandardAuthorizerData` object.
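    
    For what it's worth, here is a minimal self-contained sketch (not the actual `StandardAuthorizerData` API) of why a write lock can still matter with copy-on-write against a volatile field: the write is a read-modify-write, so unsynchronized writers can lose updates even though no published object is ever mutated.
    ```java
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // Illustration only. Readers are lock-free via the volatile read, but two
    // unsynchronized writers could both copy the same old snapshot and one
    // update would be silently overwritten; serializing writers avoids that.
    public class CopyOnWriteExample {
        private volatile Map<String, String> snapshot = Collections.emptyMap();

        public synchronized void put(String key, String value) {
            Map<String, String> copy = new HashMap<>(snapshot);
            copy.put(key, value);
            snapshot = Collections.unmodifiableMap(copy);
        }

        public String get(String key) {
            return snapshot.get(key); // always sees a fully-built immutable map
        }
    }
    ```
    Whether that reasoning applies here probably depends on whether `addAcl`/`removeAcl` end up replacing `data` or mutating it in place.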

##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -147,7 +150,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
+    if (isKRaftTest()) {
+      properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+      properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString())
+    } else {
+      properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)

Review comment:
       Why is it not necessary to identify the broker as a super-user for AclAuthorizer?

##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -321,7 +340,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     // Allow inter-broker communication
     addAndVerifyAcls(Set(new AccessControlEntry(brokerPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource)
 
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    if (isKRaftTest()) {
+      TestUtils.createOffsetsTopicWithAdmin(brokers)

Review comment:
       Any reason we couldn't do this in both cases?

##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -254,11 +254,23 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   }
 
   def getTopicIds(): Map[String, Uuid] = {
-    getController().kafkaController.controllerContext.topicIds.toMap
+    if (isKRaftTest()) {
+      controllerServer.controller.findAllTopicIds(Long.MaxValue).get().asScala.toMap
+    } else {
+      getController().kafkaController.controllerContext.topicIds.toMap
+    }
   }
 
   def getTopicNames(): Map[Uuid, String] = {
-    getController().kafkaController.controllerContext.topicNames.toMap
+    if (isKRaftTest()) {
+      val result = new util.HashMap[Uuid, String]()
+      controllerServer.controller.findAllTopicIds(Long.MaxValue).get().entrySet().forEach {

Review comment:
       nit: I wonder if we may as well add a `findAllTopicNames` (or something like that) to go along with `findAllTopicIds`.
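    
    Hypothetical shape for that, mirroring how the test above consumes `findAllTopicIds` (the name and signature here are just a suggestion, not an existing API):
    ```java
    /**
     * Find the names of all topics in the cluster (hypothetical companion to
     * findAllTopicIds; this signature is only a suggestion).
     *
     * @param deadlineNs    The time by which this operation needs to be done.
     * @return              A future yielding a map from topic id to topic name.
     */
    CompletableFuture<Map<Uuid, String>> findAllTopicNames(long deadlineNs);
    ```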

##########
File path: metadata/src/main/java/org/apache/kafka/controller/Controller.java
##########
@@ -90,6 +91,16 @@
     CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(long deadlineNs,
                                                                      Collection<String> topicNames);
 
+    /**
+     * Find the ids for all topic names. Note that this function should only used for

Review comment:
       nit: only *be* used

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -2106,32 +2106,65 @@ object TestUtils extends Logging {
       s"There still are ongoing reassignments", pause = pause)
   }
 
-  def addAndVerifyAcls(broker: KafkaBroker, acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
-    val authorizer = broker.dataPlaneRequestProcessor.authorizer.get
+  /**
+   * Find an Authorizer that we can call createAcls or deleteAcls on.
+   */
+  def pickAuthorizerForWrite[B <: KafkaBroker](

Review comment:
       Would it be possible to go through the admin API instead?
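    
    For reference, a rough sketch of what going through the admin client could look like (Java form; `adminProps`, `resource`, and `entry` are placeholders, not names from this PR):
    ```java
    // Placeholders: adminProps points at the brokers under test, and
    // resource/entry describe the ACL binding we want to add.
    try (Admin admin = Admin.create(adminProps)) {
        admin.createAcls(Collections.singleton(new AclBinding(resource, entry)))
            .all()
            .get();
    }
    ```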

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * An interface for Authorizers which store state in the __cluster_metadata log.
+ *
+ * These methods must all be thread-safe.
+ */
+public interface ClusterMetadataAuthorizer extends Authorizer {
+    /**
+     * Set the mutator object which should be used for creating and deleting ACLs.
+     */
+    void setAclMutator(AclMutator aclMutator);
+
+    /**
+     * Get the mutator object which should be used for creating and deleting ACLs.
+     *
+     * @throws org.apache.kafka.common.errors.NotControllerException
+     *              If the aclMutator was not set.
+     */
+    AclMutator aclMutatorOrException();
+
+    /**
+     * Load the ACLs in the given map. Anything not in the map will be removed.
+     * The authorizer will also wait for this initial snapshot load to complete when
+     * coming up.
+     */
+    void loadSnapshot(Map<Uuid, StandardAcl> acls);
+
+    /**
+     * Add a new ACL. Any ACL with the same ID will be replaced.
+     */
+    void addAcl(Uuid id, StandardAcl acl);
+
+    /**
+     * Remove the ACL with the given ID.
+     */
+    void removeAcl(Uuid id);
+
+    default List<? extends CompletionStage<AclCreateResult>> createAcls(
+            AuthorizableRequestContext requestContext,
+            List<AclBinding> aclBindings) {
+        List<CompletableFuture<AclCreateResult>> futures = new ArrayList<>(aclBindings.size());
+        AclMutator aclMutator = aclMutatorOrException();
+        aclBindings.forEach(b -> futures.add(new CompletableFuture<>()));
+        aclMutator.createAcls(aclBindings).whenComplete((results, throwable) -> {
+            if (throwable == null && results.size() != futures.size()) {
+                throwable = new UnknownServerException("Invalid size " +
+                    "of result set from controller. Expected " + futures.size() +
+                    "; got " + results.size());
+            }
+            if (throwable == null) {
+                for (int i = 0; i < futures.size(); i++) {
+                    futures.get(i).complete(results.get(i));
+                }
+            } else {
+                for (int i = 0; i < futures.size(); i++) {

Review comment:
       nit: since we're not using the index, maybe use `for` with implicit iterator
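    
    For example, something like this for the error path (just a sketch of the suggestion; the exception can also be computed once outside the loop since it doesn't depend on the index):
    ```java
    ApiException e = (throwable instanceof ApiException) ? (ApiException) throwable :
        ApiError.fromThrowable(throwable).exception();
    for (CompletableFuture<AclCreateResult> future : futures) {
        future.complete(new AclCreateResult(e));
    }
    ```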

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * An interface for Authorizers which store state in the __cluster_metadata log.
+ *
+ * These methods must all be thread-safe.
+ */
+public interface ClusterMetadataAuthorizer extends Authorizer {
+    /**
+     * Set the mutator object which should be used for creating and deleting ACLs.
+     */
+    void setAclMutator(AclMutator aclMutator);
+
+    /**
+     * Get the mutator object which should be used for creating and deleting ACLs.
+     *
+     * @throws org.apache.kafka.common.errors.NotControllerException
+     *              If the aclMutator was not set.
+     */
+    AclMutator aclMutatorOrException();
+
+    /**
+     * Load the ACLs in the given map. Anything not in the map will be removed.
+     * The authorizer will also wait for this initial snapshot load to complete when
+     * coming up.
+     */
+    void loadSnapshot(Map<Uuid, StandardAcl> acls);
+
+    /**
+     * Add a new ACL. Any ACL with the same ID will be replaced.
+     */
+    void addAcl(Uuid id, StandardAcl acl);
+
+    /**
+     * Remove the ACL with the given ID.
+     */
+    void removeAcl(Uuid id);
+
+    default List<? extends CompletionStage<AclCreateResult>> createAcls(
+            AuthorizableRequestContext requestContext,
+            List<AclBinding> aclBindings) {
+        List<CompletableFuture<AclCreateResult>> futures = new ArrayList<>(aclBindings.size());
+        AclMutator aclMutator = aclMutatorOrException();
+        aclBindings.forEach(b -> futures.add(new CompletableFuture<>()));
+        aclMutator.createAcls(aclBindings).whenComplete((results, throwable) -> {
+            if (throwable == null && results.size() != futures.size()) {

Review comment:
       nit: a bit more natural?
   ```java
   if (results != null && results.size() != futures.size()) {
   ```

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * An interface for Authorizers which store state in the __cluster_metadata log.
+ *
+ * These methods must all be thread-safe.
+ */
+public interface ClusterMetadataAuthorizer extends Authorizer {
+    /**
+     * Set the mutator object which should be used for creating and deleting ACLs.
+     */
+    void setAclMutator(AclMutator aclMutator);
+
+    /**
+     * Get the mutator object which should be used for creating and deleting ACLs.
+     *
+     * @throws org.apache.kafka.common.errors.NotControllerException
+     *              If the aclMutator was not set.
+     */
+    AclMutator aclMutatorOrException();
+
+    /**
+     * Load the ACLs in the given map. Anything not in the map will be removed.
+     * The authorizer will also wait for this initial snapshot load to complete when
+     * coming up.
+     */
+    void loadSnapshot(Map<Uuid, StandardAcl> acls);
+
+    /**
+     * Add a new ACL. Any ACL with the same ID will be replaced.
+     */
+    void addAcl(Uuid id, StandardAcl acl);
+
+    /**
+     * Remove the ACL with the given ID.
+     */
+    void removeAcl(Uuid id);
+
+    default List<? extends CompletionStage<AclCreateResult>> createAcls(

Review comment:
       I am wondering if there is anything about the ordering that is worth documenting here. Do we guarantee for example that `addAcl` will be applied before the future returns?
   
   Also nit: can you add javadoc for this and `deleteAcls`?
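    
    Something along these lines could work as a starting point for the javadoc; the ordering note is exactly the open question above, so it's phrased as an assumption to confirm rather than a guarantee:
    ```java
    /**
     * Create the ACLs in the given list by forwarding them to the active controller
     * through the AclMutator. One future is returned per input binding, in the same
     * order as the input.
     *
     * (Assumption to confirm: a returned future only completes after the corresponding
     * ACL change has been committed and applied locally via addAcl/removeAcl, so a
     * successful result implies the change is already visible to authorize().)
     */
    ```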

##########
File path: metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAclRecordIterator;
+import org.apache.kafka.metadata.authorizer.StandardAclWithId;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * The AclControlManager manages any ACLs that are stored in the __cluster_metadata topic.
+ * If the ACLs are stored externally (such as in ZooKeeper) then there will be nothing for
+ * this manager to do, and the authorizer field will always be Optional.empty.
+ *
+ * Because the Authorizer is being concurrently used by other threads, we need to be
+ * careful about snapshots. We don't want the Authorizer to act based on partial state
+ * during the loading process. Therefore, unlike most of the other managers,
+ * AclControlManager needs to receive callbacks when we start loading a snapshot and when
+ * we finish. The prepareForSnapshotLoad callback clears the authorizer field, preventing
+ * any changes from affecting the authorizer until completeSnapshotLoad is called.
+ * Note that the Authorizer's start() method will block until the first snapshot load has
+ * completed, which is another reason the prepare / complete callbacks are needed.
+ */
+public class AclControlManager {
+    private final TimelineHashMap<Uuid, StandardAcl> idToAcl;
+    private final TimelineHashSet<StandardAcl> existingAcls;
+    private final Optional<ClusterMetadataAuthorizer> authorizer;

Review comment:
       Why might the authorizer be empty here?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * An interface for Authorizers which store state in the __cluster_metadata log.
+ *
+ * These methods must all be thread-safe.
+ */
+public interface ClusterMetadataAuthorizer extends Authorizer {
+    /**
+     * Set the mutator object which should be used for creating and deleting ACLs.
+     */
+    void setAclMutator(AclMutator aclMutator);
+
+    /**
+     * Get the mutator object which should be used for creating and deleting ACLs.
+     *
+     * @throws org.apache.kafka.common.errors.NotControllerException
+     *              If the aclMutator was not set.
+     */
+    AclMutator aclMutatorOrException();
+
+    /**
+     * Load the ACLs in the given map. Anything not in the map will be removed.
+     * The authorizer will also wait for this initial snapshot load to complete when
+     * coming up.
+     */
+    void loadSnapshot(Map<Uuid, StandardAcl> acls);
+
+    /**
+     * Add a new ACL. Any ACL with the same ID will be replaced.
+     */
+    void addAcl(Uuid id, StandardAcl acl);
+
+    /**
+     * Remove the ACL with the given ID.
+     */
+    void removeAcl(Uuid id);
+
+    default List<? extends CompletionStage<AclCreateResult>> createAcls(
+            AuthorizableRequestContext requestContext,
+            List<AclBinding> aclBindings) {
+        List<CompletableFuture<AclCreateResult>> futures = new ArrayList<>(aclBindings.size());
+        AclMutator aclMutator = aclMutatorOrException();
+        aclBindings.forEach(b -> futures.add(new CompletableFuture<>()));
+        aclMutator.createAcls(aclBindings).whenComplete((results, throwable) -> {
+            if (throwable == null && results.size() != futures.size()) {
+                throwable = new UnknownServerException("Invalid size " +
+                    "of result set from controller. Expected " + futures.size() +
+                    "; got " + results.size());
+            }
+            if (throwable == null) {
+                for (int i = 0; i < futures.size(); i++) {
+                    futures.get(i).complete(results.get(i));
+                }
+            } else {
+                for (int i = 0; i < futures.size(); i++) {
+                    ApiException e = (throwable instanceof ApiException) ? (ApiException) throwable :
+                        ApiError.fromThrowable(throwable).exception();
+                    futures.get(i).complete(new AclCreateResult(e));
+                }
+            }
+        });
+        return futures;
+    }
+
+    default List<? extends CompletionStage<AclDeleteResult>> deleteAcls(
+            AuthorizableRequestContext requestContext,
+            List<AclBindingFilter> filters) {
+        List<CompletableFuture<AclDeleteResult>> futures = new ArrayList<>(filters.size());
+        AclMutator aclMutator = aclMutatorOrException();
+        filters.forEach(b -> futures.add(new CompletableFuture<>()));
+        aclMutator.deleteAcls(filters).whenComplete((results, throwable) -> {
+            if (throwable == null && results.size() != futures.size()) {
+                throwable = new UnknownServerException("Invalid size " +
+                    "of result set from controller. Expected " + futures.size() +
+                    "; got " + results.size());
+            }
+            if (throwable == null) {
+                for (int i = 0; i < futures.size(); i++) {
+                    futures.get(i).complete(results.get(i));
+                }
+            } else {
+                for (int i = 0; i < futures.size(); i++) {

Review comment:
       Ditto. Not using index.

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -209,6 +212,33 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
         clientQuotaMetadataManager.update(clientQuotasDelta)
       }
 
+      // Apply changes to ACLs. This needs to be handled carefully because while we are
+      // applying these changes, the Authorizer is continuing to return authorization
+      // results in other threads. We never want to expose an invalid state. For example,
+      // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
+      // we want to apply those changes in that order, not the reverse order! Otherwise
+      // there could be a window during which incorrect authorization results are returned.
+      Option(delta.aclsDelta()).foreach( aclsDelta =>
+        _authorizer match {
+          case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta()) {
+            // If the delta resulted from a snapshot load, we want to apply the new changes
+            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
+            // first snapshot load, it will also complete the futures returned by
+            // Authorizer#start (which we wait for before processing RPCs).
+            authorizer.loadSnapshot(newImage.acls().acls())
+          } else {
+            // Because the changes map is a LinkedHashMap, the deltas will be returned in
+            // the order they were performed.
+            aclsDelta.changes().entrySet().forEach(e =>

Review comment:
       nit: I think you can simplify a little:
   ```scala
               aclsDelta.changes.forEach { (aclId, aclOpt) =>
                 if (aclOpt.isPresent) {
                   authorizer.addAcl(aclId, aclOpt.get)
                 } else {
                   authorizer.removeAcl(aclId)
                 }
               }
   ```

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+import java.util.Objects;
+
+
+/**
+ * A Kafka ACL which is identified by a UUID and stored in the metadata log.
+ */
+final public class StandardAcl implements Comparable<StandardAcl> {
+    public static StandardAcl fromRecord(AccessControlEntryRecord record) {
+        return new StandardAcl(
+            ResourceType.fromCode(record.resourceType()),
+            record.resourceName(),
+            PatternType.fromCode(record.patternType()),
+            record.principal(),
+            record.host(),
+            AclOperation.fromCode(record.operation()),
+            AclPermissionType.fromCode(record.permissionType()));
+    }
+
+    public static StandardAcl fromAclBinding(AclBinding acl) {
+        return new StandardAcl(
+            acl.pattern().resourceType(),
+            acl.pattern().name(),
+            acl.pattern().patternType(),
+            acl.entry().principal(),
+            acl.entry().host(),
+            acl.entry().operation(),
+            acl.entry().permissionType());
+    }
+
+    private final ResourceType resourceType;
+    private final String resourceName;
+    private final PatternType patternType;
+    private final String principal;
+    private final String host;
+    private final AclOperation operation;
+    private final AclPermissionType permissionType;
+
+    public StandardAcl(
+                ResourceType resourceType,
+                String resourceName,
+                PatternType patternType,
+                String principal,
+                String host,
+                AclOperation operation,
+                AclPermissionType permissionType) {
+        this.resourceType = resourceType;
+        this.resourceName = resourceName;
+        this.patternType = patternType;
+        this.principal = principal;
+        this.host = host;
+        this.operation = operation;
+        this.permissionType = permissionType;
+    }
+
+    public ResourceType resourceType() {
+        return resourceType;
+    }
+
+    public String resourceName() {
+        return resourceName;
+    }
+
+    public PatternType patternType() {
+        return patternType;
+    }
+
+    public String principal() {
+        return principal;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public AclOperation operation() {
+        return operation;
+    }
+
+    public AclPermissionType permissionType() {
+        return permissionType;
+    }
+
+    public AclBinding toBinding() {
+        ResourcePattern resourcePattern =
+            new ResourcePattern(resourceType, resourceName, patternType);
+        AccessControlEntry accessControlEntry =
+            new AccessControlEntry(principal, host, operation, permissionType);
+        return new AclBinding(resourcePattern, accessControlEntry);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !o.getClass().equals(StandardAcl.class)) return false;
+        if (o == this) return true;
+        StandardAcl other = (StandardAcl) o;
+        return resourceType.equals(other.resourceType) &&
+            resourceName.equals(other.resourceName) &&
+            patternType.equals(other.patternType) &&
+            principal.equals(other.principal) &&
+            host.equals(other.host) &&
+            operation.equals(other.operation) &&
+            permissionType.equals(other.permissionType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            resourceType,
+            resourceName,
+            patternType,
+            principal,
+            host,
+            operation,
+            permissionType);
+    }
+
+    @Override
+    public int compareTo(StandardAcl other) {
+        int result;
+        result = resourceType.compareTo(other.resourceType);
+        if (result != 0) return result;
+        result = patternType.compareTo(other.patternType);
+        if (result != 0) return result;
+        result = resourceName.compareTo(other.resourceName);
+        if (result != 0) return result;
+        result = principal.compareTo(other.principal);
+        if (result != 0) return result;
+        result = host.compareTo(other.host);
+        if (result != 0) return result;
+        result = operation.compareTo(other.operation);
+        if (result != 0) return result;
+        result = permissionType.compareTo(other.permissionType);
+        return 0;

Review comment:
       `return result;`?
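    
    i.e. the last comparison could just be:
    ```java
    return permissionType.compareTo(other.permissionType);
    ```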

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * The standard authorizer which is used in KRaft-based clusters if no other authorizer is
+ * configured.
+ */
+public class StandardAuthorizer implements ClusterMetadataAuthorizer {
+    public final static String SUPER_USERS_CONFIG = "super.users";
+
+    public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = "allow.everyone.if.no.acl.found";
+
+    /**
+     * A future which is completed once we have loaded a snapshot.
+     * TODO: complete this when we reach the high water mark.
+     */
+    private final CompletableFuture<Void> initialLoadFuture = CompletableFuture.completedFuture(null);
+
+    /**
+     * The current data. Can be read without a lock. Must be written while holding the object lock.
+     */
+    private volatile StandardAuthorizerData data = StandardAuthorizerData.createEmpty();
+
+    @Override
+    public synchronized void setAclMutator(AclMutator aclMutator) {
+        this.data = data.copyWithNewAclMutator(aclMutator);
+    }
+
+    @Override
+    public AclMutator aclMutatorOrException() {
+        AclMutator aclMutator = data.aclMutator;
+        if (aclMutator == null) {
+            throw new NotControllerException("The current node is not the active controller.");
+        }
+        return aclMutator;
+    }
+
+    @Override
+    public void addAcl(Uuid id, StandardAcl acl) {
+        data.addAcl(id, acl);
+    }
+
+    @Override
+    public void removeAcl(Uuid id) {
+        data.removeAcl(id);
+    }
+
+    @Override
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithNewAcls(acls.entrySet());
+    }
+
+    @Override
+    public Map<Endpoint, ? extends CompletionStage<Void>> start(
+            AuthorizerServerInfo serverInfo) {
+        Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
+        for (Endpoint endpoint : serverInfo.endpoints()) {
+            result.put(endpoint, initialLoadFuture);
+        }
+        return result;
+    }
+
+    @Override
+    public List<AuthorizationResult> authorize(
+            AuthorizableRequestContext requestContext,
+            List<Action> actions) {
+        StandardAuthorizerData curData = data;
+        List<AuthorizationResult> results = new ArrayList<>(actions.size());
+        for (Action action: actions) {
+            AuthorizationResult result = curData.authorize(requestContext, action);
+            System.out.println("authorize(requestContext=" + requestContext + ", action=" + action + ", result=" + result + ")");

Review comment:
       nit: Change to debug log?
   
    By the way, `AclAuthorizer` uses a special logger, "kafka.authorizer.logger", for auditing authorization requests. We can save it for a follow-up, but it seems like we ought to duplicate the auditing logic here.
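    
    A sketch of the debug-log version, reusing that logger name so existing log4j audit configuration keeps applying (exact placement and wording up to you):
    ```java
    // Sketch only: borrow AclAuthorizer's audit logger name.
    private static final org.slf4j.Logger AUTHORIZER_LOG =
        org.slf4j.LoggerFactory.getLogger("kafka.authorizer.logger");

    // ...then in authorize(), instead of System.out.println:
    AUTHORIZER_LOG.debug("authorize(requestContext={}, action={}) = {}",
        requestContext, action, result);
    ```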

##########
File path: metadata/src/main/java/org/apache/kafka/image/AclsImage.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAclWithId;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the ACLs in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class AclsImage {
+    public static final AclsImage EMPTY = new AclsImage(Collections.emptyMap());
+
+    private final Map<Uuid, StandardAcl> acls;
+
+    public AclsImage(Map<Uuid, StandardAcl> acls) {
+        this.acls = Collections.unmodifiableMap(acls);
+    }
+
+    public boolean isEmpty() {
+        return acls.isEmpty();
+    }
+
+    public Map<Uuid, StandardAcl> acls() {
+        return acls;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) {
+        List<ApiMessageAndVersion> batch = new ArrayList<>();
+        for (Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
+            StandardAclWithId aclWithId = new StandardAclWithId(entry.getKey(), entry.getValue());
+            batch.add(new ApiMessageAndVersion(aclWithId.toRecord(), (short) 0));
+        }
+        out.accept(batch);
+    }
+
+    @Override
+    public int hashCode() {
+        return acls.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AclsImage)) return false;
+        AclsImage other = (AclsImage) o;
+        return acls.equals(other.acls);
+    }
+
+    @Override
+    public String toString() {
+        return acls.values().stream().

Review comment:
       nit: would it be helpful to surround the output with `AclsImage(...)`?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * The standard authorizer which is used in KRaft-based clusters if no other authorizer is
+ * configured.
+ */
+public class StandardAuthorizer implements ClusterMetadataAuthorizer {
+    public final static String SUPER_USERS_CONFIG = "super.users";
+
+    public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = "allow.everyone.if.no.acl.found";
+
+    /**
+     * A future which is completed once we have loaded a snapshot.
+     * TODO: complete this when we reach the high water mark.

Review comment:
       Let's create a follow-up JIRA.

##########
File path: metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAclWithId;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents changes to the ACLs in the metadata image.
+ */
+public final class AclsDelta {
+    private final AclsImage image;
+    private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>();
+    private boolean isSnapshotDelta = false;
+
+    public AclsDelta(AclsImage image) {
+        this.image = image;
+    }
+
+    public Map<Uuid, Optional<StandardAcl>> changes() {
+        return changes;
+    }
+
+    void finishSnapshot() {
+        this.isSnapshotDelta = true;
+    }
+
+    public boolean isSnapshotDelta() {
+        return isSnapshotDelta;
+    }
+
+    public void replay(AccessControlEntryRecord record) {
+        StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record);
+        changes.put(aclWithId.id(), Optional.of(aclWithId.acl()));
+    }
+
+    public void replay(RemoveAccessControlEntryRecord record) {
+        changes.put(record.id(), Optional.empty());
+    }
+
+    public AclsImage apply() {
+        Map<Uuid, StandardAcl> newAcls = new HashMap<>();
+        if (!isSnapshotDelta) {
+            for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
+                Optional<StandardAcl> change = changes.get(entry.getKey());
+                if (change == null) {
+                    newAcls.put(entry.getKey(), entry.getValue());
+                } else if (change.isPresent()) {
+                    newAcls.put(entry.getKey(), change.get());
+                }
+            }
+        }
+        for (Entry<Uuid, Optional<StandardAcl>> entry : changes.entrySet()) {
+            if (!newAcls.containsKey(entry.getKey())) {
+                if (entry.getValue().isPresent()) {
+                    newAcls.put(entry.getKey(), entry.getValue().get());
+                }
+            }
+        }
+        return new AclsImage(newAcls);
+    }
+
+    @Override
+    public String toString() {
+        return "AclsDelta(isSnapshotDelta=" + isSnapshotDelta +

Review comment:
       nit: I think we're missing the end parenthesis.

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * The standard authorizer which is used in KRaft-based clusters if no other authorizer is
+ * configured.
+ */
+public class StandardAuthorizer implements ClusterMetadataAuthorizer {
+    public final static String SUPER_USERS_CONFIG = "super.users";
+
+    public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = "allow.everyone.if.no.acl.found";
+
+    /**
+     * A future which is completed once we have loaded a snapshot.
+     * TODO: complete this when we reach the high water mark.
+     */
+    private final CompletableFuture<Void> initialLoadFuture = CompletableFuture.completedFuture(null);
+
+    /**
+     * The current data. Can be read without a lock. Must be written while holding the object lock.
+     */
+    private volatile StandardAuthorizerData data = StandardAuthorizerData.createEmpty();
+
+    @Override
+    public synchronized void setAclMutator(AclMutator aclMutator) {
+        this.data = data.copyWithNewAclMutator(aclMutator);
+    }
+
+    @Override
+    public AclMutator aclMutatorOrException() {
+        AclMutator aclMutator = data.aclMutator;
+        if (aclMutator == null) {
+            throw new NotControllerException("The current node is not the active controller.");
+        }
+        return aclMutator;
+    }
+
+    @Override
+    public void addAcl(Uuid id, StandardAcl acl) {
+        data.addAcl(id, acl);
+    }
+
+    @Override
+    public void removeAcl(Uuid id) {
+        data.removeAcl(id);
+    }
+
+    @Override
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithNewAcls(acls.entrySet());
+    }
+
+    @Override
+    public Map<Endpoint, ? extends CompletionStage<Void>> start(
+            AuthorizerServerInfo serverInfo) {
+        Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
+        for (Endpoint endpoint : serverInfo.endpoints()) {
+            result.put(endpoint, initialLoadFuture);
+        }
+        return result;
+    }
+
+    @Override
+    public List<AuthorizationResult> authorize(
+            AuthorizableRequestContext requestContext,
+            List<Action> actions) {
+        StandardAuthorizerData curData = data;
+        List<AuthorizationResult> results = new ArrayList<>(actions.size());
+        for (Action action: actions) {
+            AuthorizationResult result = curData.authorize(requestContext, action);
+            System.out.println("authorize(requestContext=" + requestContext + ", action=" + action + ", result=" + result + ")");
+            results.add(result);
+        }
+        return results;
+    }
+
+    @Override
+    public Iterable<AclBinding> acls(AclBindingFilter filter) {
+        return data.acls(filter);
+    }
+
+    @Override
+    public int aclCount() {
+        return data.aclCount();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Complete the initialLoadFuture, if it hasn't been completed already.
+        initialLoadFuture.completeExceptionally(new TimeoutException());

Review comment:
       nit: it might be useful to add a message to the exception.
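       For example (the exact wording is only a suggestion):
   ```java
       // Completing the future exceptionally with a descriptive message makes it
       // easier to diagnose callers that were still waiting on the initial load.
       initialLoadFuture.completeExceptionally(new TimeoutException(
           "The authorizer was closed before the initial ACL load completed."));
   ```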

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review comment:
       This can be a follow-up, but we will probably need to add something like `AclAuthorizerBenchmark`.
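       A rough sketch of what such a JMH benchmark could look like (class name, package, and workload are placeholders; an `authorize()` benchmark would additionally need an `AuthorizableRequestContext`, which is left out here):
   ```java
   package org.apache.kafka.jmh.acl; // hypothetical location in the jmh-benchmarks module

   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclBindingFilter;
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.acl.AclPermissionType;
   import org.apache.kafka.common.resource.PatternType;
   import org.apache.kafka.common.resource.ResourceType;
   import org.apache.kafka.metadata.authorizer.StandardAcl;
   import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;

   import java.util.HashMap;
   import java.util.Map;

   @State(Scope.Benchmark)
   public class StandardAuthorizerBenchmark {
       private static final int ACL_COUNT = 50_000;

       private final StandardAuthorizer authorizer = new StandardAuthorizer();

       @Setup
       public void setup() {
           // Load ACL_COUNT literal topic ACLs, using the constructor argument order
           // shown in the exemplar above: (resourceType, resourceName, patternType,
           // principal, host, operation, permissionType).
           Map<Uuid, StandardAcl> acls = new HashMap<>();
           for (int i = 0; i < ACL_COUNT; i++) {
               acls.put(Uuid.randomUuid(), new StandardAcl(
                   ResourceType.TOPIC,
                   "topic-" + i,
                   PatternType.LITERAL,
                   "User:user-" + (i % 100),
                   "*",
                   AclOperation.READ,
                   AclPermissionType.ALLOW));
           }
           authorizer.loadSnapshot(acls);
       }

       @Benchmark
       public int iterateAcls() {
           // Walk every binding, roughly what handling a DescribeAcls request does.
           int count = 0;
           for (AclBinding binding : authorizer.acls(AclBindingFilter.ANY)) {
               count++;
           }
           return count;
       }
   }
   ```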

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + id +
+                    " to aclsByResource");
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " + id +
+                    " from aclsByResource");
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return the default
+     * result. In general it makes more sense to configure the default result to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", action=" + action +
+                    "): ALLOWED because " + requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+
+        // This code relies on the ordering of StandardAcl within the NavigableMap.
+        // Because entries are arranged by (resource type, pattern type, name),
+        // we can find all the applicable ACLs by starting at (type, PREFIX, name) and
+        // stepping backwards.
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PREFIXED,
+            "",
+            "",
+            AclOperation.ANY,
+            AclPermissionType.ANY);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // In addition to ACLs for this specific resource name, there can also be wildcard
+        // ACLs that match any resource name. These are stored as type = LITERAL,
+        // name = "*". We search these next.
+        exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            WILDCARD,
+            LITERAL,
+            "",
+            "",
+            AclOperation.ANY,
+            AclPermissionType.ANY);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // If we found ALLOW ACLs, the action is allowed.
+        if (builder.foundAllow) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                    action + "): ALLOWED");
+            }
+            return ALLOWED;
+        }
+
+        // If nothing matched, we return the default result.
+        if (log.isTraceEnabled()) {
+            log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                action + "): returning default result " + defaultResult);
+        }
+        return defaultResult;
+    }
+
+    void checkSection(Action action,
+                      StandardAcl exemplar,
+                      AuthorizableRequestContext requestContext,
+                      AuthorizationResultBuilder builder) {
+        NavigableSet<StandardAcl> headSet = aclsByResource.headSet(exemplar, true);
+        for (Iterator<StandardAcl> iterator = headSet.descendingIterator();
+             iterator.hasNext(); ) {
+            StandardAcl acl = iterator.next();
+            if (!aclIsInCorrectSection(action, acl)) break;
+            AuthorizationResult result = findResult(action, requestContext, acl);
+            if (ALLOWED.equals(result)) {
+                builder.foundAllow = true;
+            } else if (DENIED.equals(result)) {
+                if (log.isTraceEnabled()) {
+                    log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                        action + "): DENIED because of " + acl);
+                }
+                builder.foundDeny = true;
+                return;
+            }
+        }
+    }
+
+    static boolean aclIsInCorrectSection(Action action, StandardAcl acl) {
+        if (!acl.resourceType().equals(action.resourcePattern().resourceType())) return false;

Review comment:
       nit: more simply
   ```java
   if (acl.resourceType() != action.resourcePattern().resourceType()) return false;
   ```

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * The standard authorizer which is used in KRaft-based clusters if no other authorizer is
+ * configured.
+ */
+public class StandardAuthorizer implements ClusterMetadataAuthorizer {
+    public final static String SUPER_USERS_CONFIG = "super.users";
+
+    public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = "allow.everyone.if.no.acl.found";
+
+    /**
+     * A future which is completed once we have loaded a snapshot.
+     * TODO: complete this when we reach the high water mark.
+     */
+    private final CompletableFuture<Void> initialLoadFuture = CompletableFuture.completedFuture(null);
+
+    /**
+     * The current data. Can be read without a lock. Must be written while holding the object lock.
+     */
+    private volatile StandardAuthorizerData data = StandardAuthorizerData.createEmpty();
+
+    @Override
+    public synchronized void setAclMutator(AclMutator aclMutator) {
+        this.data = data.copyWithNewAclMutator(aclMutator);
+    }
+
+    @Override
+    public AclMutator aclMutatorOrException() {
+        AclMutator aclMutator = data.aclMutator;
+        if (aclMutator == null) {
+            throw new NotControllerException("The current node is not the active controller.");
+        }
+        return aclMutator;
+    }
+
+    @Override
+    public void addAcl(Uuid id, StandardAcl acl) {
+        data.addAcl(id, acl);
+    }
+
+    @Override
+    public void removeAcl(Uuid id) {
+        data.removeAcl(id);
+    }
+
+    @Override
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithNewAcls(acls.entrySet());
+    }
+
+    @Override
+    public Map<Endpoint, ? extends CompletionStage<Void>> start(
+            AuthorizerServerInfo serverInfo) {
+        Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
+        for (Endpoint endpoint : serverInfo.endpoints()) {
+            result.put(endpoint, initialLoadFuture);
+        }
+        return result;
+    }
+
+    @Override
+    public List<AuthorizationResult> authorize(
+            AuthorizableRequestContext requestContext,
+            List<Action> actions) {
+        StandardAuthorizerData curData = data;
+        List<AuthorizationResult> results = new ArrayList<>(actions.size());
+        for (Action action: actions) {
+            AuthorizationResult result = curData.authorize(requestContext, action);
+            System.out.println("authorize(requestContext=" + requestContext + ", action=" + action + ", result=" + result + ")");
+            results.add(result);
+        }
+        return results;
+    }
+
+    @Override
+    public Iterable<AclBinding> acls(AclBindingFilter filter) {
+        return data.acls(filter);
+    }
+
+    @Override
+    public int aclCount() {
+        return data.aclCount();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Complete the initialLoadFuture, if it hasn't been completed already.
+        initialLoadFuture.completeExceptionally(new TimeoutException());
+
+        // Nothing else to do here.
+    }
+
+    @Override
+    public synchronized void configure(Map<String, ?> configs) {
+        Set<String> superUsers = getConfiguredSuperUsers(configs);
+        AuthorizationResult defaultResult = getDefaultResult(configs);
+        int nodeId;
+        try {
+            nodeId = Integer.parseInt(configs.get("node.id").toString());
+        } catch (Exception e) {
+            nodeId = -1;
+        }
+        this.data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
+        this.data.log.info("set super.users=" + String.join(",", superUsers) +
+            ", default result=" + defaultResult);
+    }
+
+    // VisibleForTesting
+    Set<String> superUsers()  {
+        return new HashSet<>(data.superUsers());
+    }
+
+    AuthorizationResult defaultResult() {
+        return data.defaultResult();
+    }
+
+    static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {
+        Object configValue = configs.get(SUPER_USERS_CONFIG);
+        if (configValue == null) return Collections.emptySet();
+        String stringValue = configValue.toString().trim();

Review comment:
       The logic in `AclAuthorizer` is a little different. We do the split first and then trim:
   ```scala
       superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
         case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
       }.getOrElse(Set.empty[KafkaPrincipal])
   ```
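       In Java terms, a sketch of `getConfiguredSuperUsers` that follows the same ordering (split on ";" first, then trim each entry), while keeping the plain `Set<String>` representation used in this patch, might look like this (it reuses the imports already in the file):
   ```java
       static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {
           Object configValue = configs.get(SUPER_USERS_CONFIG);
           if (configValue == null) return Collections.emptySet();
           String stringValue = configValue.toString();
           if (stringValue.trim().isEmpty()) return Collections.emptySet();
           Set<String> result = new HashSet<>();
           // Split first, then trim each principal, matching AclAuthorizer's behavior.
           for (String entry : stringValue.split(";")) {
               String trimmed = entry.trim();
               if (!trimmed.isEmpty()) result.add(trimmed);
           }
           return result;
       }
   ```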

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + id +
+                    " to aclsByResource");
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " + id +
+                    " from aclsByResource");
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return the default
+     * result. In general it makes more sense to configure the default result to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", action=" + action +
+                    "): ALLOWED because " + requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+
+        // This code relies on the ordering of StandardAcl within the NavigableMap.
+        // Because entries are arranged by (resource type, pattern type, name),
+        // we can find all the applicable ACLs by starting at (type, PREFIX, name) and
+        // stepping backwards.
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PREFIXED,
+            "",
+            "",
+            AclOperation.ANY,
+            AclPermissionType.ANY);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // In addition to ACLs for this specific resource name, there can also be wildcard
+        // ACLs that match any resource name. These are stored as type = LITERAL,
+        // name = "*". We search these next.
+        exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            WILDCARD,
+            LITERAL,
+            "",
+            "",
+            AclOperation.ANY,
+            AclPermissionType.ANY);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // If we found ALLOW ACLs, the action is allowed.
+        if (builder.foundAllow) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                    action + "): ALLOWED");
+            }
+            return ALLOWED;
+        }
+
+        // If nothing matched, we return the default result.
+        if (log.isTraceEnabled()) {
+            log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                action + "): returning default result " + defaultResult);
+        }
+        return defaultResult;
+    }
+
+    void checkSection(Action action,
+                      StandardAcl exemplar,
+                      AuthorizableRequestContext requestContext,
+                      AuthorizationResultBuilder builder) {
+        NavigableSet<StandardAcl> headSet = aclsByResource.headSet(exemplar, true);
+        for (Iterator<StandardAcl> iterator = headSet.descendingIterator();
+             iterator.hasNext(); ) {
+            StandardAcl acl = iterator.next();
+            if (!aclIsInCorrectSection(action, acl)) break;
+            AuthorizationResult result = findResult(action, requestContext, acl);
+            if (ALLOWED.equals(result)) {

Review comment:
       nit: `ALLOWED == result`
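       i.e. since `AuthorizationResult` is an enum, reference comparison is safe here (trace logging elided from this sketch):
   ```java
       AuthorizationResult result = findResult(action, requestContext, acl);
       if (result == ALLOWED) {
           builder.foundAllow = true;
       } else if (result == DENIED) {
           builder.foundDeny = true;
           return;
       }
   ```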

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + id +
+                    " to aclsByResource");
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " + id +
+                    " from aclsByResource");
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return the default
+     * result. In general it makes more sense to configure the default result to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", action=" + action +
+                    "): ALLOWED because " + requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+
+        // This code relies on the ordering of StandardAcl within the NavigableMap.
+        // Because entries are arranged by (resource type, pattern type, name),
+        // we can find all the applicable ACLs by starting at (type, PREFIX, name) and
+        // stepping backwards.
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PREFIXED,
+            "",
+            "",
+            AclOperation.ANY,
+            AclPermissionType.ANY);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // In addition to ACLs for this specific resource name, there can also be wildcard
+        // ACLs that match any resource name. These are stored as type = LITERAL,
+        // name = "*". We search these next.
+        exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            WILDCARD,
+            LITERAL,
+            "",
+            "",
+            AclOperation.ANY,
+            AclPermissionType.ANY);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // If we found ALLOW ACLs, the action is allowed.
+        if (builder.foundAllow) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                    action + "): ALLOWED");
+            }
+            return ALLOWED;
+        }
+
+        // If nothing matched, we return the default result.
+        if (log.isTraceEnabled()) {
+            log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                action + "): returning default result " + defaultResult);
+        }
+        return defaultResult;
+    }
+
+    void checkSection(Action action,
+                      StandardAcl exemplar,
+                      AuthorizableRequestContext requestContext,
+                      AuthorizationResultBuilder builder) {
+        NavigableSet<StandardAcl> headSet = aclsByResource.headSet(exemplar, true);
+        for (Iterator<StandardAcl> iterator = headSet.descendingIterator();
+             iterator.hasNext(); ) {
+            StandardAcl acl = iterator.next();
+            if (!aclIsInCorrectSection(action, acl)) break;
+            AuthorizationResult result = findResult(action, requestContext, acl);
+            if (ALLOWED.equals(result)) {
+                builder.foundAllow = true;
+            } else if (DENIED.equals(result)) {
+                if (log.isTraceEnabled()) {
+                    log.trace("authorize(requestContext=" + requestContext + ", action=" +
+                        action + "): DENIED because of " + acl);
+                }
+                builder.foundDeny = true;
+                return;
+            }
+        }
+    }
+
+    static boolean aclIsInCorrectSection(Action action, StandardAcl acl) {
+        if (!acl.resourceType().equals(action.resourcePattern().resourceType())) return false;
+        String name = action.resourcePattern().name();
+        if (acl.patternType() == PREFIXED) {
+            if (!name.startsWith(acl.resourceName())) return false;

Review comment:
       Can just `return name.startsWith(acl.resourceName())`.
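       Assuming nothing else happens inside the PREFIXED branch (the rest of the method is cut off in this hunk), that would look like:
   ```java
       if (acl.patternType() == PREFIXED) {
           return name.startsWith(acl.resourceName());
       }
   ```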



