You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/04/03 15:25:14 UTC
[kafka] branch trunk updated: KAFKA-12590: Remove deprecated
kafka.security.auth.Authorizer,
SimpleAclAuthorizer and related classes in 3.0 (#10450)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 976e78e KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (#10450)
976e78e is described below
commit 976e78e405d57943b989ac487b7f49119b0f4af4
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Apr 3 08:23:26 2021 -0700
KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (#10450)
These were deprecated in Apache Kafka 2.4 (released in December 2019) to be replaced
by `org.apache.kafka.server.authorizer.Authorizer` and `AclAuthorizer`.
As part of KIP-500, we will implement a new `Authorizer` implementation that relies
on a topic (potentially a KRaft topic) instead of `ZooKeeper`, so we should take the chance
to remove related tech debt in 3.0.
Details on the issues affecting the old Authorizer interface can be found in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
Reviewers: Manikumar Reddy <ma...@gmail.com>, Ron Dagostino <rd...@confluent.io>
---
core/src/main/scala/kafka/security/auth/Acl.scala | 86 ---
.../scala/kafka/security/auth/Authorizer.scala | 149 -----
.../main/scala/kafka/security/auth/Operation.scala | 113 ----
.../scala/kafka/security/auth/PermissionType.scala | 50 --
.../main/scala/kafka/security/auth/Resource.scala | 85 ---
.../scala/kafka/security/auth/ResourceType.scala | 93 ---
.../kafka/security/auth/SimpleAclAuthorizer.scala | 175 -----
.../kafka/security/authorizer/AclAuthorizer.scala | 12 +-
.../security/authorizer/AuthorizerUtils.scala | 19 +-
.../security/authorizer/AuthorizerWrapper.scala | 223 -------
core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 5 +-
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 4 +-
.../SaslGssapiSslEndToEndAuthorizationTest.scala | 6 +-
.../kafka/api/SaslSslAdminIntegrationTest.scala | 91 ++-
.../kafka/api/SslAdminIntegrationTest.scala | 63 +-
.../scala/kafka/security/auth/ResourceTest.scala | 68 --
.../unit/kafka/security/auth/OperationTest.scala | 40 --
.../kafka/security/auth/PermissionTypeTest.scala | 49 --
.../kafka/security/auth/ResourceTypeTest.scala | 48 --
.../security/auth/SimpleAclAuthorizerTest.scala | 731 ---------------------
.../authorizer/AuthorizerWrapperTest.scala | 106 ---
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 -
docs/upgrade.html | 2 +
tests/kafkatest/services/kafka/kafka.py | 2 -
.../tests/core/security_rolling_upgrade_test.py | 8 +-
26 files changed, 68 insertions(+), 2173 deletions(-)
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
deleted file mode 100644
index befd9d2..0000000
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.security.authorizer.AclEntry
-import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
-@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5")
-object Acl {
- val WildCardPrincipal: KafkaPrincipal = AclEntry.WildcardPrincipal
- val WildCardHost: String = AclEntry.WildcardHost
- val WildCardResource: String = ResourcePattern.WILDCARD_RESOURCE
- val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
- val PrincipalKey = AclEntry.PrincipalKey
- val PermissionTypeKey = AclEntry.PermissionTypeKey
- val OperationKey = AclEntry.OperationKey
- val HostsKey = AclEntry.HostsKey
- val VersionKey = AclEntry.VersionKey
- val CurrentVersion = AclEntry.CurrentVersion
- val AclsKey = AclEntry.AclsKey
-
- /**
- *
- * @see AclEntry
- */
- def fromBytes(bytes: Array[Byte]): Set[Acl] = {
- AclEntry.fromBytes(bytes)
- .map(ace => Acl(ace.kafkaPrincipal,
- PermissionType.fromJava(ace.permissionType()),
- ace.host(),
- Operation.fromJava(ace.operation())))
- }
-
- def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
- AclEntry.toJsonCompatibleMap(acls.map(acl =>
- AclEntry(acl.principal, acl.permissionType.toJava, acl.host, acl.operation.toJava)
- ))
- }
-}
-
-/**
- * An instance of this class will represent an acl that can express following statement.
- * <pre>
- * Principal P has permissionType PT on Operation O1 from hosts H1.
- * </pre>
- * @param principal A value of *:* indicates all users.
- * @param permissionType
- * @param host A value of * indicates all hosts.
- * @param operation A value of ALL indicates all operations.
- */
-@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5")
-case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {
-
- /**
- * TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects.
- * @return Map representation of the Acl.
- */
- def toMap(): Map[String, Any] = {
- Map(Acl.PrincipalKey -> principal.toString,
- Acl.PermissionTypeKey -> permissionType.name,
- Acl.OperationKey -> operation.name,
- Acl.HostsKey -> host)
- }
-
- override def toString: String = {
- "%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host)
- }
-
-}
-
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
deleted file mode 100644
index 7509171..0000000
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.network.RequestChannel.Session
-import org.apache.kafka.common.Configurable
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
-/**
- * Top level interface that all pluggable authorizers must implement. Kafka will read the `authorizer.class.name` config
- * value at startup time, create an instance of the specified class using the default constructor, and call its
- * `configure` method.
- *
- * From that point onwards, every client request will first be routed to the `authorize` method and the request will only
- * be authorized if the method returns true.
- *
- * If `authorizer.class.name` has no value specified, then no authorization will be performed, and all operations are
- * permitted.
- */
-@deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.4")
-trait Authorizer extends Configurable {
-
- /**
- * @param session The session being authenticated.
- * @param operation Type of operation client is trying to perform on resource.
- * @param resource Resource the client is trying to access. Resource pattern type is always literal in input resource.
- * @return true if the operation should be permitted, false otherwise
- */
- def authorize(session: Session, operation: Operation, resource: Resource): Boolean
-
- /**
- * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
- * acls will be added to existing acls.
- *
- * {code}
- * // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
- *
- * // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
- * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
- *
- * // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
- * {code}
- *
- * @param acls set of acls to add to existing acls
- * @param resource the resource path to which these acls should be attached.
- * the supplied resource will have a specific resource pattern type,
- * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
- */
- def addAcls(acls: Set[Acl], resource: Resource): Unit
-
- /**
- * remove these acls from the resource.
- *
- * {code}
- * // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
- *
- * // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
- * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
- *
- * // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
- * {code}
- *
- * @param acls set of acls to be removed.
- * @param resource resource path from which the acls should be removed.
- * the supplied resource will have a specific resource pattern type,
- * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
- * @return true if some acl got removed, false if no acl was removed.
- */
- def removeAcls(acls: Set[Acl], resource: Resource): Boolean
-
- /**
- * remove a resource along with all of its acls from acl store.
- *
- * {code}
- * // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
- *
- * // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
- * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
- *
- * // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
- * {code}
- *
- * @param resource the resource path from which these acls should be removed.
- * the supplied resource will have a specific resource pattern type,
- * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
- * @return
- */
- def removeAcls(resource: Resource): Boolean
-
- /**
- * get set of acls for the supplied resource
- *
- * {code}
- * // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
- *
- * // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
- * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
- *
- * // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
- * {code}
- *
- * @param resource the resource path to which the acls belong.
- * the supplied resource will have a specific resource pattern type,
- * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
- * @return empty set if no acls are found, otherwise the acls for the resource.
- */
- def getAcls(resource: Resource): Set[Acl]
-
- /**
- * get the acls for this principal.
- * @param principal principal name.
- * @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
- */
- def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
-
- /**
- * gets the map of resource paths to acls for all resources.
- */
- def getAcls(): Map[Resource, Set[Acl]]
-
- /**
- * Closes this instance.
- */
- def close(): Unit
-
-}
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
deleted file mode 100644
index 3ae2384..0000000
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.acl.AclOperation
-
-/**
- * Different operations a client may perform on kafka resources.
- */
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-sealed trait Operation extends BaseEnum {
- def toJava : AclOperation
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Read extends Operation {
- val name = "Read"
- val toJava = AclOperation.READ
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Write extends Operation {
- val name = "Write"
- val toJava = AclOperation.WRITE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Create extends Operation {
- val name = "Create"
- val toJava = AclOperation.CREATE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Delete extends Operation {
- val name = "Delete"
- val toJava = AclOperation.DELETE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Alter extends Operation {
- val name = "Alter"
- val toJava = AclOperation.ALTER
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object Describe extends Operation {
- val name = "Describe"
- val toJava = AclOperation.DESCRIBE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object ClusterAction extends Operation {
- val name = "ClusterAction"
- val toJava = AclOperation.CLUSTER_ACTION
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object DescribeConfigs extends Operation {
- val name = "DescribeConfigs"
- val toJava = AclOperation.DESCRIBE_CONFIGS
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object AlterConfigs extends Operation {
- val name = "AlterConfigs"
- val toJava = AclOperation.ALTER_CONFIGS
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object IdempotentWrite extends Operation {
- val name = "IdempotentWrite"
- val toJava = AclOperation.IDEMPOTENT_WRITE
-}
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-case object All extends Operation {
- val name = "All"
- val toJava = AclOperation.ALL
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
-object Operation {
-
- def fromString(operation: String): Operation = {
- val op = values.find(op => op.name.equalsIgnoreCase(operation))
- op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
- }
-
- def fromJava(operation: AclOperation): Operation = {
- operation match {
- case AclOperation.READ => Read
- case AclOperation.WRITE => Write
- case AclOperation.CREATE => Create
- case AclOperation.DELETE => Delete
- case AclOperation.ALTER => Alter
- case AclOperation.DESCRIBE => Describe
- case AclOperation.CLUSTER_ACTION => ClusterAction
- case AclOperation.ALTER_CONFIGS => AlterConfigs
- case AclOperation.DESCRIBE_CONFIGS => DescribeConfigs
- case AclOperation.IDEMPOTENT_WRITE => IdempotentWrite
- case AclOperation.ALL => All
- case _ => throw new KafkaException(operation + " is not a convertible operation name. The valid names are " + values.mkString(","))
- }
- }
-
- def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
- DescribeConfigs, IdempotentWrite, All)
-}
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
deleted file mode 100644
index c5325b2..0000000
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.acl.AclPermissionType
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-sealed trait PermissionType extends BaseEnum {
- val toJava: AclPermissionType
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-case object Allow extends PermissionType {
- val name = "Allow"
- val toJava = AclPermissionType.ALLOW
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-case object Deny extends PermissionType {
- val name = "Deny"
- val toJava = AclPermissionType.DENY
-}
-
-@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
-object PermissionType {
- def fromString(permissionType: String): PermissionType = {
- val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType))
- pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
- }
-
- def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString)
-
- def values: Seq[PermissionType] = List(Allow, Deny)
-}
-
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
deleted file mode 100644
index 8045c68..0000000
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import kafka.security.authorizer.AclEntry
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-object Resource {
- val Separator = AclEntry.ResourceSeparator
- val ClusterResourceName = "kafka-cluster"
- val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, PatternType.LITERAL)
- val WildCardResource = AclEntry.WildcardResource
-
- @deprecated("This resource name is not used by Kafka and will be removed in a future release", since = "2.1")
- val ProducerIdResourceName = "producer-id" // This is not used since we don't have a producer id resource
-
- def fromString(str: String): Resource = {
- ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match {
- case None => throw new KafkaException("Invalid resource string: '" + str + "'")
- case Some(resourceType) =>
- val remaining = str.substring(resourceType.name.length + 1)
-
- PatternType.values.find(patternType => remaining.startsWith(patternType.name + Separator)) match {
- case Some(patternType) =>
- val name = remaining.substring(patternType.name.length + 1)
- Resource(resourceType, name, patternType)
-
- case None =>
- Resource(resourceType, remaining, PatternType.LITERAL)
- }
- }
- }
-}
-
-/**
- *
- * @param resourceType non-null type of resource.
- * @param name non-null name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
- * it will be a constant string kafka-cluster.
- * @param patternType non-null resource pattern type: literal, prefixed, etc.
- */
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-case class Resource(resourceType: ResourceType, name: String, patternType: PatternType) {
-
- if (!patternType.isSpecific)
- throw new IllegalArgumentException(s"patternType must not be $patternType")
-
- /**
- * Create an instance of this class with the provided parameters.
- * Resource pattern type would default to PatternType.LITERAL.
- *
- * @param resourceType non-null resource type
- * @param name non-null resource name
- * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]]
- */
- @deprecated("Use Resource(ResourceType, String, PatternType", "Since 2.0")
- def this(resourceType: ResourceType, name: String) = {
- this(resourceType, name, PatternType.LITERAL)
- }
-
- def toPattern: ResourcePattern = {
- new ResourcePattern(resourceType.toJava, name, patternType)
- }
-
- override def toString: String = {
- resourceType.name + Resource.Separator + patternType + Resource.Separator + name
- }
-}
-
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
deleted file mode 100644
index 72b71c0..0000000
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.resource.{ResourceType => JResourceType}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] {
- def error: Errors
- def toJava: JResourceType
- // this method output will not include "All" Operation type
- def supportedOperations: Set[Operation]
-
- override def compare(that: ResourceType): Int = this.name compare that.name
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object Topic extends ResourceType {
- val name = "Topic"
- val error = Errors.TOPIC_AUTHORIZATION_FAILED
- val toJava = JResourceType.TOPIC
- val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object Group extends ResourceType {
- val name = "Group"
- val error = Errors.GROUP_AUTHORIZATION_FAILED
- val toJava = JResourceType.GROUP
- val supportedOperations = Set(Read, Describe, Delete)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object Cluster extends ResourceType {
- val name = "Cluster"
- val error = Errors.CLUSTER_AUTHORIZATION_FAILED
- val toJava = JResourceType.CLUSTER
- val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object TransactionalId extends ResourceType {
- val name = "TransactionalId"
- val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
- val toJava = JResourceType.TRANSACTIONAL_ID
- val supportedOperations = Set(Describe, Write)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-case object DelegationToken extends ResourceType {
- val name = "DelegationToken"
- val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
- val toJava = JResourceType.DELEGATION_TOKEN
- val supportedOperations : Set[Operation] = Set(Describe)
-}
-
-@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
-object ResourceType {
-
- def fromString(resourceType: String): ResourceType = {
- val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType))
- rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
- }
-
- def fromJava(resourceType: JResourceType): ResourceType = {
- resourceType match {
- case JResourceType.TOPIC => Topic
- case JResourceType.GROUP => Group
- case JResourceType.CLUSTER => Cluster
- case JResourceType.TRANSACTIONAL_ID => TransactionalId
- case JResourceType.DELEGATION_TOKEN => DelegationToken
- case _ => throw new KafkaException(resourceType + " is not a convertible resource type. The valid types are " + values.mkString(","))
- }
- }
-
- def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId, DelegationToken)
-}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
deleted file mode 100644
index 653e154..0000000
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import java.util
-
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.BaseAuthorizer
-import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils, AuthorizerWrapper}
-import kafka.utils._
-import kafka.zk.ZkVersion
-import org.apache.kafka.common.acl.{AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
-object SimpleAclAuthorizer {
- //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
- //same zookeeper where all other kafka broker info is stored.
- val ZkUrlProp = AclAuthorizer.ZkUrlProp
- val ZkConnectionTimeOutProp = AclAuthorizer.ZkConnectionTimeOutProp
- val ZkSessionTimeOutProp = AclAuthorizer.ZkSessionTimeOutProp
- val ZkMaxInFlightRequests = AclAuthorizer.ZkMaxInFlightRequests
-
- //List of users that will be treated as super users and will have access to all the resources for all actions from all hosts, defaults to no super users.
- val SuperUsersProp = AclAuthorizer.SuperUsersProp
- //If set to true when no acls are found for a resource , authorizer allows access to everyone. Defaults to false.
- val AllowEveryoneIfNoAclIsFoundProp = AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp
-
- case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
- def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
- }
- val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
-
- private[auth] class BaseAuthorizer extends AclAuthorizer {
- override def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
- val principal = requestContext.principal
- val host = requestContext.clientAddress.getHostAddress
- val operation = Operation.fromJava(action.operation)
- val resource = AuthorizerWrapper.convertToResource(action.resourcePattern)
- def logMessage: String = {
- val authResult = if (authorized) "Allowed" else "Denied"
- s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource"
- }
-
- if (authorized) authorizerLogger.debug(logMessage)
- else authorizerLogger.info(logMessage)
- }
- }
-}
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
-class SimpleAclAuthorizer extends Authorizer with Logging {
-
- private val aclAuthorizer = new BaseAuthorizer
-
- // The maximum number of times we should try to update the resource acls in zookeeper before failing;
- // This should never occur, but is a safeguard just in case.
- protected[auth] var maxUpdateRetries = 10
-
-
- /**
- * Guaranteed to be called before any authorize call is made.
- */
- override def configure(javaConfigs: util.Map[String, _]): Unit = {
- aclAuthorizer.configure(javaConfigs)
- }
-
- override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
- val requestContext = AuthorizerUtils.sessionToRequestContext(session)
- val action = new Action(operation.toJava, resource.toPattern, 1, true, true)
- aclAuthorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED
- }
-
- def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
- aclAuthorizer.isSuperUser(principal)
- }
-
- override def addAcls(acls: Set[Acl], resource: Resource): Unit = {
- aclAuthorizer.maxUpdateRetries = maxUpdateRetries
- if (acls != null && acls.nonEmpty) {
- val bindings = acls.map { acl => AuthorizerWrapper.convertToAclBinding(resource, acl) }
- createAcls(bindings)
- }
- }
-
- override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
- val filters = aclsTobeRemoved.map { acl =>
- new AclBindingFilter(resource.toPattern.toFilter, AuthorizerWrapper.convertToAccessControlEntry(acl).toFilter)
- }
- deleteAcls(filters)
- }
-
- override def removeAcls(resource: Resource): Boolean = {
- val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY)
- deleteAcls(Set(filter))
- }
-
- override def getAcls(resource: Resource): Set[Acl] = {
- val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY)
- acls(filter).getOrElse(resource, Set.empty)
- }
-
- override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
- val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
- new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY))
- acls(filter)
- }
-
- def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
- val filter = new AclBindingFilter(new ResourcePatternFilter(resourceType.toJava, resourceName, PatternType.MATCH),
- AccessControlEntryFilter.ANY)
- acls(filter).flatMap(_._2).toSet
- }
-
- override def getAcls(): Map[Resource, Set[Acl]] = {
- acls(AclBindingFilter.ANY)
- }
-
- def close(): Unit = {
- aclAuthorizer.close()
- }
-
- private def createAcls(bindings: Set[AclBinding]): Unit = {
- aclAuthorizer.maxUpdateRetries = maxUpdateRetries
- val results = aclAuthorizer.createAcls(null, bindings.toList.asJava).asScala.map(_.toCompletableFuture.get)
- results.foreach { result => result.exception.ifPresent(throwException) }
- }
-
- private def deleteAcls(filters: Set[AclBindingFilter]): Boolean = {
- aclAuthorizer.maxUpdateRetries = maxUpdateRetries
- val results = aclAuthorizer.deleteAcls(null, filters.toList.asJava).asScala.map(_.toCompletableFuture.get)
- results.foreach { result => result.exception.ifPresent(throwException) }
- results.flatMap(_.aclBindingDeleteResults.asScala).foreach { result => result.exception.ifPresent(e => throw e) }
- results.exists(r => r.aclBindingDeleteResults.asScala.exists(d => !d.exception.isPresent))
- }
-
- private def acls(filter: AclBindingFilter): Map[Resource, Set[Acl]] = {
- val result = mutable.Map[Resource, mutable.Set[Acl]]()
- aclAuthorizer.acls(filter).forEach { binding =>
- val resource = AuthorizerWrapper.convertToResource(binding.pattern)
- val acl = AuthorizerWrapper.convertToAcl(binding.entry)
- result.getOrElseUpdate(resource, mutable.Set()).add(acl)
- }
- result.mapValues(_.toSet).toMap
- }
-
- // To retain the same exceptions as in previous versions, throw the underlying exception when the exception
- // was wrapped by AclAuthorizer in an ApiException
- private def throwException(e: ApiException): Unit = {
- if (e.getCause != null)
- throw e.getCause
- else
- throw e
- }
-}
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 46c33ca..cac6480 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
-import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._
@@ -118,9 +117,16 @@ object AclAuthorizer {
zkClientConfig
}
}
+
+ private def validateAclBinding(aclBinding: AclBinding): Unit = {
+ if (aclBinding.isUnknown)
+ throw new IllegalArgumentException("ACL binding contains unknown elements")
+ }
}
class AclAuthorizer extends Authorizer with Logging {
+ import kafka.security.authorizer.AclAuthorizer._
+
private[security] val authorizerLogger = Logger("kafka.authorizer.logger")
private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false
@@ -200,7 +206,7 @@ class AclAuthorizer extends Authorizer with Logging {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
}
- AuthorizerUtils.validateAclBinding(aclBinding)
+ validateAclBinding(aclBinding)
true
} catch {
case e: Throwable =>
@@ -225,7 +231,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
}
}
- results.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
+ results.toBuffer.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
}
/**
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
index 0d670be..0e417d6 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
@@ -20,32 +20,15 @@ package kafka.security.authorizer
import java.net.InetAddress
import kafka.network.RequestChannel.Session
-import kafka.security.auth.{Authorizer => LegacyAuthorizer}
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
-import scala.annotation.nowarn
-
object AuthorizerUtils {
- @nowarn("cat=deprecation")
- def createAuthorizer(className: String): Authorizer = {
- Utils.newInstance(className, classOf[Object]) match {
- case auth: Authorizer => auth
- case auth: kafka.security.auth.Authorizer => new AuthorizerWrapper(auth)
- case _ => throw new ConfigException(s"Authorizer does not implement ${classOf[Authorizer].getName} or ${classOf[LegacyAuthorizer].getName}.")
- }
- }
-
- def validateAclBinding(aclBinding: AclBinding): Unit = {
- if (aclBinding.isUnknown)
- throw new IllegalArgumentException("ACL binding contains unknown elements")
- }
+ def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
deleted file mode 100644
index cc25fce..0000000
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.authorizer
-
-import java.util.concurrent.{CompletableFuture, CompletionStage}
-import java.{lang, util}
-
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, SimpleAclAuthorizer, ResourceType => ResourceTypeLegacy}
-import kafka.security.authorizer.AuthorizerWrapper._
-import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ApiError
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
-import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
-import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizerServerInfo, _}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, immutable, mutable}
-import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
-object AuthorizerWrapper {
-
- def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
- (for {
- resourceType <- Try(ResourceTypeLegacy.fromJava(filter.patternFilter.resourceType))
- principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
- operation <- Try(Operation.fromJava(filter.entryFilter.operation))
- permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
- resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
- acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
- } yield (resource, acl)) match {
- case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
- case Success(s) => Right(s)
- }
- }
-
- def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
- val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType)
- new AclBinding(resourcePattern, convertToAccessControlEntry(acl))
- }
-
- def convertToAccessControlEntry(acl: Acl): AccessControlEntry = {
- new AccessControlEntry(acl.principal.toString, acl.host.toString,
- acl.operation.toJava, acl.permissionType.toJava)
- }
-
- def convertToAcl(ace: AccessControlEntry): Acl = {
- new Acl(parseKafkaPrincipal(ace.principal), PermissionType.fromJava(ace.permissionType), ace.host,
- Operation.fromJava(ace.operation))
- }
-
- def convertToResource(resourcePattern: ResourcePattern): Resource = {
- Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
- }
-}
-
-@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
-class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer {
-
- var shouldAllowEveryoneIfNoAclIsFound = false
-
- override def configure(configs: util.Map[String, _]): Unit = {
- baseAuthorizer.configure(configs)
- shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
- AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
- && baseAuthorizer.isInstanceOf[SimpleAclAuthorizer])
- }
-
- override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
- serverInfo.endpoints.asScala.map { endpoint =>
- endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
- }
-
- override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
- val session = Session(requestContext.principal, requestContext.clientAddress)
- actions.asScala.map { action =>
- val operation = Operation.fromJava(action.operation)
- if (baseAuthorizer.authorize(session, operation, convertToResource(action.resourcePattern)))
- AuthorizationResult.ALLOWED
- else
- AuthorizationResult.DENIED
- }.asJava
- }
-
- override def createAcls(requestContext: AuthorizableRequestContext,
- aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
- aclBindings.asScala
- .map { aclBinding =>
- convertToResourceAndAcl(aclBinding.toFilter) match {
- case Left(apiError) => new AclCreateResult(apiError.exception)
- case Right((resource, acl)) =>
- try {
- baseAuthorizer.addAcls(Set(acl), resource)
- AclCreateResult.SUCCESS
- } catch {
- case e: ApiException => new AclCreateResult(e)
- case e: Throwable => new AclCreateResult(new InvalidRequestException("Failed to create ACL", e))
- }
- }
- }.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
- }
-
- override def deleteAcls(requestContext: AuthorizableRequestContext,
- aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = {
- val filters = aclBindingFilters.asScala
- val results = mutable.Map[Int, AclDeleteResult]()
- val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]()
-
- if (filters.forall(_.matchesAtMostOne)) {
- // Delete based on a list of ACL fixtures.
- for ((filter, i) <- filters.zipWithIndex) {
- convertToResourceAndAcl(filter) match {
- case Left(apiError) => results.put(i, new AclDeleteResult(apiError.exception))
- case Right(binding) => toDelete.put(i, ArrayBuffer(binding))
- }
- }
- } else {
- // Delete based on filters that may match more than one ACL.
- val aclMap = baseAuthorizer.getAcls()
- val filtersWithIndex = filters.zipWithIndex
- for ((resource, acls) <- aclMap; acl <- acls) {
- val binding = new AclBinding(
- new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType),
- new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
- acl.permissionType.toJava))
-
- for ((filter, i) <- filtersWithIndex if filter.matches(binding))
- toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl))
- }
- }
-
- for ((i, acls) <- toDelete) {
- val deletionResults = acls.flatMap { case (resource, acl) =>
- val aclBinding = convertToAclBinding(resource, acl)
- try {
- if (baseAuthorizer.removeAcls(immutable.Set(acl), resource))
- Some(new AclBindingDeleteResult(aclBinding))
- else None
- } catch {
- case throwable: Throwable =>
- Some(new AclBindingDeleteResult(aclBinding, ApiError.fromThrowable(throwable).exception))
- }
- }.asJava
-
- results.put(i, new AclDeleteResult(deletionResults))
- }
-
- filters.indices.map { i =>
- results.getOrElse(i, new AclDeleteResult(Seq.empty[AclBindingDeleteResult].asJava))
- }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
- }
-
- override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
- baseAuthorizer.getAcls().flatMap { case (resource, acls) =>
- acls.map(acl => convertToAclBinding(resource, acl)).filter(filter.matches)
- }.asJava
- }
-
- override def close(): Unit = {
- baseAuthorizer.close()
- }
-
- override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
- op: AclOperation,
- resourceType: ResourceType): AuthorizationResult = {
- SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
-
- if (super.authorizeByResourceType(requestContext, op, resourceType) == AuthorizationResult.ALLOWED)
- AuthorizationResult.ALLOWED
- else if (denyAllResource(requestContext, op, resourceType) || !shouldAllowEveryoneIfNoAclIsFound)
- AuthorizationResult.DENIED
- else
- AuthorizationResult.ALLOWED
- }
-
- private def denyAllResource(requestContext: AuthorizableRequestContext,
- op: AclOperation,
- resourceType: ResourceType): Boolean = {
- val resourceTypeFilter = new ResourcePatternFilter(
- resourceType, Resource.WildCardResource, PatternType.LITERAL)
- val principal = new KafkaPrincipal(
- requestContext.principal.getPrincipalType, requestContext.principal.getName).toString
- val host = requestContext.clientAddress().getHostAddress
- val entryFilter = new AccessControlEntryFilter(null, null, op, AclPermissionType.DENY)
- val entryFilterAllOp = new AccessControlEntryFilter(null, null, AclOperation.ALL, AclPermissionType.DENY)
- val aclFilter = new AclBindingFilter(resourceTypeFilter, entryFilter)
- val aclFilterAllOp = new AclBindingFilter(resourceTypeFilter, entryFilterAllOp)
-
- (acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), principal, host))
- || acls(aclFilterAllOp).asScala.exists(b => principalHostMatch(b.entry(), principal, host)))
- }
-
- private def principalHostMatch(ace: AccessControlEntry,
- principal: String,
- host: String): Boolean = {
- ((ace.host() == AclEntry.WildcardHost || ace.host() == host)
- && (ace.principal() == AclEntry.WildcardPrincipalString || ace.principal() == principal))
- }
-
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2e4dc48..e8a3e6c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -679,8 +679,7 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
- " interface, which is used by the broker for authorization. This config also supports authorizers that implement the deprecated" +
- " kafka.security.auth.Authorizer trait which was previously used for authorization."
+ " interface, which is used by the broker for authorization."
/** ********* Socket Server Configuration ***********/
val PortDoc = "DEPRECATED: only used when <code>listeners</code> is not set. " +
"Use <code>listeners</code> instead. \n" +
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d82c8e5..f0d9188 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,10 +18,9 @@ import java.util
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
-
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.log.LogConfig
-import kafka.security.authorizer.AclEntry
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
@@ -144,7 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
+ properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 425d8f3..2b3ae1c 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -15,7 +15,7 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
@@ -63,7 +63,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
def clientPrincipal: KafkaPrincipal = ClientPrincipal
override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+ properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index 75df9e7..17e39f6 100644
--- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.api
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.config.SslConfigs
@@ -25,8 +25,6 @@ import org.junit.jupiter.api.Assertions.assertNull
import scala.collection.immutable.List
-// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
-// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaClientPrincipalUnqualifiedName)
@@ -35,7 +33,7 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
override protected def kafkaClientSaslMechanism = "GSSAPI"
override protected def kafkaServerSaslMechanisms = List("GSSAPI")
- override protected def authorizerClass = classOf[SimpleAclAuthorizer]
+ override protected def authorizerClass = classOf[AclAuthorizer]
// Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the
// client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 1f15563..53a267f 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -15,40 +15,38 @@ package kafka.api
import java.io.File
import java.util
import kafka.log.LogConfig
+import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
+import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.Authorizer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import scala.annotation.nowarn
+import java.util.Collections
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}
-abstract class AuthorizationAdmin {
- def authorizerClassName: String
- def initializeAcls(): Unit
- def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
- def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
-}
-
-// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
-// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
- @nowarn("cat=deprecation")
- val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
+ val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+
+ val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer])
+
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -477,58 +475,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
client.describeAcls(allTopicAcls).values.get().asScala.toSet
}
- @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
- class LegacyAuthorizationAdmin extends AuthorizationAdmin {
- import kafka.security.auth._
- import kafka.security.authorizer.AuthorizerWrapper
+ class AclAuthorizationAdmin(authorizerClass: Class[_ <: AclAuthorizer], authorizerForInitClass: Class[_ <: AclAuthorizer]) {
- override def authorizerClassName: String = classOf[SimpleAclAuthorizer].getName
+ def authorizerClassName: String = authorizerClass.getName
- override def initializeAcls(): Unit = {
- val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
+ def initializeAcls(): Unit = {
+ val authorizer = CoreUtils.createObject[Authorizer](authorizerForInitClass.getName)
try {
authorizer.configure(configs.head.originals())
- authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow,
- Acl.WildCardHost, All)), new Resource(Topic, "*", PatternType.LITERAL))
- authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow,
- Acl.WildCardHost, All)), new Resource(Group, "*", PatternType.LITERAL))
+ val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
+ authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
+ authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
- authorizer.addAcls(Set(clusterAcl(ALLOW, CREATE),
+ authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
clusterAcl(ALLOW, DELETE),
clusterAcl(ALLOW, CLUSTER_ACTION),
clusterAcl(ALLOW, ALTER_CONFIGS),
- clusterAcl(ALLOW, ALTER)),
- Resource.ClusterResource)
+ clusterAcl(ALLOW, ALTER))
+ .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
} finally {
authorizer.close()
}
}
- override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val acls = Set(clusterAcl(permissionType, operation))
- val authorizer = simpleAclAuthorizer
- val prevAcls = authorizer.getAcls(Resource.ClusterResource)
- authorizer.addAcls(acls, Resource.ClusterResource)
- TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, Resource.ClusterResource)
+ def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
+ val ace = clusterAcl(permissionType, operation)
+ val aclBinding = new AclBinding(clusterResourcePattern, ace)
+ val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
+ val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
+ .asScala.map(_.entry).toSet
+ authorizer.createAcls(null, Collections.singletonList(aclBinding))
+ TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
}
- override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val acls = Set(clusterAcl(permissionType, operation))
- val authorizer = simpleAclAuthorizer
- val prevAcls = authorizer.getAcls(Resource.ClusterResource)
- assertTrue(authorizer.removeAcls(acls, Resource.ClusterResource))
- TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, Resource.ClusterResource)
- }
-
-
- private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): Acl = {
- new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), PermissionType.fromJava(permissionType),
- Acl.WildCardHost, Operation.fromJava(operation))
+ def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
+ val ace = clusterAcl(permissionType, operation)
+ val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
+ val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
+ val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
+ val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
+ assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
+ .get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
+ TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
}
- private def simpleAclAuthorizer: Authorizer = {
- val authorizerWrapper = servers.head.dataPlaneRequestProcessor.authorizer.get.asInstanceOf[AuthorizerWrapper]
- authorizerWrapper.baseAuthorizer
+ private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
+ new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
+ WildcardHost, operation, permissionType)
}
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 88b649f..b918081 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -14,22 +14,16 @@ package kafka.api
import java.io.File
import java.util
-import java.util.Collections
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaYammerMetrics
import kafka.security.authorizer.AclAuthorizer
-import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult}
import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType._
import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
-import org.apache.kafka.common.resource.PatternType._
-import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
@@ -84,8 +78,7 @@ object SslAdminIntegrationTest {
}
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
- override val authorizationAdmin = new AclAuthorizationAdmin
- val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+ override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer])
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@@ -266,54 +259,4 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")
metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
}
-
- class AclAuthorizationAdmin extends AuthorizationAdmin {
-
- override def authorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
-
- override def initializeAcls(): Unit = {
- val authorizer = CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
- try {
- authorizer.configure(configs.head.originals())
- val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
- authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
- authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
-
- authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
- clusterAcl(ALLOW, DELETE),
- clusterAcl(ALLOW, CLUSTER_ACTION),
- clusterAcl(ALLOW, ALTER_CONFIGS),
- clusterAcl(ALLOW, ALTER))
- .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
- } finally {
- authorizer.close()
- }
- }
-
- override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val ace = clusterAcl(permissionType, operation)
- val aclBinding = new AclBinding(clusterResourcePattern, ace)
- val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
- val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
- .asScala.map(_.entry).toSet
- authorizer.createAcls(null, Collections.singletonList(aclBinding))
- TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
- }
-
- override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val ace = clusterAcl(permissionType, operation)
- val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
- val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
- val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
- val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
- assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
- .get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
- TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
- }
-
- private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
- new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
- WildcardHost, operation, permissionType)
- }
- }
}
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
deleted file mode 100644
index 9c905ec..0000000
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions._
-
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-class ResourceTest {
- @Test
- def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = {
- assertThrows(classOf[KafkaException], () => Resource.fromString("Unknown:fred"))
- }
-
- @Test
- def shouldThrowOnBadResourceTypeSeparator(): Unit = {
- assertThrows(classOf[KafkaException], () => Resource.fromString("Topic-fred"))
- }
-
- @Test
- def shouldParseOldTwoPartString(): Unit = {
- assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
- assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:t"))
- }
-
- @Test
- def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = {
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group::This:is:a:weird:group:name:"))
- }
-
- @Test
- def shouldParseThreePartString(): Unit = {
- assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred"))
- assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t"))
- }
-
- @Test
- def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:"))
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:"))
- }
-
- @Test
- def shouldRoundTripViaString(): Unit = {
- val expected = Resource(Group, "fred", PREFIXED)
-
- val actual = Resource.fromString(expected.toString)
-
- assertEquals(expected, actual)
- }
-}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
deleted file mode 100644
index 5c58a08..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import org.apache.kafka.common.acl.AclOperation
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class OperationTest {
- /**
- * Test round trip conversions between org.apache.kafka.common.acl.AclOperation and
- * kafka.security.auth.Operation.
- */
- @Test
- def testJavaConversions(): Unit = {
- AclOperation.values.foreach {
- case AclOperation.UNKNOWN | AclOperation.ANY =>
- case aclOp =>
- val op = Operation.fromJava(aclOp)
- val aclOp2 = op.toJava
- assertEquals(aclOp, aclOp2)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
deleted file mode 100644
index 1471202..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.apache.kafka.common.acl.AclPermissionType
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class PermissionTypeTest {
-
- @Test
- def testFromString(): Unit = {
- val permissionType = PermissionType.fromString("Allow")
- assertEquals(Allow, permissionType)
-
- assertThrows(classOf[KafkaException], () => PermissionType.fromString("badName"))
- }
-
- /**
- * Test round trip conversions between org.apache.kafka.common.acl.AclPermissionType and
- * kafka.security.auth.PermissionType.
- */
- @Test
- def testJavaConversions(): Unit = {
- AclPermissionType.values().foreach {
- case AclPermissionType.UNKNOWN | AclPermissionType.ANY =>
- case aclPerm =>
- val perm = PermissionType.fromJava(aclPerm)
- val aclPerm2 = perm.toJava
- assertEquals(aclPerm, aclPerm2)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
deleted file mode 100644
index 02c35c7..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.apache.kafka.common.resource.{ResourceType => JResourceType}
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class ResourceTypeTest {
-
- @Test
- def testFromString(): Unit = {
- val resourceType = ResourceType.fromString("Topic")
- assertEquals(Topic, resourceType)
- assertThrows(classOf[KafkaException], () => ResourceType.fromString("badName"))
- }
-
- /**
- * Test round trip conversions between org.apache.kafka.common.acl.ResourceType and
- * kafka.security.auth.ResourceType.
- */
- @Test
- def testJavaConversions(): Unit = {
- JResourceType.values.foreach {
- case JResourceType.UNKNOWN | JResourceType.ANY =>
- case jResourceType =>
- val resourceType = ResourceType.fromJava(jResourceType)
- val jResourceType2 = resourceType.toJava
- assertEquals(jResourceType, jResourceType2)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
deleted file mode 100644
index 6e2a0bd..0000000
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ /dev/null
@@ -1,731 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import java.net.InetAddress
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.UUID
-
-import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
-import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
-import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.resource.PatternType
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-@deprecated("Use AclAuthorizer", "Since 2.4")
-class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
-
- private val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
- private val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
- private val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
-
- private val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
- private val prefixedResource = Resource(Topic, "foo", PREFIXED)
-
- private val simpleAclAuthorizer = new SimpleAclAuthorizer
- private val simpleAclAuthorizer2 = new SimpleAclAuthorizer
- private var resource: Resource = _
- private val superUsers = "User:superuser1; User:superuser2"
- private val username = "alice"
- private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- private val session = Session(principal, InetAddress.getByName("192.168.0.1"))
- private var config: KafkaConfig = _
- private var zooKeeperClient: ZooKeeperClient = _
-
- class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
- override def equals(o: scala.Any): Boolean = false
- }
-
- @BeforeEach
- override def setUp(): Unit = {
- super.setUp()
-
- // Increase maxUpdateRetries to avoid transient failures
- simpleAclAuthorizer.maxUpdateRetries = Int.MaxValue
- simpleAclAuthorizer2.maxUpdateRetries = Int.MaxValue
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
- props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
-
- config = KafkaConfig.fromProps(props)
- simpleAclAuthorizer.configure(config.originals)
- simpleAclAuthorizer2.configure(config.originals)
- resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
-
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest")
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- simpleAclAuthorizer.close()
- simpleAclAuthorizer2.close()
- zooKeeperClient.close()
- super.tearDown()
- }
-
- @Test
- def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", PREFIXED)))
- }
-
- @Test
- def testAuthorizeWithEmptyResourceName(): Unit = {
- assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, WildCardResource, LITERAL))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
- }
-
- // Authorizing the empty resource is not supported because we create a znode with the resource name.
- @Test
- def testEmptyAclThrowsException(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, "", LITERAL)))
- }
-
- @Test
- def testTopicAcl(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
- val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
-
- //user1 has READ access from host1 and host2.
- val acl1 = new Acl(user1, Allow, host1.getHostAddress, Read)
- val acl2 = new Acl(user1, Allow, host2.getHostAddress, Read)
-
- //user1 does not have READ access from host1.
- val acl3 = new Acl(user1, Deny, host1.getHostAddress, Read)
-
- //user1 has Write access from host1 only.
- val acl4 = new Acl(user1, Allow, host1.getHostAddress, Write)
-
- //user1 has DESCRIBE access from all hosts.
- val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
-
- //user2 has READ access from all hosts.
- val acl6 = new Acl(user2, Allow, WildCardHost, Read)
-
- //user3 has WRITE access from all hosts.
- val acl7 = new Acl(user3, Allow, WildCardHost, Write)
-
- val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6, acl7)
-
- changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
- val host1Session = Session(user1, host1)
- val host2Session = Session(user1, host2)
-
- assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
- assertTrue(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should have WRITE access from host1")
- assertFalse(simpleAclAuthorizer.authorize(host2Session, Write, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
- assertTrue(simpleAclAuthorizer.authorize(host1Session, Describe, resource), "User1 should not have DESCRIBE access from host1")
- assertTrue(simpleAclAuthorizer.authorize(host2Session, Describe, resource), "User1 should have DESCRIBE access from host2")
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Alter, resource), "User1 should not have edit access from host1")
- assertFalse(simpleAclAuthorizer.authorize(host2Session, Alter, resource), "User1 should not have edit access from host2")
-
- //test if user has READ and write access they also get describe access
- val user2Session = Session(user2, host1)
- val user3Session = Session(user3, host1)
- assertTrue(simpleAclAuthorizer.authorize(user2Session, Describe, resource), "User2 should have DESCRIBE access from host1")
- assertTrue(simpleAclAuthorizer.authorize(user3Session, Describe, resource), "User3 should have DESCRIBE access from host2")
- assertTrue(simpleAclAuthorizer.authorize(user2Session, Read, resource), "User2 should have READ access from host1")
- assertTrue(simpleAclAuthorizer.authorize(user3Session, Write, resource), "User3 should have WRITE access from host2")
- }
-
- /**
- CustomPrincipals should be compared with their principal type and name
- */
- @Test
- def testAllowAccessWithCustomPrincipal(): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
-
- // user has READ access from host2 but not from host1
- val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
- val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
- val acls = Set[Acl](acl1, acl2)
- changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
- val host1Session = Session(customUserPrincipal, host1)
- val host2Session = Session(customUserPrincipal, host2)
-
- assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
- }
-
- @Test
- def testDenyTakesPrecedence(): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host = InetAddress.getByName("192.168.2.1")
- val session = Session(user, host)
-
- val allowAll = Acl.AllowAllAcl
- val denyAcl = new Acl(user, Deny, host.getHostAddress, All)
- val acls = Set[Acl](allowAll, denyAcl)
-
- changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
- assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "deny should take precedence over allow.")
- }
-
- @Test
- def testAllowAllAccess(): Unit = {
- val allowAllAcl = Acl.AllowAllAcl
-
- changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
-
- val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "allow all acl should allow access to all.")
- }
-
- @Test
- def testSuperUserHasAccess(): Unit = {
- val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
-
- changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
-
- val session1 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
- val session2 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
-
- assertTrue(simpleAclAuthorizer.authorize(session1, Read, resource), "superuser always has access, no matter what acls.")
- assertTrue(simpleAclAuthorizer.authorize(session2, Read, resource), "superuser always has access, no matter what acls.")
- }
-
- /**
- CustomPrincipals should be compared with their principal type and name
- */
- @Test
- def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
- val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
- changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
-
- val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "superuser with custom principal always has access, no matter what acls.")
- }
-
- @Test
- def testWildCardAcls(): Unit = {
- assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [], authorizer should fail close.")
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.3.1")
- val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
-
- val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
-
- val host1Session = Session(user1, host1)
- assertTrue(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should have Read access from host1")
-
- //allow Write to specific topic.
- val writeAcl = new Acl(user1, Allow, host1.getHostAddress, Write)
- changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
-
- //deny Write to wild card topic.
- val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1.getHostAddress, Write)
- changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
-
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should not have Write access from host1")
- }
-
- @Test
- def testNoAclFound(): Unit = {
- assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [], authorizer should fail close.")
- }
-
- @Test
- def testNoAclFoundOverride(): Unit = {
- val props = TestUtils.createBrokerConfig(1, zkConnect)
- props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
-
- val cfg = KafkaConfig.fromProps(props)
- val testAuthorizer = new SimpleAclAuthorizer
- try {
- testAuthorizer.configure(cfg.originals)
- assertTrue(testAuthorizer.authorize(session, Read, resource), "when acls = null or [], authorizer should fail open with allow.everyone = true.")
- } finally {
- testAuthorizer.close()
- }
- }
-
- @Test
- def testAclManagementAPIs(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val host1 = "host1"
- val host2 = "host2"
-
- val acl1 = new Acl(user1, Allow, host1, Read)
- val acl2 = new Acl(user1, Allow, host1, Write)
- val acl3 = new Acl(user2, Allow, host2, Read)
- val acl4 = new Acl(user2, Allow, host2, Write)
-
- var acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](acl1, acl2, acl3, acl4), Set.empty[Acl])
-
- //test addAcl is additive
- val acl5 = new Acl(user2, Allow, WildCardHost, Read)
- acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
-
- //test get by principal name.
- TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1), "changes not propagated in timeout period")
- TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
-
- val resourceToAcls = Map[Resource, Set[Acl]](
- new Resource(Topic, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
- new Resource(Cluster, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
- new Resource(Group, Resource.WildCardResource, LITERAL) -> acls,
- new Resource(Group, "test-ConsumerGroup", LITERAL) -> acls
- )
-
- resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
- TestUtils.waitUntilTrue(() => resourceToAcls + (resource -> acls) == simpleAclAuthorizer.getAcls(), "changes not propagated in timeout period.")
-
- //test remove acl from existing acls.
- acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
-
- //test remove all acls for resource
- simpleAclAuthorizer.removeAcls(resource)
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
- assertFalse(zkClient.resourceExists(resource.toPattern))
-
- //test removing last acl also deletes ZooKeeper path
- acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
- changeAclAndVerify(acls, Set.empty[Acl], acls)
- assertFalse(zkClient.resourceExists(resource.toPattern))
- }
-
- @Test
- def testLoadCache(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new Acl(user1, Allow, "host-1", Read)
- val acls = Set[Acl](acl1)
- simpleAclAuthorizer.addAcls(acls, resource)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val resource1 = Resource(Topic, "test-2", LITERAL)
- val acl2 = new Acl(user2, Deny, "host3", Read)
- val acls1 = Set[Acl](acl2)
- simpleAclAuthorizer.addAcls(acls1, resource1)
-
- zkClient.deleteAclChangeNotifications()
- val authorizer = new SimpleAclAuthorizer
- try {
- authorizer.configure(config.originals)
-
- assertEquals(acls, authorizer.getAcls(resource))
- assertEquals(acls1, authorizer.getAcls(resource1))
- } finally {
- authorizer.close()
- }
- }
-
- @Test
- def testLocalConcurrentModificationOfResourceAcls(): Unit = {
- val commonResource = Resource(Topic, "test", LITERAL)
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new Acl(user1, Allow, WildCardHost, Read)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val acl2 = new Acl(user2, Deny, WildCardHost, Read)
-
- simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
- simpleAclAuthorizer.addAcls(Set(acl2), commonResource)
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
- }
-
- @Test
- def testDistributedConcurrentModificationOfResourceAcls(): Unit = {
- val commonResource = Resource(Topic, "test", LITERAL)
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new Acl(user1, Allow, WildCardHost, Read)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val acl2 = new Acl(user2, Deny, WildCardHost, Read)
-
- // Add on each instance
- simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
- simpleAclAuthorizer2.addAcls(Set(acl2), commonResource)
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
-
- val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
- val acl3 = new Acl(user3, Deny, WildCardHost, Read)
-
- // Add on one instance and delete on another
- simpleAclAuthorizer.addAcls(Set(acl3), commonResource)
- val deleted = simpleAclAuthorizer2.removeAcls(Set(acl3), commonResource)
-
- assertTrue(deleted, "The authorizer should see a value that needs to be deleted")
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
- }
-
- @Test
- def testHighConcurrencyModificationOfResourceAcls(): Unit = {
- val commonResource = Resource(Topic, "test", LITERAL)
-
- val acls = (0 to 50).map { i =>
- val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
- new Acl(useri, Allow, WildCardHost, Read)
- }
-
- // Alternate authorizer, Remove all acls that end in 0
- val concurrentFuctions = acls.map { acl =>
- () => {
- val aclId = acl.principal.getName.toInt
- if (aclId % 2 == 0) {
- simpleAclAuthorizer.addAcls(Set(acl), commonResource)
- } else {
- simpleAclAuthorizer2.addAcls(Set(acl), commonResource)
- }
- if (aclId % 10 == 0) {
- simpleAclAuthorizer2.removeAcls(Set(acl), commonResource)
- }
- }
- }
-
- val expectedAcls = acls.filter { acl =>
- val aclId = acl.principal.getName.toInt
- aclId % 10 != 0
- }.toSet
-
- TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
-
- TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
- }
-
- /**
- * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
- */
- @Test
- def testAclInheritance(): Unit = {
- testImplicationsOfAllow(All, Set(Read, Write, Create, Delete, Alter, Describe,
- ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
- testImplicationsOfDeny(All, Set(Read, Write, Create, Delete, Alter, Describe,
- ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
- testImplicationsOfAllow(Read, Set(Describe))
- testImplicationsOfAllow(Write, Set(Describe))
- testImplicationsOfAllow(Delete, Set(Describe))
- testImplicationsOfAllow(Alter, Set(Describe))
- testImplicationsOfDeny(Describe, Set())
- testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs))
- testImplicationsOfDeny(DescribeConfigs, Set())
- }
-
- private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host = InetAddress.getByName("192.168.3.1")
- val hostSession = Session(user, host)
- val acl = Acl(user, Allow, WildCardHost, parentOp)
- simpleAclAuthorizer.addAcls(Set(acl), Resource.ClusterResource)
- Operation.values.foreach { op =>
- val authorized = simpleAclAuthorizer.authorize(hostSession, op, Resource.ClusterResource)
- if (allowedOps.contains(op) || op == parentOp)
- assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
- else
- assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
- }
- simpleAclAuthorizer.removeAcls(Set(acl), Resource.ClusterResource)
- }
-
- private def testImplicationsOfDeny(parentOp: Operation, deniedOps: Set[Operation]): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.3.1")
- val host1Session = Session(user1, host1)
- val acls = Set(Acl(user1, Deny, WildCardHost, parentOp), Acl(user1, Allow, WildCardHost, All))
- simpleAclAuthorizer.addAcls(acls, Resource.ClusterResource)
- Operation.values.foreach { op =>
- val authorized = simpleAclAuthorizer.authorize(host1Session, op, Resource.ClusterResource)
- if (deniedOps.contains(op) || op == parentOp)
- assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
- else
- assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
- }
- simpleAclAuthorizer.removeAcls(acls, Resource.ClusterResource)
- }
-
- @Test
- def testHighConcurrencyDeletionOfResourceAcls(): Unit = {
- val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
-
- // Alternate authorizer to keep adding and removing ZooKeeper path
- val concurrentFuctions = (0 to 50).map { _ =>
- () => {
- simpleAclAuthorizer.addAcls(Set(acl), resource)
- simpleAclAuthorizer2.removeAcls(Set(acl), resource)
- }
- }
-
- TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
-
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
- }
-
- @Test
- def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
- }
-
- @Test
- def testDeleteAclOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
-
- simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), wildCardResource)
-
- assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(wildCardResource))
- }
-
- @Test
- def testDeleteAllAclOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
-
- simpleAclAuthorizer.removeAcls(wildCardResource)
-
- assertEquals(Map(), simpleAclAuthorizer.getAcls())
- }
-
- @Test
- def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
- }
-
- @Test
- def testDeleteAclOnPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-
- simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), prefixedResource)
-
- assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(prefixedResource))
- }
-
- @Test
- def testDeleteAllAclOnPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-
- simpleAclAuthorizer.removeAcls(prefixedResource)
-
- assertEquals(Map(), simpleAclAuthorizer.getAcls())
- }
-
- @Test
- def testAddAclsOnLiteralResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), resource)
- simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), resource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(resource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
- }
-
- @Test
- def testAddAclsOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
- simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), wildCardResource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(wildCardResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
- }
-
- @Test
- def testAddAclsOnPrefiexedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
- simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), prefixedResource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(prefixedResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
- }
-
- @Test
- def testAuthorizeWithPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", LITERAL))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL))
-
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
- }
-
- @Test
- def testSingleCharacterResourceAcls(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "f", LITERAL))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "f", LITERAL)))
- assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo", LITERAL)))
-
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "_", PREFIXED))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_foo", LITERAL)))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_", LITERAL)))
- assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo_", LITERAL)))
- }
-
- @Test
- def testGetAclsPrincipal(): Unit = {
- val aclOnSpecificPrincipal = new Acl(principal, Allow, WildCardHost, Write)
- simpleAclAuthorizer.addAcls(Set[Acl](aclOnSpecificPrincipal), resource)
-
- assertEquals(0, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
- "acl on specific should not be returned for wildcard request")
- assertEquals(1, simpleAclAuthorizer.getAcls(principal).size,
- "acl on specific should be returned for specific request")
- assertEquals(1, simpleAclAuthorizer.getAcls(new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size,
- "acl on specific should be returned for different principal instance")
-
- simpleAclAuthorizer.removeAcls(resource)
- val aclOnWildcardPrincipal = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
- simpleAclAuthorizer.addAcls(Set[Acl](aclOnWildcardPrincipal), resource)
-
- assertEquals(1, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
- "acl on wildcard should be returned for wildcard request")
- assertEquals(0, simpleAclAuthorizer.getAcls(principal).size,
- "acl on wildcard should not be returned for specific request")
- }
-
- @Test
- def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
- assertThrows(classOf[UnsupportedVersionException], () => simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)))
- }
-
- @Test
- def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
- givenAuthorizerWithProtocolVersion(Option.empty)
- val resource = Resource(Topic, "z_other", PREFIXED)
- val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(PREFIXED)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
- val resource = Resource(Topic, "z_other", PREFIXED)
- val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(PREFIXED)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
- val resource = Resource(Topic, "z_other", LITERAL)
- val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(LITERAL)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
- val resource = Resource(Topic, "z_other", LITERAL)
- val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(LITERAL)
-
- assertEquals(expected, actual)
- }
-
- private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]): Unit = {
- simpleAclAuthorizer.close()
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
- props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
- protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
-
- config = KafkaConfig.fromProps(props)
-
- simpleAclAuthorizer.configure(config.originals)
- }
-
- private def getAclChangeEventAsString(patternType: PatternType) = {
- val store = ZkAclStore(patternType)
- val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath, registerWatch = true))
- children.maybeThrow()
- assertEquals(1, children.children.size, "Expecting 1 change event")
-
- val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}"))
- data.maybeThrow()
-
- new String(data.data, UTF_8)
- }
-
- private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
- var acls = originalAcls
-
- if(addedAcls.nonEmpty) {
- simpleAclAuthorizer.addAcls(addedAcls, resource)
- acls ++= addedAcls
- }
-
- if(removedAcls.nonEmpty) {
- simpleAclAuthorizer.removeAcls(removedAcls, resource)
- acls --=removedAcls
- }
-
- TestUtils.waitAndVerifyAcls(acls, simpleAclAuthorizer, resource)
-
- acls
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
deleted file mode 100644
index 7c575e4..0000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.authorizer
-
-import java.util.UUID
-
-import kafka.security.auth.SimpleAclAuthorizer
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import kafka.zookeeper.ZooKeeperClient
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.resource.PatternType.LITERAL
-import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.authorizer._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-import scala.annotation.nowarn
-
-class AuthorizerWrapperTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
- @nowarn("cat=deprecation")
- private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer)
- @nowarn("cat=deprecation")
- private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer)
-
- override def authorizer: Authorizer = wrappedSimpleAuthorizer
-
- @BeforeEach
- @nowarn("cat=deprecation")
- override def setUp(): Unit = {
- super.setUp()
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
-
- props.put(AclAuthorizer.SuperUsersProp, superUsers)
- config = KafkaConfig.fromProps(props)
- wrappedSimpleAuthorizer.configure(config.originals)
-
- props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
- config = KafkaConfig.fromProps(props)
- wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
-
- resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone)
- authorizers.foreach(a => {
- a.close()
- })
- zooKeeperClient.close()
- super.tearDown()
- }
-
- @Test
- def testAuthorizeByResourceTypeEnableAllowEveryOne(): Unit = {
- testAuthorizeByResourceTypeEnableAllowEveryOne(wrappedSimpleAuthorizer)
- }
-
- private def testAuthorizeByResourceTypeEnableAllowEveryOne(authorizer: Authorizer): Unit = {
- assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
- "If allow.everyone.if.no.acl.found = true, " +
- "caller should have read access to at least one topic")
- val allUser = AclEntry.WildcardPrincipalString
- val allHost = AclEntry.WildcardHost
- val denyAll = new AccessControlEntry(allUser, allHost, ALL, AclPermissionType.DENY)
- val wildcardResource = new ResourcePattern(resource.resourceType(), AclEntry.WildcardResource, LITERAL)
-
- addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), resource)
- assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
- "Should still allow since the deny only apply on the specific resource")
-
- addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), wildcardResource)
- assertFalse(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
- "When an ACL binding which can deny all users and hosts exists, " +
- "even if allow.everyone.if.no.acl.found = true, caller shouldn't have read access any topic")
- }
-
- @Test
- def testAuthorizeByResourceTypeDisableAllowEveryoneOverride(): Unit = {
- assertFalse (authorizeByResourceType(wrappedSimpleAuthorizer, requestContext, READ, resource.resourceType()),
- "If allow.everyone.if.no.acl.found = false, " +
- "caller shouldn't have read access to any topic")
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 38a5a12..b9e5667 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -34,7 +34,6 @@ import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log._
import kafka.metrics.KafkaYammerMetrics
-import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
@@ -1325,15 +1324,6 @@ object TestUtils extends Logging {
s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}")
}
- @deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.5")
- def waitAndVerifyAcls(expected: Set[Acl], authorizer: LegacyAuthorizer, resource: Resource): Unit = {
- val newLine = scala.util.Properties.lineSeparator
-
- waitUntilTrue(() => authorizer.getAcls(resource) == expected,
- s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
- s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}")
- }
-
/**
* Verifies that this ACL is the secure one.
*/
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b7666ec..a486b6c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -27,6 +27,8 @@
or updating the application not to use internal classes.</li>
<li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
+ <li>The deprecated Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
+ and <code>AclAuthorizer</code> instead.</li>
<li>The deprecated <code>Metric#value()</code> method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li>
<li>Deprecated security classes were removed: <code>PrincipalBuilder</code>, <code>DefaultPrincipalBuilder</code> and <code>ResourceFilter</code>.
Furthermore, deprecated constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>,
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e93a1c5..fcf3900 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -155,8 +155,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
# Kafka Authorizer
ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
- # Old Kafka Authorizer. This is deprecated but still supported.
- SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
INTERBROKER_LISTENER_NAME = 'INTERNAL'
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 4a176b0..6e66ffc 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -70,8 +70,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka.close_port(SecurityConfig.PLAINTEXT)
self.set_authorizer_and_bounce(client_protocol, broker_protocol)
- def set_authorizer_and_bounce(self, client_protocol, broker_protocol, authorizer_class_name = KafkaService.ACL_AUTHORIZER):
- self.kafka.authorizer_class_name = authorizer_class_name
+ def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
+ self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
# Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker.
self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
@@ -93,8 +93,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
self.bounce()
- # Bounce again with ACLs for new mechanism. Use old SimpleAclAuthorizer here to ensure that is also tested.
- self.set_authorizer_and_bounce(security_protocol, security_protocol, KafkaService.SIMPLE_AUTHORIZER)
+ # Bounce again with ACLs for new mechanism.
+ self.set_authorizer_and_bounce(security_protocol, security_protocol)
def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
# Enable the new internal listener on all brokers first