You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/20 19:43:29 UTC
kafka git commit: KAFKA-2863; Add a `close()` method to `Authorizer`
Repository: kafka
Updated Branches:
refs/heads/trunk 7b3d1bf6a -> b1d17e7ef
KAFKA-2863; Add a `close()` method to `Authorizer`
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #568 from ijuma/kafka-2863-authorizer-close
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1d17e7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1d17e7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1d17e7e
Branch: refs/heads/trunk
Commit: b1d17e7ef0e901234d95d7825f6862d2aaead76f
Parents: 7b3d1bf
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Nov 20 10:43:25 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 20 10:43:25 2015 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AclCommand.scala | 70 +++++++++++---------
.../scala/kafka/security/auth/Authorizer.scala | 6 ++
.../security/auth/SimpleAclAuthorizer.scala | 6 +-
.../main/scala/kafka/server/KafkaServer.scala | 10 +--
.../scala/unit/kafka/admin/AclCommandTest.scala | 21 ++++--
.../security/auth/SimpleAclAuthorizerTest.scala | 7 +-
6 files changed, 73 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 6ec0cf8..505be5a 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -58,7 +58,7 @@ object AclCommand {
}
}
- def getAuthorizer(opts: AclCommandOptions): Authorizer = {
+ def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
var authorizerProperties = Map.empty[String, Any]
if (opts.options.has(opts.authorizerPropertiesOpt)) {
val props = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala.map(_.split("="))
@@ -66,55 +66,59 @@ object AclCommand {
}
val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
- val authZ: Authorizer = CoreUtils.createObject(authorizerClass)
+ val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
authZ.configure(authorizerProperties.asJava)
- authZ
+ try f(authZ)
+ finally CoreUtils.swallow(authZ.close())
}
private def addAcl(opts: AclCommandOptions) {
- val authZ: Authorizer = getAuthorizer(opts)
- val resourceToAcl = getResourceToAcls(opts)
+ withAuthorizer(opts) { authorizer =>
+ val resourceToAcl = getResourceToAcls(opts)
- if (resourceToAcl.values.exists(_.isEmpty))
- CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.")
+ if (resourceToAcl.values.exists(_.isEmpty))
+ CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.")
- for ((resource, acls) <- resourceToAcl) {
- val acls = resourceToAcl(resource)
- println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
- authZ.addAcls(acls, resource)
- }
+ for ((resource, acls) <- resourceToAcl) {
+ val acls = resourceToAcl(resource)
+ println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ authorizer.addAcls(acls, resource)
+ }
- listAcl(opts)
+ listAcl(opts)
+ }
}
private def removeAcl(opts: AclCommandOptions) {
- val authZ: Authorizer = getAuthorizer(opts)
- val resourceToAcl = getResourceToAcls(opts)
-
- for ((resource, acls) <- resourceToAcl) {
- if (acls.isEmpty) {
- if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?"))
- authZ.removeAcls(resource)
- } else {
- if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?"))
- authZ.removeAcls(acls, resource)
+ withAuthorizer(opts) { authorizer =>
+ val resourceToAcl = getResourceToAcls(opts)
+
+ for ((resource, acls) <- resourceToAcl) {
+ if (acls.isEmpty) {
+ if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?"))
+ authorizer.removeAcls(resource)
+ } else {
+ if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?"))
+ authorizer.removeAcls(acls, resource)
+ }
}
- }
- listAcl(opts)
+ listAcl(opts)
+ }
}
private def listAcl(opts: AclCommandOptions) {
- val authZ = getAuthorizer(opts)
- val resources = getResource(opts, dieIfNoResourceFound = false)
+ withAuthorizer(opts) { authorizer =>
+ val resources = getResource(opts, dieIfNoResourceFound = false)
- val resourceToAcls = if(resources.isEmpty)
- authZ.getAcls()
- else
- resources.map(resource => (resource -> authZ.getAcls(resource)))
+ val resourceToAcls = if (resources.isEmpty)
+ authorizer.getAcls()
+ else
+ resources.map(resource => (resource -> authorizer.getAcls(resource)))
- for ((resource, acls) <- resourceToAcls)
- println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ for ((resource, acls) <- resourceToAcls)
+ println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+ }
}
private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 939ed12..4c708b2 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -81,5 +81,11 @@ trait Authorizer extends Configurable {
* gets the map of resource to acls for all resources.
*/
def getAcls(): Map[Resource, Set[Acl]]
+
+ /**
+ * Closes this instance.
+ */
+ def close(): Unit
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index cae8f2a..d0d226c 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -109,7 +109,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
- val principal: KafkaPrincipal = session.principal
+ val principal = session.principal
val host = session.clientAddress.getHostAddress
val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
@@ -226,6 +226,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
aclCache.toMap
}
+ def close() {
+ if (zkUtils != null) zkUtils.close()
+ }
+
private def loadCache() {
var acls = Set.empty[Acl]
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e8ea204..9eedbe2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -109,6 +109,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val brokerState: BrokerState = new BrokerState
var apis: KafkaApis = null
+ var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -191,12 +192,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
consumerCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
- val authorizer: Option[Authorizer] = if (config.authorizerClassName != null && !config.authorizerClassName.isEmpty) {
- val authZ: Authorizer = CoreUtils.createObject(config.authorizerClassName)
+ authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
+ val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
- Option(authZ)
- } else {
- None
+ authZ
}
/* start processing requests */
@@ -533,6 +532,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
CoreUtils.swallow(kafkaScheduler.shutdown())
if(apis != null)
CoreUtils.swallow(apis.close())
+ CoreUtils.swallow(authorizer.foreach(_.close()))
if(replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown())
if(logManager != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 0bb950d..65393d8 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -78,7 +78,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
for (resource <- resources) {
- TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource)
+ withAuthorizer(brokerProps) { authorizer =>
+ TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
+ }
}
testRemove(resources, resourceCmd, args, brokerProps)
@@ -97,7 +99,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
for ((resources, acls) <- resourcesToAcls) {
for (resource <- resources) {
- TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource)
+ withAuthorizer(brokerProps) { authorizer =>
+ TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
+ }
}
}
testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps)
@@ -108,7 +112,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
for (resource <- resources) {
Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) {
AclCommand.main(args ++ resourceCmd :+ "--remove")
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], getAuthorizer(brokerProps), resource)
+ withAuthorizer(brokerProps) { authorizer =>
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
+ }
}
}
}
@@ -124,11 +130,12 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
}
- def getAuthorizer(props: Properties): Authorizer = {
+ def withAuthorizer(props: Properties)(f: Authorizer => Unit) {
val kafkaConfig = KafkaConfig.fromProps(props)
val authZ = new SimpleAclAuthorizer
- authZ.configure(kafkaConfig.originals)
-
- authZ
+ try {
+ authZ.configure(kafkaConfig.originals)
+ f(authZ)
+ } finally authZ.close()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d17e7e/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index a4f61df..efcf930 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.{After, Before, Test}
class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@@ -51,6 +51,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
resource = new Resource(Topic, UUID.randomUUID().toString)
}
+ @After
+ override def tearDown(): Unit = {
+ simpleAclAuthorizer.close()
+ }
+
@Test
def testTopicAcl() {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)