You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/09/21 20:56:04 UTC
[kafka] branch 2.0 updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new cfd33b3 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
cfd33b3 is described below
commit cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Sat Sep 22 02:17:26 2018 +0530
KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
Reviewers: Jun Rao <ju...@gmail.com>
---
.../kafka/security/auth/SimpleAclAuthorizer.scala | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 9472411..6de81d2 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
+import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls}
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
@@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import scala.collection.JavaConverters._
-import scala.util.Random
+import scala.util.{Failure, Random, Success, Try}
object SimpleAclAuthorizer {
//optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
@@ -259,12 +259,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
ZkAclStore.stores.foreach(store => {
val resourceTypes = zkClient.getResourceTypes(store.patternType)
for (rType <- resourceTypes) {
- val resourceType = ResourceType.fromString(rType)
- val resourceNames = zkClient.getResourceNames(store.patternType, resourceType)
- for (resourceName <- resourceNames) {
- val resource = new Resource(resourceType, resourceName, store.patternType)
- val versionedAcls = getAclsFromZk(resource)
- updateCache(resource, versionedAcls)
+ val resourceType = Try(ResourceType.fromString(rType))
+ resourceType match {
+ case Success(resourceTypeObj) => {
+ val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
+ for (resourceName <- resourceNames) {
+ val resource = new Resource(resourceTypeObj, resourceName, store.patternType)
+ val versionedAcls = getAclsFromZk(resource)
+ updateCache(resource, versionedAcls)
+ }
+ }
+ case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType")
}
}
})