You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/04/03 15:25:14 UTC

[kafka] branch trunk updated: KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (#10450)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 976e78e  KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (#10450)
976e78e is described below

commit 976e78e405d57943b989ac487b7f49119b0f4af4
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Apr 3 08:23:26 2021 -0700

    KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (#10450)
    
    These were deprecated in Apache Kafka 2.4 (released in December 2019) to be replaced
    by `org.apache.kafka.server.authorizer.Authorizer` and `AclAuthorizer`.
    
    As part of KIP-500, we will implement a new `Authorizer` implementation that relies
    on a topic (potentially a KRaft topic) instead of `ZooKeeper`, so we should take the chance
    to remove related tech debt in 3.0.
    
    Details on the issues affecting the old Authorizer interface can be found in the KIP:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Ron Dagostino <rd...@confluent.io>
---
 core/src/main/scala/kafka/security/auth/Acl.scala  |  86 ---
 .../scala/kafka/security/auth/Authorizer.scala     | 149 -----
 .../main/scala/kafka/security/auth/Operation.scala | 113 ----
 .../scala/kafka/security/auth/PermissionType.scala |  50 --
 .../main/scala/kafka/security/auth/Resource.scala  |  85 ---
 .../scala/kafka/security/auth/ResourceType.scala   |  93 ---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 175 -----
 .../kafka/security/authorizer/AclAuthorizer.scala  |  12 +-
 .../security/authorizer/AuthorizerUtils.scala      |  19 +-
 .../security/authorizer/AuthorizerWrapper.scala    | 223 -------
 core/src/main/scala/kafka/server/KafkaConfig.scala |   3 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   5 +-
 .../kafka/api/GroupAuthorizerIntegrationTest.scala |   4 +-
 .../SaslGssapiSslEndToEndAuthorizationTest.scala   |   6 +-
 .../kafka/api/SaslSslAdminIntegrationTest.scala    |  91 ++-
 .../kafka/api/SslAdminIntegrationTest.scala        |  63 +-
 .../scala/kafka/security/auth/ResourceTest.scala   |  68 --
 .../unit/kafka/security/auth/OperationTest.scala   |  40 --
 .../kafka/security/auth/PermissionTypeTest.scala   |  49 --
 .../kafka/security/auth/ResourceTypeTest.scala     |  48 --
 .../security/auth/SimpleAclAuthorizerTest.scala    | 731 ---------------------
 .../authorizer/AuthorizerWrapperTest.scala         | 106 ---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  10 -
 docs/upgrade.html                                  |   2 +
 tests/kafkatest/services/kafka/kafka.py            |   2 -
 .../tests/core/security_rolling_upgrade_test.py    |   8 +-
 26 files changed, 68 insertions(+), 2173 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
deleted file mode 100644
index befd9d2..0000000
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.security.authorizer.AclEntry
-import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
-@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5")
-object Acl {
-  val WildCardPrincipal: KafkaPrincipal = AclEntry.WildcardPrincipal
-  val WildCardHost: String = AclEntry.WildcardHost
-  val WildCardResource: String = ResourcePattern.WILDCARD_RESOURCE
-  val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
-  val PrincipalKey = AclEntry.PrincipalKey
-  val PermissionTypeKey = AclEntry.PermissionTypeKey
-  val OperationKey = AclEntry.OperationKey
-  val HostsKey = AclEntry.HostsKey
-  val VersionKey = AclEntry.VersionKey
-  val CurrentVersion = AclEntry.CurrentVersion
-  val AclsKey = AclEntry.AclsKey
-
-  /**
-   *
-   * @see AclEntry
-   */
-  def fromBytes(bytes: Array[Byte]): Set[Acl] = {
-    AclEntry.fromBytes(bytes)
-      .map(ace => Acl(ace.kafkaPrincipal,
-        PermissionType.fromJava(ace.permissionType()),
-        ace.host(),
-        Operation.fromJava(ace.operation())))
-  }
-
-  def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
-    AclEntry.toJsonCompatibleMap(acls.map(acl =>
-      AclEntry(acl.principal, acl.permissionType.toJava, acl.host, acl.operation.toJava)
-    ))
-  }
-}
-
-/**
- * An instance of this class will represent an acl that can express following statement.
- * <pre>
- * Principal P has permissionType PT on Operation O1 from hosts H1.
- * </pre>
- * @param principal A value of *:* indicates all users.
- * @param permissionType
- * @param host A value of * indicates all hosts.
- * @param operation A value of ALL indicates all operations.
- */
-@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5")
-case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {
-
-  /**
-   * TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects.
-   * @return Map representation of the Acl.
-   */
-  def toMap(): Map[String, Any] = {
-    Map(Acl.PrincipalKey -> principal.toString,
-      Acl.PermissionTypeKey -> permissionType.name,
-      Acl.OperationKey -> operation.name,
-      Acl.HostsKey -> host)
-  }
-
-  override def toString: String = {
-    "%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host)
-  }
-
-}
-
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
deleted file mode 100644
index 7509171..0000000
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.network.RequestChannel.Session
-import org.apache.kafka.common.Configurable
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
-/**
- * Top level interface that all pluggable authorizers must implement. Kafka will read the `authorizer.class.name` config
- * value at startup time, create an instance of the specified class using the default constructor, and call its
- * `configure` method.
- *
- * From that point onwards, every client request will first be routed to the `authorize` method and the request will only
- * be authorized if the method returns true.
- *
- * If `authorizer.class.name` has no value specified, then no authorization will be performed, and all operations are
- * permitted.
- */
-@deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.4")
-trait Authorizer extends Configurable {
-
-  /**
-   * @param session The session being authenticated.
-   * @param operation Type of operation client is trying to perform on resource.
-   * @param resource Resource the client is trying to access. Resource pattern type is always literal in input resource.
-   * @return true if the operation should be permitted, false otherwise
-   */
-  def authorize(session: Session, operation: Operation, resource: Resource): Boolean
-
-  /**
-   * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
-   * acls will be added to existing acls.
-   *
-   * {code}
-   * // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
-   *
-   * // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
-   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
-   *
-   * // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
-   * {code}
-   *
-   * @param acls set of acls to add to existing acls
-   * @param resource the resource path to which these acls should be attached.
-   *                the supplied resource will have a specific resource pattern type,
-   *                i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
-   */
-  def addAcls(acls: Set[Acl], resource: Resource): Unit
-
-  /**
-   * remove these acls from the resource.
-   *
-   * {code}
-   * // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
-   *
-   * // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
-   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
-   *
-   * // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
-   * {code}
-   *
-   * @param acls set of acls to be removed.
-   * @param resource resource path from which the acls should be removed.
-   *                 the supplied resource will have a specific resource pattern type,
-   *                 i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
-   * @return true if some acl got removed, false if no acl was removed.
-   */
-  def removeAcls(acls: Set[Acl], resource: Resource): Boolean
-
-  /**
-   * remove a resource along with all of its acls from acl store.
-   *
-   * {code}
-   * // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
-   *
-   * // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
-   * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
-   *
-   * // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
-   * {code}
-   *
-   * @param resource the resource path from which these acls should be removed.
-   *                 the supplied resource will have a specific resource pattern type,
-   *                 i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
-   * @return
-   */
-  def removeAcls(resource: Resource): Boolean
-
-  /**
-   * get set of acls for the supplied resource
-   *
-   * {code}
-   * // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
-   *
-   * // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
-   * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
-   *
-   * // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
-   * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
-   * {code}
-   *
-   * @param resource the resource path to which the acls belong.
-   *                 the supplied resource will have a specific resource pattern type,
-   *                 i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
-   * @return empty set if no acls are found, otherwise the acls for the resource.
-   */
-  def getAcls(resource: Resource): Set[Acl]
-
-  /**
-   * get the acls for this principal.
-   * @param principal principal name.
-   * @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
-   */
-  def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
-
-  /**
-   * gets the map of resource paths to acls for all resources.
-   */
-  def getAcls(): Map[Resource, Set[Acl]]
-
-  /**
-   * Closes this instance.
-   */
-  def close(): Unit
-
-}
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
deleted file mode 100644
index 3ae2384..0000000
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.acl.AclOperation
-
-/**
- * Different operations a client may perform on kafka resources.
- */
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-sealed trait Operation extends BaseEnum {
-  def toJava : AclOperation
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Read extends Operation {
-  val name = "Read"
-  val toJava = AclOperation.READ
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Write extends Operation {
-  val name = "Write"
-  val toJava = AclOperation.WRITE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Create extends Operation {
-  val name = "Create"
-  val toJava = AclOperation.CREATE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Delete extends Operation {
-  val name = "Delete"
-  val toJava = AclOperation.DELETE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Alter extends Operation {
-  val name = "Alter"
-  val toJava = AclOperation.ALTER
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Describe extends Operation {
-  val name = "Describe"
-  val toJava = AclOperation.DESCRIBE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object ClusterAction extends Operation {
-  val name = "ClusterAction"
-  val toJava = AclOperation.CLUSTER_ACTION
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object DescribeConfigs extends Operation {
-  val name = "DescribeConfigs"
-  val toJava = AclOperation.DESCRIBE_CONFIGS
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object AlterConfigs extends Operation {
-  val name = "AlterConfigs"
-  val toJava = AclOperation.ALTER_CONFIGS
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object IdempotentWrite extends Operation {
-  val name = "IdempotentWrite"
-  val toJava = AclOperation.IDEMPOTENT_WRITE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object All extends Operation {
-  val name = "All"
-  val toJava = AclOperation.ALL
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-object Operation {
-
-  def fromString(operation: String): Operation = {
-    val op = values.find(op => op.name.equalsIgnoreCase(operation))
-    op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
-  }
-
-  def fromJava(operation: AclOperation): Operation = {
-    operation match {
-      case AclOperation.READ => Read
-      case AclOperation.WRITE => Write
-      case AclOperation.CREATE => Create
-      case AclOperation.DELETE => Delete
-      case AclOperation.ALTER => Alter
-      case AclOperation.DESCRIBE => Describe
-      case AclOperation.CLUSTER_ACTION => ClusterAction
-      case AclOperation.ALTER_CONFIGS => AlterConfigs
-      case AclOperation.DESCRIBE_CONFIGS => DescribeConfigs
-      case AclOperation.IDEMPOTENT_WRITE => IdempotentWrite
-      case AclOperation.ALL => All
-      case _ => throw new KafkaException(operation + " is not a convertible operation name. The valid names are " + values.mkString(","))
-    }
-  }
-
-  def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
-     DescribeConfigs, IdempotentWrite, All)
-}
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
deleted file mode 100644
index c5325b2..0000000
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.acl.AclPermissionType
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-sealed trait PermissionType extends BaseEnum {
-  val toJava: AclPermissionType
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-case object Allow extends PermissionType {
-  val name = "Allow"
-  val toJava = AclPermissionType.ALLOW
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-case object Deny extends PermissionType {
-  val name = "Deny"
-  val toJava = AclPermissionType.DENY
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-object PermissionType {
-  def fromString(permissionType: String): PermissionType = {
-    val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType))
-    pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
-  }
-
-  def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString)
-
-  def values: Seq[PermissionType] = List(Allow, Deny)
-}
-
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
deleted file mode 100644
index 8045c68..0000000
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import kafka.security.authorizer.AclEntry
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-object Resource {
-  val Separator = AclEntry.ResourceSeparator
-  val ClusterResourceName = "kafka-cluster"
-  val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, PatternType.LITERAL)
-  val WildCardResource = AclEntry.WildcardResource
-
-  @deprecated("This resource name is not used by Kafka and will be removed in a future release", since = "2.1")
-  val ProducerIdResourceName = "producer-id" // This is not used since we don't have a producer id resource
-
-  def fromString(str: String): Resource = {
-    ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match {
-      case None => throw new KafkaException("Invalid resource string: '" + str + "'")
-      case Some(resourceType) =>
-        val remaining = str.substring(resourceType.name.length + 1)
-
-        PatternType.values.find(patternType => remaining.startsWith(patternType.name + Separator)) match {
-          case Some(patternType) =>
-            val name = remaining.substring(patternType.name.length + 1)
-            Resource(resourceType, name, patternType)
-
-          case None =>
-            Resource(resourceType, remaining, PatternType.LITERAL)
-        }
-    }
-  }
-}
-
-/**
- *
- * @param resourceType non-null type of resource.
- * @param name non-null name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
- *             it will be a constant string kafka-cluster.
- * @param patternType non-null resource pattern type: literal, prefixed, etc.
- */
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-case class Resource(resourceType: ResourceType, name: String, patternType: PatternType) {
-
-  if (!patternType.isSpecific)
-    throw new IllegalArgumentException(s"patternType must not be $patternType")
-
-  /**
-    * Create an instance of this class with the provided parameters.
-    * Resource pattern type would default to PatternType.LITERAL.
-    *
-    * @param resourceType non-null resource type
-    * @param name         non-null resource name
-    * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]]
-    */
-  @deprecated("Use Resource(ResourceType, String, PatternType", "Since 2.0")
-  def this(resourceType: ResourceType, name: String) = {
-    this(resourceType, name, PatternType.LITERAL)
-  }
-
-  def toPattern: ResourcePattern = {
-    new ResourcePattern(resourceType.toJava, name, patternType)
-  }
-
-  override def toString: String = {
-    resourceType.name + Resource.Separator + patternType + Resource.Separator + name
-  }
-}
-
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
deleted file mode 100644
index 72b71c0..0000000
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.resource.{ResourceType => JResourceType}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] {
-  def error: Errors
-  def toJava: JResourceType
-  // this method output will not include "All" Operation type
-  def supportedOperations: Set[Operation]
-
-  override def compare(that: ResourceType): Int = this.name compare that.name
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object Topic extends ResourceType {
-  val name = "Topic"
-  val error = Errors.TOPIC_AUTHORIZATION_FAILED
-  val toJava = JResourceType.TOPIC
-  val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object Group extends ResourceType {
-  val name = "Group"
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val toJava = JResourceType.GROUP
-  val supportedOperations = Set(Read, Describe, Delete)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object Cluster extends ResourceType {
-  val name = "Cluster"
-  val error = Errors.CLUSTER_AUTHORIZATION_FAILED
-  val toJava = JResourceType.CLUSTER
-  val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object TransactionalId extends ResourceType {
-  val name = "TransactionalId"
-  val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
-  val toJava = JResourceType.TRANSACTIONAL_ID
-  val supportedOperations = Set(Describe, Write)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object DelegationToken extends ResourceType {
-  val name = "DelegationToken"
-  val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
-  val toJava = JResourceType.DELEGATION_TOKEN
-  val supportedOperations : Set[Operation] = Set(Describe)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-object ResourceType {
-
-  def fromString(resourceType: String): ResourceType = {
-    val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType))
-    rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
-  }
-
-  def fromJava(resourceType: JResourceType): ResourceType = {
-    resourceType match {
-      case JResourceType.TOPIC => Topic
-      case JResourceType.GROUP => Group
-      case JResourceType.CLUSTER => Cluster
-      case JResourceType.TRANSACTIONAL_ID => TransactionalId
-      case JResourceType.DELEGATION_TOKEN => DelegationToken
-      case _ => throw new KafkaException(resourceType + " is not a convertible resource type. The valid types are " + values.mkString(","))
-    }
-  }
-
-  def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId, DelegationToken)
-}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
deleted file mode 100644
index 653e154..0000000
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import java.util
-
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.BaseAuthorizer
-import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils, AuthorizerWrapper}
-import kafka.utils._
-import kafka.zk.ZkVersion
-import org.apache.kafka.common.acl.{AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
-object SimpleAclAuthorizer {
-  //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
-  //same zookeeper where all other kafka broker info is stored.
-  val ZkUrlProp = AclAuthorizer.ZkUrlProp
-  val ZkConnectionTimeOutProp = AclAuthorizer.ZkConnectionTimeOutProp
-  val ZkSessionTimeOutProp = AclAuthorizer.ZkSessionTimeOutProp
-  val ZkMaxInFlightRequests = AclAuthorizer.ZkMaxInFlightRequests
-
-  //List of users that will be treated as super users and will have access to all the resources for all actions from all hosts, defaults to no super users.
-  val SuperUsersProp = AclAuthorizer.SuperUsersProp
-  //If set to true when no acls are found for a resource , authorizer allows access to everyone. Defaults to false.
-  val AllowEveryoneIfNoAclIsFoundProp = AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp
-
-  case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
-    def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
-  }
-  val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
-
-  private[auth] class BaseAuthorizer extends AclAuthorizer {
-    override def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
-      val principal = requestContext.principal
-      val host = requestContext.clientAddress.getHostAddress
-      val operation = Operation.fromJava(action.operation)
-      val resource = AuthorizerWrapper.convertToResource(action.resourcePattern)
-      def logMessage: String = {
-        val authResult = if (authorized) "Allowed" else "Denied"
-        s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource"
-      }
-
-      if (authorized) authorizerLogger.debug(logMessage)
-      else authorizerLogger.info(logMessage)
-    }
-  }
-}
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
-class SimpleAclAuthorizer extends Authorizer with Logging {
-
-  private val aclAuthorizer = new BaseAuthorizer
-
-  // The maximum number of times we should try to update the resource acls in zookeeper before failing;
-  // This should never occur, but is a safeguard just in case.
-  protected[auth] var maxUpdateRetries = 10
-
-
-  /**
-   * Guaranteed to be called before any authorize call is made.
-   */
-  override def configure(javaConfigs: util.Map[String, _]): Unit = {
-    aclAuthorizer.configure(javaConfigs)
-  }
-
-  override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
-    val requestContext = AuthorizerUtils.sessionToRequestContext(session)
-    val action = new Action(operation.toJava, resource.toPattern, 1, true, true)
-    aclAuthorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED
-  }
-
-  def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
-    aclAuthorizer.isSuperUser(principal)
-  }
-
-  override def addAcls(acls: Set[Acl], resource: Resource): Unit = {
-    aclAuthorizer.maxUpdateRetries = maxUpdateRetries
-    if (acls != null && acls.nonEmpty) {
-      val bindings = acls.map { acl => AuthorizerWrapper.convertToAclBinding(resource, acl) }
-      createAcls(bindings)
-    }
-  }
-
-  override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
-    val filters = aclsTobeRemoved.map { acl =>
-      new AclBindingFilter(resource.toPattern.toFilter, AuthorizerWrapper.convertToAccessControlEntry(acl).toFilter)
-    }
-    deleteAcls(filters)
-  }
-
-  override def removeAcls(resource: Resource): Boolean = {
-    val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY)
-    deleteAcls(Set(filter))
-  }
-
-  override def getAcls(resource: Resource): Set[Acl] = {
-    val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY)
-    acls(filter).getOrElse(resource, Set.empty)
-  }
-
-  override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
-    val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
-      new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY))
-    acls(filter)
-  }
-
-  def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
-    val filter = new AclBindingFilter(new ResourcePatternFilter(resourceType.toJava, resourceName, PatternType.MATCH),
-      AccessControlEntryFilter.ANY)
-    acls(filter).flatMap(_._2).toSet
-  }
-
-  override def getAcls(): Map[Resource, Set[Acl]] = {
-    acls(AclBindingFilter.ANY)
-  }
-
-  def close(): Unit = {
-    aclAuthorizer.close()
-  }
-
-  private def createAcls(bindings: Set[AclBinding]): Unit = {
-    aclAuthorizer.maxUpdateRetries = maxUpdateRetries
-    val results = aclAuthorizer.createAcls(null, bindings.toList.asJava).asScala.map(_.toCompletableFuture.get)
-    results.foreach { result => result.exception.ifPresent(throwException) }
-  }
-
-  private def deleteAcls(filters: Set[AclBindingFilter]): Boolean = {
-    aclAuthorizer.maxUpdateRetries = maxUpdateRetries
-    val results = aclAuthorizer.deleteAcls(null, filters.toList.asJava).asScala.map(_.toCompletableFuture.get)
-    results.foreach { result => result.exception.ifPresent(throwException) }
-    results.flatMap(_.aclBindingDeleteResults.asScala).foreach { result => result.exception.ifPresent(e => throw e) }
-    results.exists(r => r.aclBindingDeleteResults.asScala.exists(d => !d.exception.isPresent))
-  }
-
-  private def acls(filter: AclBindingFilter): Map[Resource, Set[Acl]] = {
-    val result = mutable.Map[Resource, mutable.Set[Acl]]()
-    aclAuthorizer.acls(filter).forEach { binding =>
-      val resource = AuthorizerWrapper.convertToResource(binding.pattern)
-      val acl = AuthorizerWrapper.convertToAcl(binding.entry)
-      result.getOrElseUpdate(resource, mutable.Set()).add(acl)
-    }
-    result.mapValues(_.toSet).toMap
-  }
-
-  // To retain the same exceptions as in previous versions, throw the underlying exception when the exception
-  // was wrapped by AclAuthorizer in an ApiException
-  private def throwException(e: ApiException): Unit = {
-    if (e.getCause != null)
-      throw e.getCause
-    else
-      throw e
-  }
-}
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 46c33ca..cac6480 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
 
 import com.typesafe.scalalogging.Logger
 import kafka.api.KAFKA_2_0_IV1
-import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls}
 import kafka.security.authorizer.AclEntry.ResourceSeparator
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils._
@@ -118,9 +117,16 @@ object AclAuthorizer {
       zkClientConfig
     }
   }
