You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/09/16 07:11:42 UTC

[kafka] branch trunk updated: KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e31147  KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)
4e31147 is described below

commit 4e31147a41f03f4ad4c0c8f11414e49d33e60aac
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Sun Sep 16 00:11:05 2018 -0700

    KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5633 from omkreddy/KAFKA-5690-LIST-PER-PRICIPAL
---
 core/src/main/scala/kafka/admin/AclCommand.scala | 64 ++++++++++++++++++++----
 1 file changed, 53 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index c2dda33..ad375d2 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -138,10 +138,22 @@ object AclCommand extends Logging {
     def listAcls(): Unit = {
       withAdminClient(opts) { adminClient =>
         val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
         val resourceToAcls = getAcls(adminClient, filters)
 
-        for ((resource, acls) <- resourceToAcls)
-          println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        if (listPrincipals.isEmpty) {
+          for ((resource, acls) <- resourceToAcls)
+            println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        } else {
+          listPrincipals.foreach(principal => {
+            println(s"ACLs for principal `$principal`")
+            val filteredResourceToAcls =  resourceToAcls.mapValues(acls =>
+              acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
+
+            for ((resource, acls) <- filteredResourceToAcls)
+              println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+          })
+        }
       }
     }
 
@@ -237,13 +249,20 @@ object AclCommand extends Logging {
     def listAcls(): Unit = {
       withAuthorizer() { authorizer =>
         val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
 
-        val resourceToAcls: Iterable[(Resource, Set[Acl])] =
-          if (filters.isEmpty) authorizer.getAcls()
-          else filters.flatMap(filter => getAcls(authorizer, filter))
-
-        for ((resource, acls) <- resourceToAcls)
-          println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        if (listPrincipals.isEmpty) {
+          val resourceToAcls =  getFilteredResourceToAcls(authorizer, filters)
+          for ((resource, acls) <- resourceToAcls)
+            println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        } else {
+          listPrincipals.foreach(principal => {
+            println(s"ACLs for principal `$principal`")
+            val resourceToAcls =  getFilteredResourceToAcls(authorizer, filters, Some(principal))
+            for ((resource, acls) <- resourceToAcls)
+              println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+          })
+        }
       }
     }
 
@@ -256,9 +275,23 @@ object AclCommand extends Logging {
         )
     }
 
-    private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
-      authorizer.getAcls()
-        .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+    private def getFilteredResourceToAcls(authorizer: Authorizer, filters: Set[ResourcePatternFilter],
+                                          listPrincipal: Option[KafkaPrincipal] = None): Iterable[(Resource, Set[Acl])] = {
+      if (filters.isEmpty)
+        if (listPrincipal.isEmpty)
+          authorizer.getAcls()
+        else
+          authorizer.getAcls(listPrincipal.get)
+      else filters.flatMap(filter => getAcls(authorizer, filter, listPrincipal))
+    }
+
+    private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter,
+                        listPrincipal: Option[KafkaPrincipal] = None): Map[Resource, Set[Acl]] =
+      if (listPrincipal.isEmpty)
+        authorizer.getAcls().filter { case (resource, acl) => filter.matches(resource.toPattern) }
+      else
+        authorizer.getAcls(listPrincipal.get).filter { case (resource, acl) => filter.matches(resource.toPattern) }
+
   }
 
   private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@@ -521,6 +554,12 @@ object AclCommand extends Logging {
       .describedAs("deny-principal")
       .ofType(classOf[String])
 
+    val listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
+      " Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
+      .withOptionalArg()
+      .describedAs("principal")
+      .ofType(classOf[String])
+
     val allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
       "If you have specified --allow-principal then the default for this option will be set to * which allows access from all hosts.")
       .withRequiredArg
@@ -568,6 +607,9 @@ object AclCommand extends Logging {
       CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
 
+      if (options.has(listPrincipalsOpt) && !options.has(listOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --principal option is only available if --list is set")
+
       if (options.has(producerOpt) && !options.has(topicOpt))
         CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")