You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/09/21 20:56:04 UTC

[kafka] branch 2.0 updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new cfd33b3  KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
cfd33b3 is described below

commit cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Sat Sep 22 02:17:26 2018 +0530

    KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala   | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 9472411..6de81d2 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.typesafe.scalalogging.Logger
 import kafka.api.KAFKA_2_0_IV1
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
+import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls}
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
@@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
-import scala.util.Random
+import scala.util.{Failure, Random, Success, Try}
 
 object SimpleAclAuthorizer {
   //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
@@ -259,12 +259,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       ZkAclStore.stores.foreach(store => {
         val resourceTypes = zkClient.getResourceTypes(store.patternType)
         for (rType <- resourceTypes) {
-          val resourceType = ResourceType.fromString(rType)
-          val resourceNames = zkClient.getResourceNames(store.patternType, resourceType)
-          for (resourceName <- resourceNames) {
-            val resource = new Resource(resourceType, resourceName, store.patternType)
-            val versionedAcls = getAclsFromZk(resource)
-            updateCache(resource, versionedAcls)
+          val resourceType = Try(ResourceType.fromString(rType))
+          resourceType match {
+            case Success(resourceTypeObj) => {
+              val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
+              for (resourceName <- resourceNames) {
+                val resource = new Resource(resourceTypeObj, resourceName, store.patternType)
+                val versionedAcls = getAclsFromZk(resource)
+                updateCache(resource, versionedAcls)
+              }
+            }
+            case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType")
           }
         }
       })