+
+  private def validateAclBinding(aclBinding: AclBinding): Unit = {
+    if (aclBinding.isUnknown)
+      throw new IllegalArgumentException("ACL binding contains unknown elements")
+  }
 }
 
 class AclAuthorizer extends Authorizer with Logging {
+  import kafka.security.authorizer.AclAuthorizer._
+
   private[security] val authorizerLogger = Logger("kafka.authorizer.logger")
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
@@ -200,7 +206,7 @@ class AclAuthorizer extends Authorizer with Logging {
           throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
             s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
         }
-        AuthorizerUtils.validateAclBinding(aclBinding)
+        validateAclBinding(aclBinding)
         true
       } catch {
         case e: Throwable =>
@@ -225,7 +231,7 @@ class AclAuthorizer extends Authorizer with Logging {
         }
       }
     }
-    results.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
+    results.toBuffer.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
   }
 
   /**
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
index 0d670be..0e417d6 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
@@ -20,32 +20,15 @@ package kafka.security.authorizer
 import java.net.InetAddress
 
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.{Authorizer => LegacyAuthorizer}
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.resource.Resource
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
 
-import scala.annotation.nowarn
-
 
 object AuthorizerUtils {
 
-  @nowarn("cat=deprecation")
-  def createAuthorizer(className: String): Authorizer = {
-    Utils.newInstance(className, classOf[Object]) match {
-      case auth: Authorizer => auth
-      case auth: kafka.security.auth.Authorizer => new AuthorizerWrapper(auth)
-      case _ => throw new ConfigException(s"Authorizer does not implement ${classOf[Authorizer].getName} or ${classOf[LegacyAuthorizer].getName}.")
-    }
-  }
-
-  def validateAclBinding(aclBinding: AclBinding): Unit = {
-    if (aclBinding.isUnknown)
-      throw new IllegalArgumentException("ACL binding contains unknown elements")
-  }
+  def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
 
   def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
 
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
deleted file mode 100644
index cc25fce..0000000
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.authorizer
-
-import java.util.concurrent.{CompletableFuture, CompletionStage}
-import java.{lang, util}
-
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, SimpleAclAuthorizer, ResourceType => ResourceTypeLegacy}
-import kafka.security.authorizer.AuthorizerWrapper._
-import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ApiError
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
-import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
-import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizerServerInfo, _}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, immutable, mutable}
-import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
-object AuthorizerWrapper {
-
-  def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
-    (for {
-      resourceType <- Try(ResourceTypeLegacy.fromJava(filter.patternFilter.resourceType))
-      principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
-      operation <- Try(Operation.fromJava(filter.entryFilter.operation))
-      permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
-      resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
-      acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
-    } yield (resource, acl)) match {
-      case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
-      case Success(s) => Right(s)
-    }
-  }
-
-  def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType)
-    new AclBinding(resourcePattern, convertToAccessControlEntry(acl))
-  }
-
-  def convertToAccessControlEntry(acl: Acl): AccessControlEntry = {
-    new AccessControlEntry(acl.principal.toString, acl.host.toString,
-      acl.operation.toJava, acl.permissionType.toJava)
-  }
-
-  def convertToAcl(ace: AccessControlEntry): Acl = {
-    new Acl(parseKafkaPrincipal(ace.principal), PermissionType.fromJava(ace.permissionType), ace.host,
-      Operation.fromJava(ace.operation))
-  }
-
-  def convertToResource(resourcePattern: ResourcePattern): Resource = {
-    Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
-  }
-}
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
-class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer {
-
-  var shouldAllowEveryoneIfNoAclIsFound = false
-
-  override def configure(configs: util.Map[String, _]): Unit = {
-    baseAuthorizer.configure(configs)
-    shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
-        AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
-      && baseAuthorizer.isInstanceOf[SimpleAclAuthorizer])
-  }
-
-  override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
-    serverInfo.endpoints.asScala.map { endpoint =>
-      endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
-  }
-
-  override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
-    val session = Session(requestContext.principal, requestContext.clientAddress)
-    actions.asScala.map { action =>
-      val operation = Operation.fromJava(action.operation)
-      if (baseAuthorizer.authorize(session, operation, convertToResource(action.resourcePattern)))
-        AuthorizationResult.ALLOWED
-      else
-        AuthorizationResult.DENIED
-    }.asJava
-  }
-
-  override def createAcls(requestContext: AuthorizableRequestContext,
-                          aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
-    aclBindings.asScala
-      .map { aclBinding =>
-        convertToResourceAndAcl(aclBinding.toFilter) match {
-          case Left(apiError) => new AclCreateResult(apiError.exception)
-          case Right((resource, acl)) =>
-            try {
-              baseAuthorizer.addAcls(Set(acl), resource)
-              AclCreateResult.SUCCESS
-            } catch {
-              case e: ApiException => new AclCreateResult(e)
-              case e: Throwable => new AclCreateResult(new InvalidRequestException("Failed to create ACL", e))
-            }
-        }
-      }.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
-  }
-
-  override def deleteAcls(requestContext: AuthorizableRequestContext,
-                          aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = {
-    val filters = aclBindingFilters.asScala
-    val results = mutable.Map[Int, AclDeleteResult]()
-    val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]()
-
-    if (filters.forall(_.matchesAtMostOne)) {
-      // Delete based on a list of ACL fixtures.
-      for ((filter, i) <- filters.zipWithIndex) {
-        convertToResourceAndAcl(filter) match {
-          case Left(apiError) => results.put(i, new AclDeleteResult(apiError.exception))
-          case Right(binding) => toDelete.put(i, ArrayBuffer(binding))
-        }
-      }
-    } else {
-      // Delete based on filters that may match more than one ACL.
-      val aclMap = baseAuthorizer.getAcls()
-      val filtersWithIndex = filters.zipWithIndex
-      for ((resource, acls) <- aclMap; acl <- acls) {
-        val binding = new AclBinding(
-          new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType),
-          new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
-            acl.permissionType.toJava))
-
-        for ((filter, i) <- filtersWithIndex if filter.matches(binding))
-          toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl))
-      }
-    }
-
-    for ((i, acls) <- toDelete) {
-      val deletionResults = acls.flatMap { case (resource, acl) =>
-        val aclBinding = convertToAclBinding(resource, acl)
-        try {
-          if (baseAuthorizer.removeAcls(immutable.Set(acl), resource))
-            Some(new AclBindingDeleteResult(aclBinding))
-          else None
-        } catch {
-          case throwable: Throwable =>
-            Some(new AclBindingDeleteResult(aclBinding, ApiError.fromThrowable(throwable).exception))
-        }
-      }.asJava
-
-      results.put(i, new AclDeleteResult(deletionResults))
-    }
-
-    filters.indices.map { i =>
-      results.getOrElse(i, new AclDeleteResult(Seq.empty[AclBindingDeleteResult].asJava))
-    }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
-  }
-
-  override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
-    baseAuthorizer.getAcls().flatMap { case (resource, acls) =>
-      acls.map(acl => convertToAclBinding(resource, acl)).filter(filter.matches)
-    }.asJava
-  }
-
-  override def close(): Unit = {
-    baseAuthorizer.close()
-  }
-
-  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
-                                       op: AclOperation,
-                                       resourceType: ResourceType): AuthorizationResult = {
-    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
-
-    if (super.authorizeByResourceType(requestContext, op, resourceType) == AuthorizationResult.ALLOWED)
-      AuthorizationResult.ALLOWED
-    else if (denyAllResource(requestContext, op, resourceType) || !shouldAllowEveryoneIfNoAclIsFound)
-      AuthorizationResult.DENIED
-    else
-      AuthorizationResult.ALLOWED
-  }
-
-  private def denyAllResource(requestContext: AuthorizableRequestContext,
-                              op: AclOperation,
-                              resourceType: ResourceType): Boolean = {
-    val resourceTypeFilter = new ResourcePatternFilter(
-      resourceType, Resource.WildCardResource, PatternType.LITERAL)
-    val principal = new KafkaPrincipal(
-      requestContext.principal.getPrincipalType, requestContext.principal.getName).toString
-    val host = requestContext.clientAddress().getHostAddress
-    val entryFilter = new AccessControlEntryFilter(null, null, op, AclPermissionType.DENY)
-    val entryFilterAllOp = new AccessControlEntryFilter(null, null, AclOperation.ALL, AclPermissionType.DENY)
-    val aclFilter = new AclBindingFilter(resourceTypeFilter, entryFilter)
-    val aclFilterAllOp = new AclBindingFilter(resourceTypeFilter, entryFilterAllOp)
-
-    (acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), principal, host))
-      || acls(aclFilterAllOp).asScala.exists(b => principalHostMatch(b.entry(), principal, host)))
-  }
-
-  private def principalHostMatch(ace: AccessControlEntry,
-                                 principal: String,
-                                 host: String): Boolean = {
-    ((ace.host() == AclEntry.WildcardHost || ace.host() == host)
-      && (ace.principal() == AclEntry.WildcardPrincipalString || ace.principal() == principal))
-  }
-
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2e4dc48..e8a3e6c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -679,8 +679,7 @@ object KafkaConfig {
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
-  " interface, which is used by the broker for authorization. This config also supports authorizers that implement the deprecated" +
-  " kafka.security.auth.Authorizer trait which was previously used for authorization."
+  " interface, which is used by the broker for authorization."
   /** ********* Socket Server Configuration ***********/
   val PortDoc = "DEPRECATED: only used when <code>listeners</code> is not set. " +
   "Use <code>listeners</code> instead. \n" +
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d82c8e5..f0d9188 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,10 +18,9 @@ import java.util
 import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
