You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/09/16 07:11:42 UTC
[kafka] branch trunk updated: KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e31147 KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)
4e31147 is described below
commit 4e31147a41f03f4ad4c0c8f11414e49d33e60aac
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Sun Sep 16 00:11:05 2018 -0700
KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Dong Lin <li...@gmail.com>
Closes #5633 from omkreddy/KAFKA-5690-LIST-PER-PRICIPAL
---
core/src/main/scala/kafka/admin/AclCommand.scala | 64 ++++++++++++++++++++----
1 file changed, 53 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index c2dda33..ad375d2 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -138,10 +138,22 @@ object AclCommand extends Logging {
def listAcls(): Unit = {
withAdminClient(opts) { adminClient =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+ val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls = getAcls(adminClient, filters)
- for ((resource, acls) <- resourceToAcls)
- println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ if (listPrincipals.isEmpty) {
+ for ((resource, acls) <- resourceToAcls)
+ println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ } else {
+ listPrincipals.foreach(principal => {
+ println(s"ACLs for principal `$principal`")
+ val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
+ acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
+
+ for ((resource, acls) <- filteredResourceToAcls)
+ println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ })
+ }
}
}
@@ -237,13 +249,20 @@ object AclCommand extends Logging {
def listAcls(): Unit = {
withAuthorizer() { authorizer =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+ val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
- val resourceToAcls: Iterable[(Resource, Set[Acl])] =
- if (filters.isEmpty) authorizer.getAcls()
- else filters.flatMap(filter => getAcls(authorizer, filter))
-
- for ((resource, acls) <- resourceToAcls)
- println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ if (listPrincipals.isEmpty) {
+ val resourceToAcls = getFilteredResourceToAcls(authorizer, filters)
+ for ((resource, acls) <- resourceToAcls)
+ println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ } else {
+ listPrincipals.foreach(principal => {
+ println(s"ACLs for principal `$principal`")
+ val resourceToAcls = getFilteredResourceToAcls(authorizer, filters, Some(principal))
+ for ((resource, acls) <- resourceToAcls)
+ println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ })
+ }
}
}
@@ -256,9 +275,23 @@ object AclCommand extends Logging {
)
}
- private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
- authorizer.getAcls()
- .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+ private def getFilteredResourceToAcls(authorizer: Authorizer, filters: Set[ResourcePatternFilter],
+ listPrincipal: Option[KafkaPrincipal] = None): Iterable[(Resource, Set[Acl])] = {
+ if (filters.isEmpty)
+ if (listPrincipal.isEmpty)
+ authorizer.getAcls()
+ else
+ authorizer.getAcls(listPrincipal.get)
+ else filters.flatMap(filter => getAcls(authorizer, filter, listPrincipal))
+ }
+
+ private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter,
+ listPrincipal: Option[KafkaPrincipal] = None): Map[Resource, Set[Acl]] =
+ if (listPrincipal.isEmpty)
+ authorizer.getAcls().filter { case (resource, acl) => filter.matches(resource.toPattern) }
+ else
+ authorizer.getAcls(listPrincipal.get).filter { case (resource, acl) => filter.matches(resource.toPattern) }
+
}
private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@@ -521,6 +554,12 @@ object AclCommand extends Logging {
.describedAs("deny-principal")
.ofType(classOf[String])
+ val listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
+ " Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
+ .withOptionalArg()
+ .describedAs("principal")
+ .ofType(classOf[String])
+
val allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
"If you have specified --allow-principal then the default for this option will be set to * which allows access from all hosts.")
.withRequiredArg
@@ -568,6 +607,9 @@ object AclCommand extends Logging {
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
+ if (options.has(listPrincipalsOpt) && !options.has(listOpt))
+ CommandLineUtils.printUsageAndDie(parser, "The --principal option is only available if --list is set")
+
if (options.has(producerOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")