You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/06/08 15:14:41 UTC
[kafka] branch 2.0 updated: KAFKA-7006 - remove duplicate Scala ResourceNameType in preference to… (#5152)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 9f66fb2 KAFKA-7006 - remove duplicate Scala ResourceNameType in preference to… (#5152)
9f66fb2 is described below
commit 9f66fb2a3ad3615c718d13a5b28ebdc9ed30c3d1
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Fri Jun 8 16:13:44 2018 +0100
KAFKA-7006 - remove duplicate Scala ResourceNameType in preference to… (#5152)
Remove the duplicate Scala ResourceNameType in preference to the Java ResourceNameType.
This is follow-on work for KIP-290 and PR #5117, which introduced the Scala ResourceNameType class.
I've added tests to ensure AclBindings can't be created with ResourceNameType.ANY or ResourceType.ANY, and that UNKNOWN values are still accepted.
Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jun Rao <ju...@gmail.com>
---
.../apache/kafka/common/acl/AclBindingTest.java | 56 +++++++++++-----
core/src/main/scala/kafka/admin/AclCommand.scala | 22 +++----
.../main/scala/kafka/security/SecurityUtils.scala | 7 +-
.../scala/kafka/security/auth/Authorizer.scala | 34 ++++++----
.../main/scala/kafka/security/auth/Resource.scala | 16 +++--
.../kafka/security/auth/ResourceNameType.scala | 49 --------------
.../kafka/security/auth/SimpleAclAuthorizer.scala | 16 +++--
core/src/main/scala/kafka/server/KafkaApis.scala | 74 +++++++++++-----------
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +-
core/src/main/scala/kafka/zk/ZkData.scala | 12 ++--
.../kafka/api/AuthorizerIntegrationTest.scala | 43 +++++++------
.../kafka/api/EndToEndAuthorizationTest.scala | 15 +++--
.../scala/kafka/security/auth/ResourceTest.scala | 17 ++---
.../scala/unit/kafka/admin/AclCommandTest.scala | 19 +++---
.../ZkNodeChangeNotificationListenerTest.scala | 13 ++--
.../security/auth/SimpleAclAuthorizerTest.scala | 55 ++++++++--------
.../delegation/DelegationTokenManagerTest.scala | 7 +-
17 files changed, 229 insertions(+), 229 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index a35faca..4e41f98 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -56,29 +58,29 @@ public class AclBindingTest {
new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
@Test
- public void testMatching() throws Exception {
- assertTrue(ACL1.equals(ACL1));
+ public void testMatching() {
+ assertEquals(ACL1, ACL1);
final AclBinding acl1Copy = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
- assertTrue(ACL1.equals(acl1Copy));
- assertTrue(acl1Copy.equals(ACL1));
- assertTrue(ACL2.equals(ACL2));
- assertFalse(ACL1.equals(ACL2));
- assertFalse(ACL2.equals(ACL1));
+ assertEquals(ACL1, acl1Copy);
+ assertEquals(acl1Copy, ACL1);
+ assertEquals(ACL2, ACL2);
+ assertNotEquals(ACL1, ACL2);
+ assertNotEquals(ACL2, ACL1);
assertTrue(AclBindingFilter.ANY.matches(ACL1));
- assertFalse(AclBindingFilter.ANY.equals(ACL1));
+ assertNotEquals(AclBindingFilter.ANY, ACL1);
assertTrue(AclBindingFilter.ANY.matches(ACL2));
- assertFalse(AclBindingFilter.ANY.equals(ACL2));
+ assertNotEquals(AclBindingFilter.ANY, ACL2);
assertTrue(AclBindingFilter.ANY.matches(ACL3));
- assertFalse(AclBindingFilter.ANY.equals(ACL3));
- assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY));
+ assertNotEquals(AclBindingFilter.ANY, ACL3);
+ assertEquals(AclBindingFilter.ANY, AclBindingFilter.ANY);
assertTrue(ANY_ANONYMOUS.matches(ACL1));
- assertFalse(ANY_ANONYMOUS.equals(ACL1));
+ assertNotEquals(ANY_ANONYMOUS, ACL1);
assertFalse(ANY_ANONYMOUS.matches(ACL2));
- assertFalse(ANY_ANONYMOUS.equals(ACL2));
+ assertNotEquals(ANY_ANONYMOUS, ACL2);
assertTrue(ANY_ANONYMOUS.matches(ACL3));
- assertFalse(ANY_ANONYMOUS.equals(ACL3));
+ assertNotEquals(ANY_ANONYMOUS, ACL3);
assertFalse(ANY_DENY.matches(ACL1));
assertFalse(ANY_DENY.matches(ACL2));
assertTrue(ANY_DENY.matches(ACL3));
@@ -87,12 +89,12 @@ public class AclBindingTest {
assertFalse(ANY_MYTOPIC.matches(ACL3));
assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL));
assertTrue(ANY_DENY.matches(UNKNOWN_ACL));
- assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL));
+ assertEquals(UNKNOWN_ACL, UNKNOWN_ACL);
assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL));
}
@Test
- public void testUnknowns() throws Exception {
+ public void testUnknowns() {
assertFalse(ACL1.isUnknown());
assertFalse(ACL2.isUnknown());
assertFalse(ACL3.isUnknown());
@@ -103,7 +105,7 @@ public class AclBindingTest {
}
@Test
- public void testMatchesAtMostOne() throws Exception {
+ public void testMatchesAtMostOne() {
assertNull(ACL1.toFilter().findIndefiniteField());
assertNull(ACL2.toFilter().findIndefiniteField());
assertNull(ACL3.toFilter().findIndefiniteField());
@@ -111,4 +113,24 @@ public class AclBindingTest {
assertFalse(ANY_DENY.matchesAtMostOne());
assertFalse(ANY_MYTOPIC.matchesAtMostOne());
}
+
+ @Test
+ public void shouldNotThrowOnUnknownResourceNameType() {
+ new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.UNKNOWN), ACL1.entry());
+ }
+
+ @Test
+ public void shouldNotThrowOnUnknownResourceType() {
+ new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL), ACL1.entry());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowOnAnyResourceNameType() {
+ new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.ANY), ACL1.entry());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowOnAnyResourceType() {
+ new AclBinding(new ResourcePattern(ResourceType.ANY, "foo", ResourceNameType.LITERAL), ACL1.entry());
+ }
}
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index d55e886..d223945 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -25,13 +25,13 @@ import kafka.utils._
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType => JResourceNameType, ResourceType => JResourceType, Resource => JResource}
+import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType, ResourceType => JResourceType, Resource => JResource}
import scala.collection.JavaConverters._
object AclCommand extends Logging {
- val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, JResourceNameType.LITERAL)
+ val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, ResourceNameType.LITERAL)
private val Newline = scala.util.Properties.lineSeparator
@@ -87,13 +87,13 @@ object AclCommand extends Logging {
}
private def addAcl(opts: AclCommandOptions) {
- if (opts.options.valueOf(opts.resourceNameType) == JResourceNameType.ANY)
+ if (opts.options.valueOf(opts.resourceNameType) == ResourceNameType.ANY)
CommandLineUtils.printUsageAndDie(opts.parser, "A '--resource-name-type' value of 'Any' is not valid when adding acls.")
withAuthorizer(opts) { authorizer =>
val resourceToAcl = getResourceFilterToAcls(opts).map {
case (filter, acls) =>
- Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), ResourceNameType.fromJava(filter.nameType())) -> acls
+ Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.nameType()) -> acls
}
if (resourceToAcl.values.exists(_.isEmpty))
@@ -262,13 +262,13 @@ object AclCommand extends Logging {
}
private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
- val resourceNameType: JResourceNameType = opts.options.valueOf(opts.resourceNameType)
+ val resourceNameType: ResourceNameType = opts.options.valueOf(opts.resourceNameType)
var resourceFilters = Set.empty[ResourcePatternFilter]
if (opts.options.has(opts.topicOpt))
opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, resourceNameType))
- if (resourceNameType == JResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
+ if (resourceNameType == ResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
resourceFilters += ClusterResourceFilter
if (opts.options.has(opts.groupOpt))
@@ -349,7 +349,7 @@ object AclCommand extends Logging {
.withRequiredArg()
.ofType(classOf[String])
.withValuesConvertedBy(new ResourceNameTypeConverter())
- .defaultsTo(JResourceNameType.LITERAL)
+ .defaultsTo(ResourceNameType.LITERAL)
val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
@@ -429,9 +429,9 @@ object AclCommand extends Logging {
}
-class ResourceNameTypeConverter extends EnumConverter[JResourceNameType](classOf[JResourceNameType]) {
+class ResourceNameTypeConverter extends EnumConverter[ResourceNameType](classOf[ResourceNameType]) {
- override def convert(value: String): JResourceNameType = {
+ override def convert(value: String): ResourceNameType = {
val nameType = super.convert(value)
if (nameType.isUnknown)
throw new ValueConversionException("Unknown resourceNameType: " + value)
@@ -439,7 +439,7 @@ class ResourceNameTypeConverter extends EnumConverter[JResourceNameType](classOf
nameType
}
- override def valuePattern: String = JResourceNameType.values
- .filter(_ != JResourceNameType.UNKNOWN)
+ override def valuePattern: String = ResourceNameType.values
+ .filter(_ != ResourceNameType.UNKNOWN)
.mkString("|")
}
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 7489a3e..3d0f52e 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -17,7 +17,7 @@
package kafka.security
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceNameType, ResourceType}
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
@@ -32,11 +32,10 @@ object SecurityUtils {
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
(for {
resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType))
- resourceNameType <- Try(ResourceNameType.fromJava(filter.patternFilter.nameType))
principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
- resource = Resource(resourceType, filter.patternFilter.name, resourceNameType)
+ resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.nameType)
acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
} yield (resource, acl)) match {
case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
@@ -45,7 +44,7 @@ object SecurityUtils {
}
def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
- val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava)
+ val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType)
val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
acl.operation.toJava, acl.permissionType.toJava)
new AclBinding(resourcePattern, entry)
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 4f4ddcf..6875dc6 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -48,17 +48,19 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
+ * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
*
* // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
- * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
+ * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
*
* // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
+ * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param acls set of acls to add to existing acls
- * @param resource the resource path to which these acls should be attached
+ * @param resource the resource path to which these acls should be attached.
+ * the supplied resource will have a specific resource name type,
+ * i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
*/
def addAcls(acls: Set[Acl], resource: Resource): Unit
@@ -67,17 +69,19 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
+ * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
*
* // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
- * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
+ * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
*
* // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
+ * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param acls set of acls to be removed.
* @param resource resource path from which the acls should be removed.
+ * the supplied resource will have a specific resource name type,
+ * i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
* @return true if some acl got removed, false if no acl was removed.
*/
def removeAcls(acls: Set[Acl], resource: Resource): Boolean
@@ -87,16 +91,18 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", Literal))
+ * authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
*
* // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
- * authorizer.removeAcls(Resource(Topic, "*", Literal))
+ * authorizer.removeAcls(Resource(Topic, "*", LITERAL))
*
* // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
+ * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param resource the resource path from which these acls should be removed.
+ * the supplied resource will have a specific resource name type,
+ * i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
* @return
*/
def removeAcls(resource: Resource): Boolean
@@ -106,16 +112,18 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", Literal))
+ * authorizer.getAcls(Resource(Topic, "foo", LITERAL))
*
* // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
- * authorizer.removeAcls(Resource(Topic, "*", Literal))
+ * authorizer.getAcls(Resource(Topic, "*", LITERAL))
*
* // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
- * authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
+ * authorizer.getAcls(Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param resource the resource path to which the acls belong.
+ * the supplied resource will have a specific resource name type,
+ * i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
* @return empty set if no acls are found, otherwise the acls for the resource.
*/
def getAcls(resource: Resource): Set[Acl]
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index c9b5727..f07a11c 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -16,12 +16,12 @@
*/
package kafka.security.auth
-import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}
object Resource {
val Separator = ":"
val ClusterResourceName = "kafka-cluster"
- val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName, Literal)
+ val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, ResourceNameType.LITERAL)
val ProducerIdResourceName = "producer-id"
val WildCardResource = "*"
@@ -34,7 +34,7 @@ object Resource {
}
case _ =>
str.split(Separator, 2) match {
- case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, Literal)
+ case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
}
}
@@ -50,6 +50,12 @@ object Resource {
*/
case class Resource(resourceType: ResourceType, name: String, nameType: ResourceNameType) {
+ if (nameType == ResourceNameType.ANY)
+ throw new IllegalArgumentException("nameType must not be ANY")
+
+ if (nameType == ResourceNameType.UNKNOWN)
+ throw new IllegalArgumentException("nameType must not be UNKNOWN")
+
/**
* Create an instance of this class with the provided parameters.
* Resource name type would default to ResourceNameType.LITERAL.
@@ -60,11 +66,11 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource
*/
@deprecated("Use Resource(ResourceType, String, ResourceNameType")
def this(resourceType: ResourceType, name: String) {
- this(resourceType, name, Literal)
+ this(resourceType, name, ResourceNameType.LITERAL)
}
def toPattern: ResourcePattern = {
- new ResourcePattern(resourceType.toJava, name, nameType.toJava)
+ new ResourcePattern(resourceType.toJava, name, nameType)
}
override def toString: String = {
diff --git a/core/src/main/scala/kafka/security/auth/ResourceNameType.scala b/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
deleted file mode 100644
index 21b10a1..0000000
--- a/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.common.{BaseEnum, KafkaException}
-import org.apache.kafka.common.resource.{ResourceNameType => JResourceNameType}
-
-sealed trait ResourceNameType extends BaseEnum with Ordered[ ResourceNameType ] {
- def toJava: JResourceNameType
-
- override def compare(that: ResourceNameType): Int = this.name compare that.name
-}
-
-case object Literal extends ResourceNameType {
- val name = "Literal"
- val toJava = JResourceNameType.LITERAL
-}
-
-case object Prefixed extends ResourceNameType {
- val name = "Prefixed"
- val toJava = JResourceNameType.PREFIXED
-}
-
-object ResourceNameType {
-
- def fromString(resourceNameType: String): ResourceNameType = {
- val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceNameType))
- rType.getOrElse(throw new KafkaException(resourceNameType + " not a valid resourceNameType name. The valid names are " + values.mkString(",")))
- }
-
- def values: Seq[ResourceNameType] = List(Literal, Prefixed)
-
- def fromJava(nameType: JResourceNameType): ResourceNameType = fromString(nameType.toString)
-}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 0cb2fae..601b5be 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore}
+import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
@@ -101,7 +102,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
- if (resource.nameType != Literal) {
+ if (resource.nameType != ResourceNameType.LITERAL) {
throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.nameType)
}
@@ -203,15 +204,18 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
inReadLock(lock) {
- val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, Literal))
+ val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, ResourceNameType.LITERAL))
.map(_.acls)
.getOrElse(Set.empty[Acl])
- val literal = aclCache.get(Resource(resourceType, resourceName, Literal))
+ val literal = aclCache.get(Resource(resourceType, resourceName, ResourceNameType.LITERAL))
.map(_.acls)
.getOrElse(Set.empty[Acl])
- val prefixed = aclCache.range(Resource(resourceType, resourceName, Prefixed), Resource(resourceType, resourceName.substring(0, 1), Prefixed))
+ val prefixed = aclCache.range(
+ Resource(resourceType, resourceName, ResourceNameType.PREFIXED),
+ Resource(resourceType, resourceName.substring(0, 1), ResourceNameType.PREFIXED)
+ )
.filterKeys(resource => resourceName.startsWith(resource.name))
.flatMap { case (resource, versionedAcls) => versionedAcls.acls }
.toSet
@@ -222,7 +226,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def getAcls(): Map[Resource, Set[Acl]] = {
inReadLock(lock) {
- aclCache.mapValues(_.acls).toMap
+ aclCache.mapValues(_.acls)
}
}
@@ -365,7 +369,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
if (rt != 0)
rt
else {
- val rnt = a.nameType compare b.nameType
+ val rnt = a.nameType compareTo b.nameType
if (rnt != 0)
rnt
else
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d9e3d1..7a39c12 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, A
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
@@ -273,7 +274,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetCommitRequest = request.body[OffsetCommitRequest]
// reject the request if not authorized to the group
- if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId, Literal))) {
+ if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.groupId, LITERAL))) {
val error = Errors.GROUP_AUTHORIZATION_FAILED
val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
(topicPartition, error)
@@ -286,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
- if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
+ if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -384,7 +385,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
if (produceRequest.isTransactional) {
- if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId, Literal))) {
+ if (!authorize(request.session, Write, Resource(TransactionalId, produceRequest.transactionalId, LITERAL))) {
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
@@ -400,7 +401,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
- if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic, Literal)))
+ if (!authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -529,7 +530,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// Regular Kafka consumers need READ permission on each partition they are fetching.
fetchContext.foreachPartition((topicPartition, data) => {
- if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
+ if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
@@ -741,7 +742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetRequest = request.body[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
- case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic, Literal))
+ case (topicPartition, _) => authorize(request.session, Describe, Resource(Topic, topicPartition.topic, LITERAL))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
@@ -794,7 +795,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetRequest = request.body[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
- case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic, Literal))
+ case (topicPartition, _) => authorize(request.session, Describe, Resource(Topic, topicPartition.topic, LITERAL))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
@@ -1033,7 +1034,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
var (authorizedTopics, unauthorizedForDescribeTopics) =
- topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic, Literal)))
+ topics.partition(topic => authorize(request.session, Describe, Resource(Topic, topic, LITERAL)))
var unauthorizedForCreateTopics = Set[String]()
@@ -1097,12 +1098,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetFetchRequest = request.body[OffsetFetchRequest]
def authorizeTopicDescribe(partition: TopicPartition) =
- authorize(request.session, Describe, new Resource(Topic, partition.topic, Literal))
+ authorize(request.session, Describe, Resource(Topic, partition.topic, LITERAL))
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
// reject the request if not authorized to the group
- if (!authorize(request.session, Describe, new Resource(Group, offsetFetchRequest.groupId, Literal)))
+ if (!authorize(request.session, Describe, Resource(Group, offsetFetchRequest.groupId, LITERAL)))
offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
else {
if (header.apiVersion == 0) {
@@ -1170,10 +1171,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val findCoordinatorRequest = request.body[FindCoordinatorRequest]
if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
- !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey, Literal)))
+ !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.coordinatorKey, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
- !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, Literal)))
+ !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
else {
// get metadata (and create the topic if necessary)
@@ -1220,7 +1221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val describeRequest = request.body[DescribeGroupsRequest]
val groups = describeRequest.groupIds.asScala.map { groupId =>
- if (!authorize(request.session, Describe, new Resource(Group, groupId, Literal))) {
+ if (!authorize(request.session, Describe, Resource(Group, groupId, LITERAL))) {
groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
} else {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -1266,7 +1267,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, createResponse)
}
- if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId(), Literal))) {
+ if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new JoinGroupResponse(
requestThrottleMs,
@@ -1302,7 +1303,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
}
- if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId(), Literal))) {
+ if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.groupId(), LITERAL))) {
sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
} else {
groupCoordinator.handleSyncGroup(
@@ -1320,7 +1321,7 @@ class KafkaApis(val requestChannel: RequestChannel,
var groups = deleteGroupsRequest.groups.asScala.toSet
val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
- authorize(request.session, Delete, new Resource(Group, group, Literal))
+ authorize(request.session, Delete, Resource(Group, group, LITERAL))
}
val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
@@ -1344,7 +1345,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, createResponse)
}
- if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId, Literal))) {
+ if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.groupId, LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
@@ -1371,7 +1372,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, createResponse)
}
- if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId, Literal))) {
+ if (!authorize(request.session, Read, Resource(Group, leaveGroupRequest.groupId, LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new LeaveGroupResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
@@ -1491,7 +1492,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val dupes = createPartitionsRequest.duplicates.asScala
val notDuped = createPartitionsRequest.newPartitions.asScala -- dupes
val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
- authorize(request.session, Alter, new Resource(Topic, topic, Literal))
+ authorize(request.session, Alter, Resource(Topic, topic, LITERAL))
}
val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
@@ -1515,7 +1516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedForDeleteTopics = mutable.Set[String]()
for (topic <- deleteTopicRequest.topics.asScala) {
- if (!authorize(request.session, Delete, new Resource(Topic, topic, Literal)))
+ if (!authorize(request.session, Delete, Resource(Topic, topic, LITERAL)))
unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topic))
nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1560,7 +1561,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
- if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic, Literal)))
+ if (!authorize(request.session, Delete, Resource(Topic, topicPartition.topic, LITERAL)))
unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
@@ -1603,7 +1604,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val transactionalId = initProducerIdRequest.transactionalId
if (transactionalId != null) {
- if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal))) {
+ if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
@@ -1628,7 +1629,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val endTxnRequest = request.body[EndTxnRequest]
val transactionalId = endTxnRequest.transactionalId
- if (authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal))) {
+ if (authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
def sendResponseCallback(error: Errors) {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new EndTxnResponse(requestThrottleMs, error)
@@ -1763,7 +1764,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
val transactionalId = addPartitionsToTxnRequest.transactionalId
val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
- if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal)))
+ if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL)))
sendResponseMaybeThrottle(request, requestThrottleMs =>
addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
else {
@@ -1773,7 +1774,7 @@ class KafkaApis(val requestChannel: RequestChannel,
for (topicPartition <- partitionsToAdd) {
if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
- !authorize(request.session, Write, new Resource(Topic, topicPartition.topic, Literal)))
+ !authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topicPartition))
nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1817,10 +1818,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupId = addOffsetsToTxnRequest.consumerGroupId
val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
- if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId, Literal)))
+ if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL)))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
- else if (!authorize(request.session, Read, new Resource(Group, groupId, Literal)))
+ else if (!authorize(request.session, Read, Resource(Group, groupId, LITERAL)))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
else {
@@ -1849,9 +1850,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
// since it is implied by transactionalId authorization
- if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId, Literal)))
+ if (!authorize(request.session, Write, Resource(TransactionalId, txnOffsetCommitRequest.transactionalId, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
- else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId, Literal)))
+ else if (!authorize(request.session, Read, Resource(Group, txnOffsetCommitRequest.consumerGroupId, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else {
val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
@@ -1859,7 +1860,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
- if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic, Literal)))
+ if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topicPartition))
nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -1920,10 +1921,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val filter = describeAclsRequest.filter()
val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
acls.flatMap { acl =>
- val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava),
+ val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType),
new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
- if (filter.matches(fixture)) Some(fixture)
- else None
+ Some(fixture).filter(filter.matches)
}
}
sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -1994,7 +1994,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val filtersWithIndex = filters.zipWithIndex
for ((resource, acls) <- aclMap; acl <- acls) {
val binding = new AclBinding(
- new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava),
+ new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType),
new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
acl.permissionType.toJava))
@@ -2042,7 +2042,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case RResourceType.BROKER =>
authorize(request.session, AlterConfigs, Resource.ClusterResource)
case RResourceType.TOPIC =>
- authorize(request.session, AlterConfigs, new Resource(Topic, resource.name, Literal))
+ authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
}
}
@@ -2069,7 +2069,7 @@ class KafkaApis(val requestChannel: RequestChannel,
resource.`type` match {
case RResourceType.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
case RResourceType.TOPIC =>
- authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name, Literal))
+ authorize(request.session, DescribeConfigs, Resource(Topic, resource.name, LITERAL))
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
}
}
@@ -2216,7 +2216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
else {
val owners = if (describeTokenRequest.owners == null) None else Some(describeTokenRequest.owners.asScala.toList)
- def authorizeToken(tokenId: String) = authorize(request.session, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
+ def authorizeToken(tokenId: String) = authorize(request.session, Describe, Resource(kafka.security.auth.DelegationToken, tokenId, LITERAL))
def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
val tokens = tokenManager.getTokens(eligible)
sendResponseCallback(Errors.NONE, tokens)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 20e4b83..6ec8e30 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -26,11 +26,12 @@ import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Resource, ResourceNameType, ResourceType}
+import kafka.security.auth.{Acl, Resource, ResourceType}
import kafka.server.ConfigType
import kafka.utils.Logging
import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 6121035..d4470ab 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -26,11 +26,12 @@ import kafka.cluster.{Broker, EndPoint}
import kafka.common.KafkaException
import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Literal, Prefixed, Resource, ResourceNameType, ResourceType}
+import kafka.security.auth.{Acl, Resource, ResourceType}
import kafka.server.{ConfigType, DelegationTokenManager}
import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.Time
@@ -458,14 +459,14 @@ object StateChangeHandlers {
*/
case class ZkAclStore(nameType: ResourceNameType) {
val aclPath: String = nameType match {
- case Literal => "/kafka-acl"
- case Prefixed => "/kafka-prefixed-acl"
+ case ResourceNameType.LITERAL => "/kafka-acl"
+ case ResourceNameType.PREFIXED => "/kafka-prefixed-acl"
case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
}
val aclChangePath: String = nameType match {
- case Literal => "/kafka-acl-changes"
- case Prefixed => "/kafka-prefixed-acl-changes"
+ case ResourceNameType.LITERAL => "/kafka-acl-changes"
+ case ResourceNameType.PREFIXED => "/kafka-prefixed-acl-changes"
case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
}
@@ -480,6 +481,7 @@ case class ZkAclStore(nameType: ResourceNameType) {
object ZkAclStore {
val stores: Seq[ZkAclStore] = ResourceNameType.values
+ .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN)
.map(nameType => ZkAclStore(nameType))
val securePaths: Seq[String] = stores
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a3b3233..b48a349 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -40,9 +40,10 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records,
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition, acl, requests, resource}
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@@ -70,11 +71,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val deleteRecordsPartition = new TopicPartition(deleteTopic, part)
val topicAndPartition = TopicAndPartition(topic, part)
val group = "my-group"
- val topicResource = new Resource(Topic, topic, Literal)
- val groupResource = new Resource(Group, group, Literal)
- val deleteTopicResource = new Resource(Topic, deleteTopic, Literal)
- val transactionalIdResource = new Resource(TransactionalId, transactionalId, Literal)
- val createTopicResource = new Resource(Topic, createTopic, Literal)
+ val topicResource = Resource(Topic, topic, LITERAL)
+ val groupResource = Resource(Group, group, LITERAL)
+ val deleteTopicResource = Resource(Topic, deleteTopic, LITERAL)
+ val transactionalIdResource = Resource(TransactionalId, transactionalId, LITERAL)
+ val createTopicResource = Resource(Topic, createTopic, LITERAL)
val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
@@ -378,12 +379,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createAclsRequest = new CreateAclsRequest.Builder(
Collections.singletonList(new AclCreation(new AclBinding(
- new ResourcePattern(AdminResourceType.TOPIC, "mytopic", resource.ResourceNameType.LITERAL),
+ new ResourcePattern(AdminResourceType.TOPIC, "mytopic", LITERAL),
new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
private def deleteAclsRequest = new DeleteAclsRequest.Builder(
Collections.singletonList(new AclBindingFilter(
- new ResourcePatternFilter(AdminResourceType.TOPIC, null, resource.ResourceNameType.LITERAL),
+ new ResourcePatternFilter(AdminResourceType.TOPIC, null, LITERAL),
new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
@@ -577,13 +578,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) {
val topicPartition = new TopicPartition(createTopic, 0)
- val newTopicResource = new Resource(Topic, createTopic, Literal)
+ val newTopicResource = Resource(Topic, createTopic, LITERAL)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
try {
sendRecords(numRecords, topicPartition)
Assert.fail("should have thrown exception")
} catch {
- case e: TopicAuthorizationException =>
+ case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics())
}
@@ -733,7 +734,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// create an unmatched topic
val unmatchedTopic = "unmatched"
createTopic(unmatchedTopic)
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic, Literal))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), Resource(Topic, unmatchedTopic, LITERAL))
sendRecords(1, new TopicPartition(unmatchedTopic, part))
removeAllAcls()
@@ -745,8 +746,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// set the subscription pattern to an internal topic that the consumer has read permission to. Since
// internal topics are not included, we should not be assigned any partitions from this topic
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), new Resource(Topic,
- GROUP_METADATA_TOPIC_NAME, Literal))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
+ GROUP_METADATA_TOPIC_NAME, LITERAL))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
assertTrue(consumer.subscription().isEmpty)
@@ -774,7 +775,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// now authorize the user for the internal topic and verify that we can subscribe
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
- GROUP_METADATA_TOPIC_NAME, Literal))
+ GROUP_METADATA_TOPIC_NAME, LITERAL))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
@@ -789,7 +790,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
- val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME, Literal)
+ val internalTopicResource = Resource(Topic, GROUP_METADATA_TOPIC_NAME, LITERAL)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
val consumerConfig = new Properties
@@ -836,13 +837,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testCreatePermissionOnClusterToReadFromNonExistentTopic() {
testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
- Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
+ Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
Cluster)
}
private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) {
val topicPartition = new TopicPartition(newTopic, 0)
- val newTopicResource = new Resource(Topic, newTopic, Literal)
+ val newTopicResource = Resource(Topic, newTopic, LITERAL)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
this.consumers.head.assign(List(topicPartition).asJava)
@@ -1045,7 +1046,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteTopicsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), Resource(Topic, "*", LITERAL))
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1072,7 +1073,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteRecordsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), Resource(Topic, "*", LITERAL))
val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
val version = ApiKeys.DELETE_RECORDS.latestVersion
val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1090,7 +1091,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testCreatePartitionsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*", Literal))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), Resource(Topic, "*", LITERAL))
val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
val version = ApiKeys.CREATE_PARTITIONS.latestVersion
val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
@@ -1283,7 +1284,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic, Literal))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), Resource(Topic, deleteTopic, LITERAL))
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index b809686..1f89ea3 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{GroupAuthorizationException, TimeoutException, TopicAuthorizationException}
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -78,13 +79,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
- val topicResource = new Resource(Topic, topic, Literal)
- val groupResource = new Resource(Group, group, Literal)
+ val topicResource = Resource(Topic, topic, LITERAL)
+ val groupResource = Resource(Group, group, LITERAL)
val clusterResource = Resource.ClusterResource
- val prefixedTopicResource = new Resource(Topic, topicPrefix, Prefixed)
- val prefixedGroupResource = new Resource(Group, groupPrefix, Prefixed)
- val wildcardTopicResource = new Resource(Topic, wildcard, Literal)
- val wildcardGroupResource = new Resource(Group, wildcard, Literal)
+ val prefixedTopicResource = Resource(Topic, topicPrefix, PREFIXED)
+ val prefixedGroupResource = Resource(Group, groupPrefix, PREFIXED)
+ val wildcardTopicResource = Resource(Topic, wildcard, LITERAL)
+ val wildcardGroupResource = Resource(Group, wildcard, LITERAL)
// Arguments to AclCommand to set ACLs.
def clusterActionArgs: Array[String] = Array("--authorizer-properties",
@@ -182,7 +183,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
super.setUp()
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
- TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*", Literal))
+ TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL))
}
// create the test topic with all the brokers as replicas
createTopic(topic, 1, 3)
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
index 66049b4..2924cff 100644
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
@@ -18,6 +18,7 @@
package kafka.security.auth
import kafka.common.KafkaException
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.junit.Test
import org.junit.Assert._
@@ -29,30 +30,30 @@ class ResourceTest {
@Test
def shouldParseOldTwoPartString(): Unit = {
- assertEquals(Resource(Group, "fred", Literal), Resource.fromString("Group:fred"))
- assertEquals(Resource(Topic, "t", Literal), Resource.fromString("Topic:t"))
+ assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
+ assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:t"))
}
@Test
def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = {
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Literal), Resource.fromString("Group::This:is:a:weird:group:name:"))
+ assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group::This:is:a:weird:group:name:"))
}
@Test
def shouldParseThreePartString(): Unit = {
- assertEquals(Resource(Group, "fred", Prefixed), Resource.fromString("Prefixed:Group:fred"))
- assertEquals(Resource(Topic, "t", Literal), Resource.fromString("Literal:Topic:t"))
+ assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("PREFIXED:Group:fred"))
+ assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("LITERAL:Topic:t"))
}
@Test
def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Prefixed), Resource.fromString("Prefixed:Group::This:is:a:weird:group:name:"))
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Literal), Resource.fromString("Literal:Group::This:is:a:weird:group:name:"))
+ assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("PREFIXED:Group::This:is:a:weird:group:name:"))
+ assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("LITERAL:Group::This:is:a:weird:group:name:"))
}
@Test
def shouldRoundTripViaString(): Unit = {
- val expected = Resource(Group, "fred", Prefixed)
+ val expected = Resource(Group, "fred", PREFIXED)
val actual = Resource.fromString(expected.toString)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 71754ba..76cf787 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -23,8 +23,9 @@ import kafka.security.auth._
import kafka.server.KafkaConfig
import kafka.utils.{Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{After, Before, Test}
+import org.junit.{Before, Test}
class AclCommandTest extends ZooKeeperTestHarness with Logging {
@@ -36,10 +37,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
- private val TopicResources = Set(Resource(Topic, "test-1", Literal), Resource(Topic, "test-2", Literal))
- private val GroupResources = Set(Resource(Group, "testGroup-1", Literal), Resource(Group, "testGroup-2", Literal))
- private val TransactionalIdResources = Set(Resource(TransactionalId, "t0", Literal), Resource(TransactionalId, "t1", Literal))
- private val TokenResources = Set(Resource(DelegationToken, "token1", Literal), Resource(DelegationToken, "token2", Literal))
+ private val TopicResources = Set(Resource(Topic, "test-1", LITERAL), Resource(Topic, "test-2", LITERAL))
+ private val GroupResources = Set(Resource(Group, "testGroup-1", LITERAL), Resource(Group, "testGroup-2", LITERAL))
+ private val TransactionalIdResources = Set(Resource(TransactionalId, "t0", LITERAL), Resource(TransactionalId, "t1", LITERAL))
+ private val TokenResources = Set(Resource(DelegationToken, "token1", LITERAL), Resource(DelegationToken, "token2", LITERAL))
private val ResourceToCommand = Map[Set[Resource], Array[String]](
TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
@@ -64,7 +65,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]](
TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, Create), Hosts),
TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
- Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow,
+ Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow,
Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts)
)
@@ -140,14 +141,14 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
val describeAcl = Acl(principal, Allow, Acl.WildCardHost, Describe)
val createAcl = Acl(principal, Allow, Acl.WildCardHost, Create)
- TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", Prefixed))
+ TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", PREFIXED))
}
AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
withAuthorizer() { authorizer =>
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", Literal))
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Topic, "Test-", Prefixed))
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", LITERAL))
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Topic, "Test-", PREFIXED))
}
}
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index cee0bd6..02918d6 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -16,9 +16,10 @@
*/
package kafka.common
-import kafka.security.auth.{Group, Literal, Resource}
+import kafka.security.auth.{Group, Resource}
import kafka.utils.TestUtils
import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness}
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
import org.junit.{After, Test}
class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
@@ -38,17 +39,17 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
@volatile var invocationCount = 0
val notificationHandler = new NotificationHandler {
override def processNotification(notificationMessage: Array[Byte]): Unit = {
- notification = AclChangeNotificationSequenceZNode.decode(Literal, notificationMessage)
+ notification = AclChangeNotificationSequenceZNode.decode(LITERAL, notificationMessage)
invocationCount += 1
}
}
zkClient.createAclPaths()
- val notificationMessage1 = Resource(Group, "messageA", Literal)
- val notificationMessage2 = Resource(Group, "messageB", Literal)
+ val notificationMessage1 = Resource(Group, "messageA", LITERAL)
+ val notificationMessage2 = Resource(Group, "messageB", LITERAL)
val changeExpirationMs = 1000
- notificationListener = new ZkNodeChangeNotificationListener(zkClient, ZkAclStore(Literal).aclChangePath,
+ notificationListener = new ZkNodeChangeNotificationListener(zkClient, ZkAclStore(LITERAL).aclChangePath,
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
notificationListener.init()
@@ -68,7 +69,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
"Failed to send/process notification message in the timeout period.")
- (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, Literal)))
+ (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL)))
TestUtils.waitUntilTrue(() => invocationCount == 10 ,
s"Expected 10 invocations of processNotifications, but there were $invocationCount")
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 3e7f6a8..05a433c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -24,6 +24,7 @@ import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -34,8 +35,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
- val wildCardResource = Resource(Topic, WildCardResource, Literal)
- val prefixedResource = Resource(Topic, "foo", Prefixed)
+ val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
+ val prefixedResource = Resource(Topic, "foo", PREFIXED)
val simpleAclAuthorizer = new SimpleAclAuthorizer
val simpleAclAuthorizer2 = new SimpleAclAuthorizer
@@ -62,7 +63,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
config = KafkaConfig.fromProps(props)
simpleAclAuthorizer.configure(config.originals)
simpleAclAuthorizer2.configure(config.originals)
- resource = new Resource(Topic, "foo-" + UUID.randomUUID(), Literal)
+ resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
}
@After
@@ -74,7 +75,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test(expected = classOf[IllegalArgumentException])
def testAuthorizeThrowsOnNoneLiteralResource() {
- simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", Prefixed))
+ simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", PREFIXED))
}
@Test
@@ -234,10 +235,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
val resourceToAcls = Map[Resource, Set[Acl]](
- new Resource(Topic, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
- new Resource(Cluster, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
- new Resource(Group, Resource.WildCardResource, Literal) -> acls,
- new Resource(Group, "test-ConsumerGroup", Literal) -> acls
+ new Resource(Topic, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
+ new Resource(Cluster, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
+ new Resource(Group, Resource.WildCardResource, LITERAL) -> acls,
+ new Resource(Group, "test-ConsumerGroup", LITERAL) -> acls
)
resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
@@ -265,7 +266,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
simpleAclAuthorizer.addAcls(acls, resource)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val resource1 = new Resource(Topic, "test-2", Literal)
+ val resource1 = Resource(Topic, "test-2", LITERAL)
val acl2 = new Acl(user2, Deny, "host3", Read)
val acls1 = Set[Acl](acl2)
simpleAclAuthorizer.addAcls(acls1, resource1)
@@ -284,7 +285,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testLocalConcurrentModificationOfResourceAcls() {
- val commonResource = new Resource(Topic, "test", Literal)
+ val commonResource = Resource(Topic, "test", LITERAL)
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -300,7 +301,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testDistributedConcurrentModificationOfResourceAcls() {
- val commonResource = new Resource(Topic, "test", Literal)
+ val commonResource = Resource(Topic, "test", LITERAL)
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -330,7 +331,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testHighConcurrencyModificationOfResourceAcls() {
- val commonResource = new Resource(Topic, "test", Literal)
+ val commonResource = Resource(Topic, "test", LITERAL)
val acls = (0 to 50).map { i =>
val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
@@ -513,18 +514,18 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testAuthorizeWithPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Literal))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Prefixed))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Literal))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", LITERAL))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL))
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
@@ -539,16 +540,16 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
assertEquals(1, simpleAclAuthorizer.getAcls(principal).size)
- simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Topic, Acl.WildCardResource, Literal))
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Topic, Acl.WildCardResource, LITERAL))
assertEquals(2, simpleAclAuthorizer.getAcls(principal).size)
val acl2 = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
- simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, "groupA", Literal))
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Group, "groupA", LITERAL))
assertEquals(3, simpleAclAuthorizer.getAcls(principal).size)
// add prefixed principal acl on wildcard group name
val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName.charAt(0) + WildCardResource), Allow, WildCardHost, Write)
- simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, Acl.WildCardResource, Literal))
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Group, Acl.WildCardResource, LITERAL))
assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
}
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index eec7175..7df30c9 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -28,6 +28,7 @@ import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaC
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.resource.ResourceNameType.LITERAL
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -242,7 +243,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
//get all tokens for multiple owners (owner1, renewer4) and with permission
var acl = new Acl(owner1, Allow, WildCardHost, Describe)
- simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3, Literal))
+ simpleAclAuthorizer.addAcls(Set(acl), Resource(kafka.security.auth.DelegationToken, tokenId3, LITERAL))
tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
assert(tokens.size == 3)
@@ -257,7 +258,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
//get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
acl = new Acl(renewer2, Allow, WildCardHost, Describe)
- simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2, Literal))
+ simpleAclAuthorizer.addAcls(Set(acl), Resource(kafka.security.auth.DelegationToken, tokenId2, LITERAL))
tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, renewer2, List(renewer2, renewer3))
assert(tokens.size == 2)
@@ -271,7 +272,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
List()
}
else {
- def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
+ def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, Resource(kafka.security.auth.DelegationToken, tokenId, LITERAL))
def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, Option(requestedOwners), token, authorizeToken)
tokenManager.getTokens(eligible)
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.