-
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
 import kafka.log.LogConfig
-import kafka.security.authorizer.AclEntry
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
 import kafka.security.authorizer.AclEntry.WildcardHost
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
@@ -144,7 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 425d8f3..2b3ae1c 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 import kafka.api.GroupAuthorizerIntegrationTest._
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.WildcardHost
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
@@ -63,7 +63,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
   def clientPrincipal: KafkaPrincipal = ClientPrincipal
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index 75df9e7..17e39f6 100644
--- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -16,7 +16,7 @@
   */
 package kafka.api
 
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.authorizer.AclAuthorizer
 import kafka.server.KafkaConfig
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.config.SslConfigs
@@ -25,8 +25,6 @@ import org.junit.jupiter.api.Assertions.assertNull
 
 import scala.collection.immutable.List
 
-// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
-// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
 class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
   override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
     JaasTestUtils.KafkaClientPrincipalUnqualifiedName)
@@ -35,7 +33,7 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
 
   override protected def kafkaClientSaslMechanism = "GSSAPI"
   override protected def kafkaServerSaslMechanisms = List("GSSAPI")
-  override protected def authorizerClass = classOf[SimpleAclAuthorizer]
+  override protected def authorizerClass = classOf[AclAuthorizer]
 
   // Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the
   // client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 1f15563..53a267f 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -15,40 +15,38 @@ package kafka.api
 import java.io.File
 import java.util
 import kafka.log.LogConfig
