You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/20 19:43:29 UTC

kafka git commit: KAFKA-2863; Add a `close()` method to `Authorizer`

Repository: kafka
Updated Branches:
  refs/heads/trunk 7b3d1bf6a -> b1d17e7ef


KAFKA-2863; Add a `close()` method to `Authorizer`

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #568 from ijuma/kafka-2863-authorizer-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1d17e7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1d17e7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1d17e7e

Branch: refs/heads/trunk
Commit: b1d17e7ef0e901234d95d7825f6862d2aaead76f
Parents: 7b3d1bf
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Nov 20 10:43:25 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 20 10:43:25 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AclCommand.scala | 70 +++++++++++---------
 .../scala/kafka/security/auth/Authorizer.scala  |  6 ++
 .../security/auth/SimpleAclAuthorizer.scala     |  6 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 10 +--
 .../scala/unit/kafka/admin/AclCommandTest.scala | 21 ++++--
 .../security/auth/SimpleAclAuthorizerTest.scala |  7 +-
 6 files changed, 73 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 6ec0cf8..505be5a 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -58,7 +58,7 @@ object AclCommand {
     }
   }
 
-  def getAuthorizer(opts: AclCommandOptions): Authorizer = {
+  def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
     var authorizerProperties = Map.empty[String, Any]
     if (opts.options.has(opts.authorizerPropertiesOpt)) {
       val props = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala.map(_.split("="))
@@ -66,55 +66,59 @@ object AclCommand {
     }
 
     val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
-    val authZ: Authorizer = CoreUtils.createObject(authorizerClass)
+    val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
     authZ.configure(authorizerProperties.asJava)
-    authZ
+    try f(authZ)
+    finally CoreUtils.swallow(authZ.close())
   }
 
   private def addAcl(opts: AclCommandOptions) {
-    val authZ: Authorizer = getAuthorizer(opts)
-    val resourceToAcl = getResourceToAcls(opts)
+    withAuthorizer(opts) { authorizer =>
+      val resourceToAcl = getResourceToAcls(opts)
 
-    if (resourceToAcl.values.exists(_.isEmpty))
-      CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.")
+      if (resourceToAcl.values.exists(_.isEmpty))
+        CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.")
 
-    for ((resource, acls) <- resourceToAcl) {
-      val acls = resourceToAcl(resource)
-      println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
-      authZ.addAcls(acls, resource)
-    }
+      for ((resource, acls) <- resourceToAcl) {
+        val acls = resourceToAcl(resource)
+        println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        authorizer.addAcls(acls, resource)
+      }
 
-    listAcl(opts)
+      listAcl(opts)
+    }
   }
 
   private def removeAcl(opts: AclCommandOptions) {
-    val authZ: Authorizer = getAuthorizer(opts)
-    val resourceToAcl = getResourceToAcls(opts)
-
-    for ((resource, acls) <- resourceToAcl) {
-      if (acls.isEmpty) {
-        if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?"))
-          authZ.removeAcls(resource)
-      } else {
-        if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?"))
-          authZ.removeAcls(acls, resource)
+    withAuthorizer(opts) { authorizer =>
+      val resourceToAcl = getResourceToAcls(opts)
+
+      for ((resource, acls) <- resourceToAcl) {
+        if (acls.isEmpty) {
+          if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?"))
+            authorizer.removeAcls(resource)
+        } else {
+          if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?"))
+            authorizer.removeAcls(acls, resource)
+        }
       }
-    }
 
-    listAcl(opts)
+      listAcl(opts)
+    }
   }
 
   private def listAcl(opts: AclCommandOptions) {
-    val authZ = getAuthorizer(opts)
-    val resources = getResource(opts, dieIfNoResourceFound = false)
+    withAuthorizer(opts) { authorizer =>
+      val resources = getResource(opts, dieIfNoResourceFound = false)
 
-    val resourceToAcls = if(resources.isEmpty)
-      authZ.getAcls()
-    else
-      resources.map(resource => (resource -> authZ.getAcls(resource)))
+      val resourceToAcls = if (resources.isEmpty)
+        authorizer.getAcls()
+      else
+        resources.map(resource => (resource -> authorizer.getAcls(resource)))
 
-    for ((resource, acls) <- resourceToAcls)
-      println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+      for ((resource, acls) <- resourceToAcls)
+        println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+    }
   }
 
   private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 939ed12..4c708b2 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -81,5 +81,11 @@ trait Authorizer extends Configurable {
    * gets the map of resource to acls for all resources.
    */
   def getAcls(): Map[Resource, Set[Acl]]
+
+  /**
+   * Closes this instance.
+   */
+  def close(): Unit
+
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index cae8f2a..d0d226c 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -109,7 +109,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
-    val principal: KafkaPrincipal = session.principal
+    val principal = session.principal
     val host = session.clientAddress.getHostAddress
     val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
 
@@ -226,6 +226,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     aclCache.toMap
   }
 
+  def close() {
+    if (zkUtils != null) zkUtils.close()
+  }
+
   private def loadCache()  {
     var acls = Set.empty[Acl]
     val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e8ea204..9eedbe2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -109,6 +109,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   val brokerState: BrokerState = new BrokerState
 
   var apis: KafkaApis = null
+  var authorizer: Option[Authorizer] = None
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
 
@@ -191,12 +192,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         consumerCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
-        val authorizer: Option[Authorizer] = if (config.authorizerClassName != null && !config.authorizerClassName.isEmpty) {
-          val authZ: Authorizer = CoreUtils.createObject(config.authorizerClassName)
+        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
+          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
           authZ.configure(config.originals())
-          Option(authZ)
-        } else {
-          None
+          authZ
         }
 
         /* start processing requests */
@@ -533,6 +532,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         CoreUtils.swallow(kafkaScheduler.shutdown())
         if(apis != null)
           CoreUtils.swallow(apis.close())
+        CoreUtils.swallow(authorizer.foreach(_.close()))
         if(replicaManager != null)
           CoreUtils.swallow(replicaManager.shutdown())
         if(logManager != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 0bb950d..65393d8 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -78,7 +78,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
         val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
           AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
           for (resource <- resources) {
-            TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource)
+            withAuthorizer(brokerProps) { authorizer =>
+              TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
+            }
           }
 
           testRemove(resources, resourceCmd, args, brokerProps)
@@ -97,7 +99,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
       AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
       for ((resources, acls) <- resourcesToAcls) {
         for (resource <- resources) {
-          TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource)
+          withAuthorizer(brokerProps) { authorizer =>
+            TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
+          }
         }
       }
       testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps)
@@ -108,7 +112,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     for (resource <- resources) {
       Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) {
         AclCommand.main(args ++ resourceCmd :+ "--remove")
-        TestUtils.waitAndVerifyAcls(Set.empty[Acl], getAuthorizer(brokerProps), resource)
+        withAuthorizer(brokerProps) { authorizer =>
+          TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
+        }
       }
     }
   }
@@ -124,11 +130,12 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
   }
 
-  def getAuthorizer(props: Properties): Authorizer = {
+  def withAuthorizer(props: Properties)(f: Authorizer => Unit) {
     val kafkaConfig = KafkaConfig.fromProps(props)
     val authZ = new SimpleAclAuthorizer
-    authZ.configure(kafkaConfig.originals)
-
-    authZ
+    try {
+      authZ.configure(kafkaConfig.originals)
+      f(authZ)
+    } finally authZ.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index a4f61df..efcf930 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.{After, Before, Test}
 
 class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
@@ -51,6 +51,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     resource = new Resource(Topic, UUID.randomUUID().toString)
   }
 
+  @After
+  override def tearDown(): Unit = {
+    simpleAclAuthorizer.close()
+  }
+
   @Test
   def testTopicAcl() {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)