You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/09/26 01:20:03 UTC

[kafka] branch 1.1 updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5680)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 8b1d283  KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5680)
8b1d283 is described below

commit 8b1d283d8945788341890af759d6ed2be27ced7b
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Wed Sep 26 06:49:51 2018 +0530

    KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5680)
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala     | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index da85b00..979f7f6 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.typesafe.scalalogging.Logger
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
+import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls}
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
@@ -32,7 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
-import scala.util.Random
+import scala.util.{Failure, Random, Success, Try}
 
 object SimpleAclAuthorizer {
   //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
@@ -216,11 +216,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     inWriteLock(lock) {
       val resourceTypes = zkClient.getResourceTypes()
       for (rType <- resourceTypes) {
-        val resourceType = ResourceType.fromString(rType)
-        val resourceNames = zkClient.getResourceNames(resourceType.name)
-        for (resourceName <- resourceNames) {
-          val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName))
-          updateCache(new Resource(resourceType, resourceName), versionedAcls)
+        val resourceType = Try(ResourceType.fromString(rType))
+        resourceType match {
+          case Success(resourceTypeObj) => {
+            val resourceNames = zkClient.getResourceNames(resourceTypeObj.name)
+            for (resourceName <- resourceNames) {
+              val versionedAcls = getAclsFromZk(Resource(resourceTypeObj, resourceName))
+              updateCache(new Resource(resourceTypeObj, resourceName), versionedAcls)
+            }
+          }
+          case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType")
         }
       }
     }