+import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
 import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
+import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.Authorizer
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
-import scala.annotation.nowarn
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
-abstract class AuthorizationAdmin {
-  def authorizerClassName: String
-  def initializeAcls(): Unit
-  def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
-  def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
-}
-
-// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
-// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
 class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
-  @nowarn("cat=deprecation")
-  val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
+  val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+
+  val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer])
+
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -477,58 +475,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
     client.describeAcls(allTopicAcls).values.get().asScala.toSet
   }
 
-  @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
-  class LegacyAuthorizationAdmin extends AuthorizationAdmin {
-    import kafka.security.auth._
-    import kafka.security.authorizer.AuthorizerWrapper
+  class AclAuthorizationAdmin(authorizerClass: Class[_ <: AclAuthorizer], authorizerForInitClass: Class[_ <: AclAuthorizer]) {
 
-    override def authorizerClassName: String = classOf[SimpleAclAuthorizer].getName
+    def authorizerClassName: String = authorizerClass.getName
 
-    override def initializeAcls(): Unit = {
-      val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
+    def initializeAcls(): Unit = {
+      val authorizer = CoreUtils.createObject[Authorizer](authorizerForInitClass.getName)
       try {
         authorizer.configure(configs.head.originals())
-        authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow,
-          Acl.WildCardHost, All)), new Resource(Topic, "*", PatternType.LITERAL))
-        authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow,
-          Acl.WildCardHost, All)), new Resource(Group, "*", PatternType.LITERAL))
+        val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
+        authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
+        authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
 
-        authorizer.addAcls(Set(clusterAcl(ALLOW, CREATE),
+        authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
           clusterAcl(ALLOW, DELETE),
           clusterAcl(ALLOW, CLUSTER_ACTION),
           clusterAcl(ALLOW, ALTER_CONFIGS),
-          clusterAcl(ALLOW, ALTER)),
-          Resource.ClusterResource)
+          clusterAcl(ALLOW, ALTER))
+          .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
       } finally {
         authorizer.close()
       }
     }
 
-    override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
-      val acls = Set(clusterAcl(permissionType, operation))
-      val authorizer = simpleAclAuthorizer
-      val prevAcls = authorizer.getAcls(Resource.ClusterResource)
-      authorizer.addAcls(acls, Resource.ClusterResource)
-      TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, Resource.ClusterResource)
+    def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
+      val ace = clusterAcl(permissionType, operation)
+      val aclBinding = new AclBinding(clusterResourcePattern, ace)
+      val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
+      val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
+        .asScala.map(_.entry).toSet
+      authorizer.createAcls(null, Collections.singletonList(aclBinding))
+      TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
     }
 
-    override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
-      val acls = Set(clusterAcl(permissionType, operation))
-      val authorizer = simpleAclAuthorizer
-      val prevAcls = authorizer.getAcls(Resource.ClusterResource)
-      assertTrue(authorizer.removeAcls(acls, Resource.ClusterResource))
-      TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, Resource.ClusterResource)
-    }
-
-
-    private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): Acl = {
-      new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), PermissionType.fromJava(permissionType),
-        Acl.WildCardHost, Operation.fromJava(operation))
+    def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
+      val ace = clusterAcl(permissionType, operation)
+      val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
+      val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
+      val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
+      val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
+      assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
+        .get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
+      TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
     }
 
-    private def simpleAclAuthorizer: Authorizer = {
-      val authorizerWrapper = servers.head.dataPlaneRequestProcessor.authorizer.get.asInstanceOf[AuthorizerWrapper]
-      authorizerWrapper.baseAuthorizer
+    private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
+      new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
+        WildcardHost, operation, permissionType)
     }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 88b649f..b918081 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -14,22 +14,16 @@ package kafka.api
 
 import java.io.File
 import java.util
-import java.util.Collections
 import java.util.concurrent._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaYammerMetrics
 import kafka.security.authorizer.AclAuthorizer
-import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult}
 import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType._
 import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
-import org.apache.kafka.common.resource.PatternType._
-import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.server.authorizer._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
@@ -84,8 +78,7 @@ object SslAdminIntegrationTest {
 }
 
 class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
-  override val authorizationAdmin = new AclAuthorizationAdmin
-  val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+  override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer])
 
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
 
@@ -266,54 +259,4 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
     assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")
     metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
-
-  class AclAuthorizationAdmin extends AuthorizationAdmin {
-
-    override def authorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
-
-    override def initializeAcls(): Unit = {
-      val authorizer = CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
-      try {
-        authorizer.configure(configs.head.originals())
-        val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
-        authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
-        authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
-
-        authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
-          clusterAcl(ALLOW, DELETE),
-          clusterAcl(ALLOW, CLUSTER_ACTION),
-          clusterAcl(ALLOW, ALTER_CONFIGS),
-          clusterAcl(ALLOW, ALTER))
-          .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
-      } finally {
-        authorizer.close()
-      }
-    }
-
-    override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
-      val ace = clusterAcl(permissionType, operation)
-      val aclBinding = new AclBinding(clusterResourcePattern, ace)
-      val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
-      val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
-        .asScala.map(_.entry).toSet
-      authorizer.createAcls(null, Collections.singletonList(aclBinding))
-      TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
-    }
-
-    override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
-      val ace = clusterAcl(permissionType, operation)
-      val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
-      val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
-      val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
-      val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
-      assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
-        .get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
-      TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
-    }
-
-    private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
-      new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
-        WildcardHost, operation, permissionType)
-    }
-  }
 }
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
deleted file mode 100644
index 9c905ec..0000000
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions._
-
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-class ResourceTest {
-  @Test
-  def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = {
-    assertThrows(classOf[KafkaException], () => Resource.fromString("Unknown:fred"))
-  }
-
-  @Test
-  def shouldThrowOnBadResourceTypeSeparator(): Unit = {
-    assertThrows(classOf[KafkaException], () => Resource.fromString("Topic-fred"))
-  }
-
-  @Test
-  def shouldParseOldTwoPartString(): Unit = {
-    assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
-    assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:t"))
-  }
-
-  @Test
-  def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = {
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group::This:is:a:weird:group:name:"))
-  }
-
-  @Test
-  def shouldParseThreePartString(): Unit = {
-    assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred"))
-    assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t"))
-  }
-
-  @Test
-  def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:"))
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:"))
-  }
-
-  @Test
-  def shouldRoundTripViaString(): Unit = {
-    val expected = Resource(Group, "fred", PREFIXED)
-
-    val actual = Resource.fromString(expected.toString)
-
-    assertEquals(expected, actual)
-  }
-}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
deleted file mode 100644
index 5c58a08..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import org.apache.kafka.common.acl.AclOperation
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class OperationTest {
-  /**
-    * Test round trip conversions between org.apache.kafka.common.acl.AclOperation and
-    * kafka.security.auth.Operation.
-    */
-  @Test
-  def testJavaConversions(): Unit = {
-    AclOperation.values.foreach {
-      case AclOperation.UNKNOWN | AclOperation.ANY =>
-      case aclOp =>
-        val op = Operation.fromJava(aclOp)
-        val aclOp2 = op.toJava
-        assertEquals(aclOp, aclOp2)
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
deleted file mode 100644
index 1471202..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.apache.kafka.common.acl.AclPermissionType
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class PermissionTypeTest {
-
-  @Test
-  def testFromString(): Unit = {
-    val permissionType = PermissionType.fromString("Allow")
-    assertEquals(Allow, permissionType)
-
-    assertThrows(classOf[KafkaException], () => PermissionType.fromString("badName"))
-  }
-
-  /**
-    * Test round trip conversions between org.apache.kafka.common.acl.AclPermissionType and
-    * kafka.security.auth.PermissionType.
-    */
-  @Test
-  def testJavaConversions(): Unit = {
-    AclPermissionType.values().foreach {
-      case AclPermissionType.UNKNOWN | AclPermissionType.ANY =>
-      case aclPerm =>
-        val perm = PermissionType.fromJava(aclPerm)
-        val aclPerm2 = perm.toJava
-        assertEquals(aclPerm, aclPerm2)
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
deleted file mode 100644
index 02c35c7..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.apache.kafka.common.resource.{ResourceType => JResourceType}
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class ResourceTypeTest {
-
-  @Test
-  def testFromString(): Unit = {
-    val resourceType = ResourceType.fromString("Topic")
-    assertEquals(Topic, resourceType)
-    assertThrows(classOf[KafkaException], () => ResourceType.fromString("badName"))
-  }
-
-  /**
-    * Test round trip conversions between org.apache.kafka.common.acl.ResourceType and
-    * kafka.security.auth.ResourceType.
-    */
-  @Test
-  def testJavaConversions(): Unit = {
-    JResourceType.values.foreach {
-      case JResourceType.UNKNOWN | JResourceType.ANY =>
-      case jResourceType =>
-        val resourceType = ResourceType.fromJava(jResourceType)
-        val jResourceType2 = resourceType.toJava
-        assertEquals(jResourceType, jResourceType2)
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
deleted file mode 100644
index 6e2a0bd..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ /dev/null
@@ -1,731 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import java.net.InetAddress
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.UUID
-
-import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
-import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
-import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.resource.PatternType
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-@deprecated("Use AclAuthorizer", "Since 2.4")
-class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
-
-  private val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
-  private val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
-  private val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
-
-  private val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
-  private val prefixedResource = Resource(Topic, "foo", PREFIXED)
-
-  private val simpleAclAuthorizer = new SimpleAclAuthorizer
-  private val simpleAclAuthorizer2 = new SimpleAclAuthorizer
-  private var resource: Resource = _
-  private val superUsers = "User:superuser1; User:superuser2"
-  private val username = "alice"
-  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-  private val session = Session(principal, InetAddress.getByName("192.168.0.1"))
-  private var config: KafkaConfig = _
-  private var zooKeeperClient: ZooKeeperClient = _
-
-  class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
-    override def equals(o: scala.Any): Boolean = false
-  }
-
-  @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
-
-    // Increase maxUpdateRetries to avoid transient failures
-    simpleAclAuthorizer.maxUpdateRetries = Int.MaxValue
-    simpleAclAuthorizer2.maxUpdateRetries = Int.MaxValue
-
-    val props = TestUtils.createBrokerConfig(0, zkConnect)
-    props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
-
-    config = KafkaConfig.fromProps(props)
-    simpleAclAuthorizer.configure(config.originals)
-    simpleAclAuthorizer2.configure(config.originals)
-    resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
-
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
-      Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest")
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    simpleAclAuthorizer.close()
-    simpleAclAuthorizer2.close()
-    zooKeeperClient.close()
-    super.tearDown()
-  }
-
-  @Test
-  def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
-    assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", PREFIXED)))
-  }
-
-  @Test
-  def testAuthorizeWithEmptyResourceName(): Unit = {
-    assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, WildCardResource, LITERAL))
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
-  }
-
-  // Authorizing the empty resource is not supported because we create a znode with the resource name.
-  @Test
-  def testEmptyAclThrowsException(): Unit = {
-    assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, "", LITERAL)))
-  }
-
-  @Test
-  def testTopicAcl(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
-    val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val host2 = InetAddress.getByName("192.168.1.2")
-
-    //user1 has READ access from host1 and host2.
-    val acl1 = new Acl(user1, Allow, host1.getHostAddress, Read)
-    val acl2 = new Acl(user1, Allow, host2.getHostAddress, Read)
-
-    //user1 does not have  READ access from host1.
-    val acl3 = new Acl(user1, Deny, host1.getHostAddress, Read)
-
-    //user1 has Write access from host1 only.
-    val acl4 = new Acl(user1, Allow, host1.getHostAddress, Write)
-
-    //user1 has DESCRIBE access from all hosts.
-    val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
-
-    //user2 has READ access from all hosts.
-    val acl6 = new Acl(user2, Allow, WildCardHost, Read)
-
-    //user3 has WRITE access from all hosts.
-    val acl7 = new Acl(user3, Allow, WildCardHost, Write)
-
-    val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6, acl7)
-
-    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
-    val host1Session = Session(user1, host1)
-    val host2Session = Session(user1, host2)
-
-    assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
-    assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
-    assertTrue(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should have WRITE access from host1")
-    assertFalse(simpleAclAuthorizer.authorize(host2Session, Write, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
-    assertTrue(simpleAclAuthorizer.authorize(host1Session, Describe, resource), "User1 should not have DESCRIBE access from host1")
-    assertTrue(simpleAclAuthorizer.authorize(host2Session, Describe, resource), "User1 should have DESCRIBE access from host2")
-    assertFalse(simpleAclAuthorizer.authorize(host1Session, Alter, resource), "User1 should not have edit access from host1")
-    assertFalse(simpleAclAuthorizer.authorize(host2Session, Alter, resource), "User1 should not have edit access from host2")
-
-    //test if user has READ and write access they also get describe access
-    val user2Session = Session(user2, host1)
-    val user3Session = Session(user3, host1)
-    assertTrue(simpleAclAuthorizer.authorize(user2Session, Describe, resource), "User2 should have DESCRIBE access from host1")
-    assertTrue(simpleAclAuthorizer.authorize(user3Session, Describe, resource), "User3 should have DESCRIBE access from host2")
-    assertTrue(simpleAclAuthorizer.authorize(user2Session, Read, resource), "User2 should have READ access from host1")
-    assertTrue(simpleAclAuthorizer.authorize(user3Session, Write, resource), "User3 should have WRITE access from host2")
-  }
-
-  /**
-    CustomPrincipals should be compared with their principal type and name
-   */
-  @Test
-  def testAllowAccessWithCustomPrincipal(): Unit = {
-    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val host2 = InetAddress.getByName("192.168.1.2")
-
-    // user has READ access from host2 but not from host1
-    val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
-    val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
-    val acls = Set[Acl](acl1, acl2)
-    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
-    val host1Session = Session(customUserPrincipal, host1)
-    val host2Session = Session(customUserPrincipal, host2)
-
-    assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
-    assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
-  }
-
-  @Test
-  def testDenyTakesPrecedence(): Unit = {
-    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host = InetAddress.getByName("192.168.2.1")
-    val session = Session(user, host)
-
-    val allowAll = Acl.AllowAllAcl
-    val denyAcl = new Acl(user, Deny, host.getHostAddress, All)
-    val acls = Set[Acl](allowAll, denyAcl)
-
-    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
-    assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "deny should take precedence over allow.")
-  }
-
-  @Test
-  def testAllowAllAccess(): Unit = {
-    val allowAllAcl = Acl.AllowAllAcl
-
-    changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
-
-    val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "allow all acl should allow access to all.")
-  }
-
-  @Test
-  def testSuperUserHasAccess(): Unit = {
-    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
-
-    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
-
-    val session1 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
-    val session2 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
-
-    assertTrue(simpleAclAuthorizer.authorize(session1, Read, resource), "superuser always has access, no matter what acls.")
-    assertTrue(simpleAclAuthorizer.authorize(session2, Read, resource), "superuser always has access, no matter what acls.")
-  }
-
-  /**
-    CustomPrincipals should be compared with their principal type and name
-   */
-  @Test
-  def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
-    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
-    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
-
-    val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
-
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "superuser with custom principal always has access, no matter what acls.")
-  }
-
-  @Test
-  def testWildCardAcls(): Unit = {
-    assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [],  authorizer should fail close.")
-
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host1 = InetAddress.getByName("192.168.3.1")
-    val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
-
-    val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
-
-    val host1Session = Session(user1, host1)
-    assertTrue(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should have Read access from host1")
-
-    //allow Write to specific topic.
-    val writeAcl = new Acl(user1, Allow, host1.getHostAddress, Write)
-    changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
-
-    //deny Write to wild card topic.
-    val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1.getHostAddress, Write)
-    changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
-
-    assertFalse(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should not have Write access from host1")
-  }
-
-  @Test
-  def testNoAclFound(): Unit = {
-    assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [],  authorizer should fail close.")
-  }
-
-  @Test
-  def testNoAclFoundOverride(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
-    props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
-
-    val cfg = KafkaConfig.fromProps(props)
-    val testAuthorizer = new SimpleAclAuthorizer
-    try {
-      testAuthorizer.configure(cfg.originals)
-      assertTrue(testAuthorizer.authorize(session, Read, resource), "when acls = null or [],  authorizer should fail open with allow.everyone = true.")
-    } finally {
-      testAuthorizer.close()
-    }
-  }
-
-  @Test
-  def testAclManagementAPIs(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val host1 = "host1"
-    val host2 = "host2"
-
-    val acl1 = new Acl(user1, Allow, host1, Read)
-    val acl2 = new Acl(user1, Allow, host1, Write)
-    val acl3 = new Acl(user2, Allow, host2, Read)
-    val acl4 = new Acl(user2, Allow, host2, Write)
-
-    var acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](acl1, acl2, acl3, acl4), Set.empty[Acl])
-
-    //test addAcl is additive
-    val acl5 = new Acl(user2, Allow, WildCardHost, Read)
-    acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
-
-    //test get by principal name.
-    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1), "changes not propagated in timeout period")
-    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
-
-    val resourceToAcls = Map[Resource, Set[Acl]](
-      new Resource(Topic, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
-      new Resource(Cluster, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
-      new Resource(Group, Resource.WildCardResource, LITERAL) -> acls,
-      new Resource(Group, "test-ConsumerGroup", LITERAL) -> acls
-    )
-
-    resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
-    TestUtils.waitUntilTrue(() => resourceToAcls + (resource -> acls) == simpleAclAuthorizer.getAcls(), "changes not propagated in timeout period.")
-
-    //test remove acl from existing acls.
-    acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
-
-    //test remove all acls for resource
-    simpleAclAuthorizer.removeAcls(resource)
-    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
-    assertFalse(zkClient.resourceExists(resource.toPattern))
-
-    //test removing last acl also deletes ZooKeeper path
-    acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
-    changeAclAndVerify(acls, Set.empty[Acl], acls)
-    assertFalse(zkClient.resourceExists(resource.toPattern))
-  }
-
-  @Test
-  def testLoadCache(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val acl1 = new Acl(user1, Allow, "host-1", Read)
-    val acls = Set[Acl](acl1)
-    simpleAclAuthorizer.addAcls(acls, resource)
-
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val resource1 = Resource(Topic, "test-2", LITERAL)
-    val acl2 = new Acl(user2, Deny, "host3", Read)
-    val acls1 = Set[Acl](acl2)
-    simpleAclAuthorizer.addAcls(acls1, resource1)
-
-    zkClient.deleteAclChangeNotifications()
-    val authorizer = new SimpleAclAuthorizer
-    try {
-      authorizer.configure(config.originals)
-
-      assertEquals(acls, authorizer.getAcls(resource))
-      assertEquals(acls1, authorizer.getAcls(resource1))
-    } finally {
-      authorizer.close()
-    }
-  }
-
-  @Test
-  def testLocalConcurrentModificationOfResourceAcls(): Unit = {
-    val commonResource = Resource(Topic, "test", LITERAL)
-
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val acl1 = new Acl(user1, Allow, WildCardHost, Read)
-
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val acl2 = new Acl(user2, Deny, WildCardHost, Read)
-
-    simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
-    simpleAclAuthorizer.addAcls(Set(acl2), commonResource)
-
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
-  }
-
-  @Test
-  def testDistributedConcurrentModificationOfResourceAcls(): Unit = {
-    val commonResource = Resource(Topic, "test", LITERAL)
-
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val acl1 = new Acl(user1, Allow, WildCardHost, Read)
-
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val acl2 = new Acl(user2, Deny, WildCardHost, Read)
-
-    // Add on each instance
-    simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
-    simpleAclAuthorizer2.addAcls(Set(acl2), commonResource)
-
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
-
-    val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
-    val acl3 = new Acl(user3, Deny, WildCardHost, Read)
-
-    // Add on one instance and delete on another
-    simpleAclAuthorizer.addAcls(Set(acl3), commonResource)
-    val deleted = simpleAclAuthorizer2.removeAcls(Set(acl3), commonResource)
-
-    assertTrue(deleted, "The authorizer should see a value that needs to be deleted")
-
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
-  }
-
-  @Test
-  def testHighConcurrencyModificationOfResourceAcls(): Unit = {
-    val commonResource = Resource(Topic, "test", LITERAL)
-
-    val acls = (0 to 50).map { i =>
-      val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
-      new Acl(useri, Allow, WildCardHost, Read)
-    }
-
-    // Alternate authorizer, Remove all acls that end in 0
-    val concurrentFuctions = acls.map { acl =>
-      () => {
-        val aclId = acl.principal.getName.toInt
-        if (aclId % 2 == 0) {
-          simpleAclAuthorizer.addAcls(Set(acl), commonResource)
-        } else {
-          simpleAclAuthorizer2.addAcls(Set(acl), commonResource)
-        }
-        if (aclId % 10 == 0) {
-          simpleAclAuthorizer2.removeAcls(Set(acl), commonResource)
-        }
-      }
-    }
-
-    val expectedAcls = acls.filter { acl =>
-      val aclId = acl.principal.getName.toInt
-      aclId % 10 != 0
-    }.toSet
-
-    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
-
-    TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
-    TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
-  }
-
-  /**
-    * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
-    */
-  @Test
-  def testAclInheritance(): Unit = {
-    testImplicationsOfAllow(All, Set(Read, Write, Create, Delete, Alter, Describe,
-      ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
-    testImplicationsOfDeny(All, Set(Read, Write, Create, Delete, Alter, Describe,
-      ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
-    testImplicationsOfAllow(Read, Set(Describe))
-    testImplicationsOfAllow(Write, Set(Describe))
-    testImplicationsOfAllow(Delete, Set(Describe))
-    testImplicationsOfAllow(Alter, Set(Describe))
-    testImplicationsOfDeny(Describe, Set())
-    testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs))
-    testImplicationsOfDeny(DescribeConfigs, Set())
-  }
-
-  private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = {
-    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host = InetAddress.getByName("192.168.3.1")
-    val hostSession = Session(user, host)
-    val acl = Acl(user, Allow, WildCardHost, parentOp)
-    simpleAclAuthorizer.addAcls(Set(acl), Resource.ClusterResource)
-    Operation.values.foreach { op =>
-      val authorized = simpleAclAuthorizer.authorize(hostSession, op, Resource.ClusterResource)
-      if (allowedOps.contains(op) || op == parentOp)
-        assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
-      else
-        assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
-    }
-    simpleAclAuthorizer.removeAcls(Set(acl), Resource.ClusterResource)
-  }
-
-  private def testImplicationsOfDeny(parentOp: Operation, deniedOps: Set[Operation]): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host1 = InetAddress.getByName("192.168.3.1")
-    val host1Session = Session(user1, host1)
-    val acls = Set(Acl(user1, Deny, WildCardHost, parentOp), Acl(user1, Allow, WildCardHost, All))
-    simpleAclAuthorizer.addAcls(acls, Resource.ClusterResource)
-    Operation.values.foreach { op =>
-      val authorized = simpleAclAuthorizer.authorize(host1Session, op, Resource.ClusterResource)
-      if (deniedOps.contains(op) || op == parentOp)
-        assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
-      else
-        assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
-    }
-    simpleAclAuthorizer.removeAcls(acls, Resource.ClusterResource)
-  }
-
-  @Test
-  def testHighConcurrencyDeletionOfResourceAcls(): Unit = {
-    val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
-
-    // Alternate authorizer to keep adding and removing ZooKeeper path
-    val concurrentFuctions = (0 to 50).map { _ =>
-      () => {
-        simpleAclAuthorizer.addAcls(Set(acl), resource)
-        simpleAclAuthorizer2.removeAcls(Set(acl), resource)
-      }
-    }
-
-    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
-
-    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
-    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
-  }
-
-  @Test
-  def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
-
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
-  }
-
-  @Test
-  def testDeleteAclOnWildcardResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
-
-    simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), wildCardResource)
-
-    assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(wildCardResource))
-  }
-
-  @Test
-  def testDeleteAllAclOnWildcardResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
-
-    simpleAclAuthorizer.removeAcls(wildCardResource)
-
-    assertEquals(Map(), simpleAclAuthorizer.getAcls())
-  }
-
-  @Test
-  def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
-
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
-  }
-
-  @Test
-  def testDeleteAclOnPrefixedResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-
-    simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), prefixedResource)
-
-    assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(prefixedResource))
-  }
-
-  @Test
-  def testDeleteAllAclOnPrefixedResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-
-    simpleAclAuthorizer.removeAcls(prefixedResource)
-
-    assertEquals(Map(), simpleAclAuthorizer.getAcls())
-  }
-
-  @Test
-  def testAddAclsOnLiteralResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), resource)
-    simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), resource)
-
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(resource))
-    assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
-    assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
-  }
-
-  @Test
-  def testAddAclsOnWildcardResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
-    simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), wildCardResource)
-
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(wildCardResource))
-    assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
-    assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
-  }
-
-  @Test
-  def testAddAclsOnPrefiexedResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-    simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), prefixedResource)
-
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(prefixedResource))
-    assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
-    assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
-  }
-
-  @Test
-  def testAuthorizeWithPrefixedResource(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", LITERAL))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL))
-
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
-
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
-  }
-
-  @Test
-  def testSingleCharacterResourceAcls(): Unit = {
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "f", LITERAL))
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "f", LITERAL)))
-    assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo", LITERAL)))
-
-    simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "_", PREFIXED))
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_foo", LITERAL)))
-    assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_", LITERAL)))
-    assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo_", LITERAL)))
-  }
-
-  @Test
-  def testGetAclsPrincipal(): Unit = {
-    val aclOnSpecificPrincipal = new Acl(principal, Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](aclOnSpecificPrincipal), resource)
-
-    assertEquals(0, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
-      "acl on specific should not be returned for wildcard request")
-    assertEquals(1, simpleAclAuthorizer.getAcls(principal).size,
-      "acl on specific should be returned for specific request")
-    assertEquals(1, simpleAclAuthorizer.getAcls(new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size,
-      "acl on specific should be returned for different principal instance")
-
-    simpleAclAuthorizer.removeAcls(resource)
-    val aclOnWildcardPrincipal = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](aclOnWildcardPrincipal), resource)
-
-    assertEquals(1, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
-      "acl on wildcard should be returned for wildcard request")
-    assertEquals(0, simpleAclAuthorizer.getAcls(principal).size,
-      "acl on wildcard should not be returned for specific request")
-  }
-
-  @Test
-  def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
-    assertThrows(classOf[UnsupportedVersionException], () => simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)))
-  }
-
-  @Test
-  def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option.empty)
-    val resource = Resource(Topic, "z_other", PREFIXED)
-    val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(PREFIXED)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
-    val resource = Resource(Topic, "z_other", PREFIXED)
-    val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(PREFIXED)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
-    val resource = Resource(Topic, "z_other", LITERAL)
-    val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(LITERAL)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
-    val resource = Resource(Topic, "z_other", LITERAL)
-    val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
-    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(LITERAL)
-
-    assertEquals(expected, actual)
-  }
-
-  private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]): Unit = {
-    simpleAclAuthorizer.close()
-
-    val props = TestUtils.createBrokerConfig(0, zkConnect)
-    props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
-    protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
-
-    config = KafkaConfig.fromProps(props)
-
-    simpleAclAuthorizer.configure(config.originals)
-  }
-
-  private def getAclChangeEventAsString(patternType: PatternType) = {
-    val store = ZkAclStore(patternType)
-    val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath, registerWatch = true))
-    children.maybeThrow()
-    assertEquals(1, children.children.size, "Expecting 1 change event")
-
-    val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}"))
-    data.maybeThrow()
-
-    new String(data.data, UTF_8)
-  }
-
-  private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
-    var acls = originalAcls
-
-    if(addedAcls.nonEmpty) {
-      simpleAclAuthorizer.addAcls(addedAcls, resource)
-      acls ++= addedAcls
-    }
-
-    if(removedAcls.nonEmpty) {
-      simpleAclAuthorizer.removeAcls(removedAcls, resource)
-      acls --=removedAcls
-    }
-
-    TestUtils.waitAndVerifyAcls(acls, simpleAclAuthorizer, resource)
-
-    acls
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
deleted file mode 100644
index 7c575e4..0000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.authorizer
-
-import java.util.UUID
-
-import kafka.security.auth.SimpleAclAuthorizer
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import kafka.zookeeper.ZooKeeperClient
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.resource.PatternType.LITERAL
-import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.authorizer._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-import scala.annotation.nowarn
-
-class AuthorizerWrapperTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
-  @nowarn("cat=deprecation")
-  private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer)
-  @nowarn("cat=deprecation")
-  private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer)
-
-  override def authorizer: Authorizer = wrappedSimpleAuthorizer
-
-  @BeforeEach
-  @nowarn("cat=deprecation")
-  override def setUp(): Unit = {
-    super.setUp()
-
-    val props = TestUtils.createBrokerConfig(0, zkConnect)
-
-    props.put(AclAuthorizer.SuperUsersProp, superUsers)
-    config = KafkaConfig.fromProps(props)
-    wrappedSimpleAuthorizer.configure(config.originals)
-
-    props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
-    config = KafkaConfig.fromProps(props)
-    wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
-
-    resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
-      Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone)
-    authorizers.foreach(a => {
-      a.close()
-    })
-    zooKeeperClient.close()
-    super.tearDown()
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeEnableAllowEveryOne(): Unit = {
-    testAuthorizeByResourceTypeEnableAllowEveryOne(wrappedSimpleAuthorizer)
-  }
-
-  private def testAuthorizeByResourceTypeEnableAllowEveryOne(authorizer: Authorizer): Unit = {
-    assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
-      "If allow.everyone.if.no.acl.found = true, " +
-      "caller should have read access to at least one topic")
-    val allUser = AclEntry.WildcardPrincipalString
-    val allHost = AclEntry.WildcardHost
-    val denyAll = new AccessControlEntry(allUser, allHost, ALL, AclPermissionType.DENY)
-    val wildcardResource = new ResourcePattern(resource.resourceType(), AclEntry.WildcardResource, LITERAL)
-
-    addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), resource)
-    assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
-      "Should still allow since the deny only apply on the specific resource")
-
-    addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), wildcardResource)
-    assertFalse(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
-      "When an ACL binding which can deny all users and hosts exists, " +
-      "even if allow.everyone.if.no.acl.found = true, caller shouldn't have read access any topic")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeDisableAllowEveryoneOverride(): Unit = {
-    assertFalse (authorizeByResourceType(wrappedSimpleAuthorizer, requestContext, READ, resource.resourceType()),
-      "If allow.everyone.if.no.acl.found = false, " +
-      "caller shouldn't have read access to any topic")
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 38a5a12..b9e5667 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -34,7 +34,6 @@ import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log._
 import kafka.metrics.KafkaYammerMetrics
-import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
@@ -1325,15 +1324,6 @@ object TestUtils extends Logging {
         s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}")
   }
 
-  @deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.5")
-  def waitAndVerifyAcls(expected: Set[Acl], authorizer: LegacyAuthorizer, resource: Resource): Unit = {
-    val newLine = scala.util.Properties.lineSeparator
-
-    waitUntilTrue(() => authorizer.getAcls(resource) == expected,
-      s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
-        s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}")
-  }
-
   /**
    * Verifies that this ACL is the secure one.
    */
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b7666ec..a486b6c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -27,6 +27,8 @@
         or updating the application not to use internal classes.</li>
     <li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
         For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
+    <li>The deprecated Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
+        and <code>AclAuthorizer</code> instead.</li>
     <li>The deprecated <code>Metric#value()</code> method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li>
     <li>Deprecated security classes were removed: <code>PrincipalBuilder</code>, <code>DefaultPrincipalBuilder</code> and <code>ResourceFilter</code>.
         Furthermore, deprecated constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>,
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e93a1c5..fcf3900 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -155,8 +155,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
     # Kafka Authorizer
     ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
-    # Old Kafka Authorizer. This is deprecated but still supported.
-    SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
     INTERBROKER_LISTENER_NAME = 'INTERNAL'
     JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 4a176b0..6e66ffc 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -70,8 +70,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka.close_port(SecurityConfig.PLAINTEXT)
         self.set_authorizer_and_bounce(client_protocol, broker_protocol)
 
-    def set_authorizer_and_bounce(self, client_protocol, broker_protocol, authorizer_class_name = KafkaService.ACL_AUTHORIZER):
-        self.kafka.authorizer_class_name = authorizer_class_name
+    def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
+        self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
         # Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker.
         self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
         self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
@@ -93,8 +93,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
         self.bounce()
 
-        # Bounce again with ACLs for new mechanism. Use old SimpleAclAuthorizer here to ensure that is also tested.
-        self.set_authorizer_and_bounce(security_protocol, security_protocol, KafkaService.SIMPLE_AUTHORIZER)
+        # Bounce again with ACLs for new mechanism.
+        self.set_authorizer_and_bounce(security_protocol, security_protocol)
 
     def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
         # Enable the new internal listener on all brokers first