You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 20:24:00 UTC

[jira] [Commented] (KAFKA-2794) Add group support for authorizer acls

    [ https://issues.apache.org/jira/browse/KAFKA-2794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301915#comment-16301915 ] 

ASF GitHub Bot commented on KAFKA-2794:
---------------------------------------

guozhangwang closed pull request #483: KAFKA-2794: Added group support to authorizer.
URL: https://github.com/apache/kafka/pull/483
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/cache/TTLCache.java b/clients/src/main/java/org/apache/kafka/common/cache/TTLCache.java
new file mode 100644
index 00000000000..628ddc688b1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/cache/TTLCache.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.cache;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A cache whose entries expire a fixed TTL after they are put; expiry is enforced lazily by get().
+ */
+public class TTLCache<K, V> implements Cache<K, V> {
+    private final ConcurrentMap<K, ValueWithTimeStamp<V>> cache = new ConcurrentHashMap<>();
+    private final long ttlMillis;
+    private final Time time;
+
+    /** Creates a cache whose entries live {@code ttlMilliSeconds} ms, as measured by {@code time}. */
+    public TTLCache(final long ttlMilliSeconds, Time time) {
+        this.ttlMillis = ttlMilliSeconds;
+        this.time = time;
+    }
+
+    /** Returns the live value for {@code key}, or {@code null} if it is absent or has expired. */
+    @Override
+    public V get(K key) {
+        ValueWithTimeStamp<V> entry = cache.get(key);
+        if (entry != null && time.milliseconds() - entry.timeStamp >= ttlMillis) {
+            cache.remove(key, entry); // expired; conditional so a concurrent re-put is not evicted
+            entry = null;
+        }
+        return entry == null ? null : entry.value;
+    }
+
+    /** Stores {@code value}, replacing any previous entry for {@code key} and restarting its TTL. */
+    @Override
+    public void put(K key, V value) {
+        cache.put(key, new ValueWithTimeStamp<V>(value, time.milliseconds()));
+    }
+
+    @Override
+    public boolean remove(K key) {
+        return cache.remove(key) != null;
+    }
+
+    @Override
+    public long size() {
+        return cache.size();
+    }
+
+    private static class ValueWithTimeStamp<V> {
+        final V value;
+        final long timeStamp; // clock reading, in ms, taken when the value was stored
+        ValueWithTimeStamp(V value, long timeStamp) {
+            this.value = value;
+            this.timeStamp = timeStamp;
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index 06c59d1c083..bb19ca52932 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -21,6 +21,7 @@
 public class KafkaPrincipal implements Principal {
     public static final String SEPARATOR = ":";
     public static final String USER_TYPE = "User";
+    public static final String GROUP_TYPE = "Group";
     public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
 
     private String principalType;
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index fd6d420d81e..3938181a1e3 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -30,9 +30,9 @@ object AclCommand {
   val Delimiter = ','
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
-    Topic -> Set(Read, Write, Describe),
-    Group -> Set(Read),
-    Cluster -> Set(Create, ClusterAction)
+    Topic -> Set(Read, Write, Describe, All),
+    Group -> Set(Read, All),
+    Cluster -> Set(Create, ClusterAction, All)
   )
 
   def main(args: Array[String]) {
diff --git a/core/src/main/scala/kafka/security/auth/KerberosPrincipalToLocal.scala b/core/src/main/scala/kafka/security/auth/KerberosPrincipalToLocal.scala
new file mode 100644
index 00000000000..1496ac92823
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/KerberosPrincipalToLocal.scala
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.util
+
+import kafka.utils._
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+/** Converts a principal to a local User principal named after the leading part of the principal's name. */
+class KerberosPrincipalToLocal extends PrincipalToLocal with Logging  {
+  /** Returns a User principal whose name is the text before the first '/' or '@' in `principal`'s name. */
+  override def toLocal(principal: KafkaPrincipal): KafkaPrincipal = {
+    val nameParts = principal.getName.split("[/@]")
+    new KafkaPrincipal(KafkaPrincipal.USER_TYPE, nameParts(0))
+  }
+
+  /** Nothing to configure for this mapping. */
+  override def configure(configs: util.Map[String, _]) = {}
+}
diff --git a/core/src/main/scala/kafka/security/auth/PrincipalToGroup.scala b/core/src/main/scala/kafka/security/auth/PrincipalToGroup.scala
new file mode 100644
index 00000000000..70edd2bb37a
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/PrincipalToGroup.scala
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.security.Principal
+
+import org.apache.kafka.common.Configurable
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+/**
+ * A plugin that maps a Kafka principal to the groups it belongs to.
+ */
+trait PrincipalToGroup extends Configurable {
+
+  /**
+   * Looks up the group memberships of a principal.
+   * @param principal the principal whose groups are requested
+   * @return Set of Groups this principal is part of, empty Set if not part of any group.
+   */
+  def toGroups(principal: KafkaPrincipal): Set[KafkaPrincipal]
+}
diff --git a/core/src/main/scala/kafka/security/auth/PrincipalToLocal.scala b/core/src/main/scala/kafka/security/auth/PrincipalToLocal.scala
new file mode 100644
index 00000000000..748484ff650
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/PrincipalToLocal.scala
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import org.apache.kafka.common.Configurable
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+/** A plugin that converts a Kafka principal to a local identity. */
+trait PrincipalToLocal extends Configurable {
+
+  /**
+   * Given a principal converts it to local identity.
+   * @param principal the principal to convert
+   * @return local identity.
+   */
+  def toLocal(principal: KafkaPrincipal): KafkaPrincipal
+}
diff --git a/core/src/main/scala/kafka/security/auth/PrincipalToUnixGroup.scala b/core/src/main/scala/kafka/security/auth/PrincipalToUnixGroup.scala
new file mode 100644
index 00000000000..542b1257322
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/PrincipalToUnixGroup.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.util
+
+import kafka.utils.{SystemTime, Logging}
+import org.apache.kafka.common.cache.{Cache, TTLCache}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Shell.ShellCommandExecutor
+import org.apache.kafka.common.utils.SystemTime
+
+/**
+ * Converts principal to Unix Group
+ */
+
+object PrincipalToUnixGroup {
+  //Regex (one whitespace character) used to split the group lookup command's output into group names.
+  val GroupOutputRegex = "[ \t\n\r\f]"
+  //Config key for the TTL, in milliseconds, of the principal-to-groups cache.
+  val GroupCacheExpirationTTLMillisProperty = "group.cache.ttl.millis"
+  //default TTL is 1 hour.
+  val GroupCacheExpirationTTLMillisDefault: Long = 1 * 60 * 60 * 1000
+}
+
+class PrincipalToUnixGroup extends PrincipalToGroup with Logging {
+  //Cache of principal -> groups; it is only created in configure(), so configure() must be called before toGroups().
+  private var principalToGroupsCache: Cache[KafkaPrincipal, Set[KafkaPrincipal]] = null
+
+  /**
+   * Returns the groups of a principal, looking in the cache first and caching the result of a fresh lookup.
+   * @param principal the principal to resolve; null yields an empty Set
+   * @return Set of Groups this principal is part of, empty Set if not part of any group.
+   */
+  override def toGroups(principal: KafkaPrincipal): Set[KafkaPrincipal] = {
+    if (principal == null) Set.empty[KafkaPrincipal]
+    else Option(principalToGroupsCache.get(principal)).getOrElse {
+      val groups = getGroups(principal)
+      principalToGroupsCache.put(principal, groups)
+      groups
+    }
+  }
+
+  /**
+   * Gets the groups by executing `id -Gn`, which lists the primary group as well as the supplementary ones.
+   * @param principal the principal whose name is looked up
+   * @return the groups printed by the command, or an empty Set if it fails or prints nothing
+   */
+  private def getGroups(principal: KafkaPrincipal): Set[KafkaPrincipal] = {
+    //No shell is involved and the name follows "--", so a crafted name cannot inject commands or options.
+    val shellCmdExecutor = new ShellCommandExecutor(Array("id", "-Gn", "--", principal.getName), 5000)
+    try {
+      shellCmdExecutor.execute()
+      Option(shellCmdExecutor.output()).getOrElse("").split(PrincipalToUnixGroup.GroupOutputRegex).filter(_.nonEmpty)
+        .map(group => new KafkaPrincipal(KafkaPrincipal.GROUP_TYPE, group)).toSet
+    } catch {
+      case e: Exception =>
+        logger.warn(s"Failed to get groups for $principal, ignoring exception and returning empty set.", e)
+        Set.empty[KafkaPrincipal]
+    }
+  }
+
+  override def configure(configs: util.Map[String, _]) = {
+    val configured: Any = if (configs != null) configs.get(PrincipalToUnixGroup.GroupCacheExpirationTTLMillisProperty) else null
+    //The TTL may arrive as a number or as a string (e.g. from a properties file), so accept both forms.
+    val ttlMillis: Long = configured match {
+      case null => PrincipalToUnixGroup.GroupCacheExpirationTTLMillisDefault
+      case number: Number => number.longValue
+      case other => other.toString.trim.toLong
+    }
+    //The time instance is not propagated to Authorizer so right now there is no way to propagate that to this layer.
+    principalToGroupsCache = new TTLCache[KafkaPrincipal, Set[KafkaPrincipal]](ttlMillis, new SystemTime())
+  }
+}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 7bfb09206fd..28865aed0ab 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Session
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
@@ -43,7 +43,10 @@ object SimpleAclAuthorizer {
   val SuperUsersProp = "super.users"
   //If set to true when no acls are found for a resource , authorizer allows access to everyone. Defaults to false.
   val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
-
+  //FQCN of Plugin class that can convert kafka principal to local user name.
+  val PrincipalToLocalProp = "principal.to.local.class.name"
+  //FQCN of Plugin class that can convert principal to group name.
+  val PrincipalToGroupPlugin = "principal.to.group.class.name"
   /**
    * The root acl storage node. Under this node there will be one child node per resource type (Topic, Cluster, ConsumerGroup).
    * under each resourceType there will be a unique child for each resource instance and the data for that child will contain
@@ -66,8 +69,12 @@ object SimpleAclAuthorizer {
 
 class SimpleAclAuthorizer extends Authorizer with Logging {
   private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
+
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
+  private var principalToLocalPlugin: Option[PrincipalToLocal] = None
+  private var principalToGroupPlugin: Option[PrincipalToGroup] = None
+
   private var zkUtils: ZkUtils = null
   private var aclChangeListener: ZkNodeChangeNotificationListener = null
 
@@ -87,6 +94,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       case str: String if str.nonEmpty => str.split(",").map(s => KafkaPrincipal.fromString(s.trim)).toSet
     }.getOrElse(Set.empty[KafkaPrincipal])
 
+    //Instantiates the plugin class named by `configName`, if set; T comes from the call site so it is never Nothing.
+    def getInstanceForConfig[T <: AnyRef](configName: String): Option[T] =
+      configs.get(configName).collect {
+        case str: String if str.nonEmpty => CoreUtils.createObject[T](str)
+      }
+    principalToLocalPlugin = getInstanceForConfig(SimpleAclAuthorizer.PrincipalToLocalProp)
+    principalToGroupPlugin = getInstanceForConfig(SimpleAclAuthorizer.PrincipalToGroupPlugin)
+
+    principalToLocalPlugin.foreach(_.configure(javaConfigs))
+    principalToGroupPlugin.foreach(_.configure(javaConfigs))
+
     shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).map(_.toString.toBoolean).getOrElse(false)
 
     val zkUrl = configs.getOrElse(SimpleAclAuthorizer.ZkUrlProp, kafkaConfig.zkConnect).toString
@@ -109,12 +127,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
-    val principal: KafkaPrincipal = session.principal
+    val localPrincipal: KafkaPrincipal = principalToLocalPlugin map (ptol => ptol.toLocal(session.principal)) getOrElse(null)
+    var principals = (if (localPrincipal != null) Set(session.principal, localPrincipal) else Set(session.principal))
+    val groups: Set[KafkaPrincipal] = principals.map(principal =>
+      principalToGroupPlugin map (ptog => ptog.toGroups(principal)) getOrElse(Set.empty[KafkaPrincipal])).flatten.toSet
     val host = session.host
     val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
+    principals = principals ++ groups
 
     //check if there is any Deny acl match that would disallow this operation.
-    val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
+    val denyMatch = principals.foldLeft(false)((result, principal) => result || aclMatch(session, operation, resource, principal, host, Deny, acls))
 
     //if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny.
     val ops = if (Describe == operation)
@@ -123,15 +145,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       Set[Operation](operation)
 
     //now check if there is any allow acl that will allow this operation.
-    val allowMatch = ops.exists(operation => aclMatch(session, operation, resource, principal, host, Allow, acls))
+    val allowMatch = ops.exists(operation =>
+      principals.foldLeft(false)((result, principal) => result || aclMatch(session, operation, resource, principal, host, Allow, acls)))
 
     //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
     //when no acls are found or if no deny acls are found and at least one allow acls matches.
-    val authorized = isSuperUser(operation, resource, principal, host) ||
-      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
+    val authorized = principals.exists(principal => //any one identity (session, local or group) qualifying is enough
+      isSuperUser(operation, resource, principal, host) || isEmptyAclAndAuthorized(operation, resource, principal, host, acls)) ||
       (!denyMatch && allowMatch)
 
-    logAuditMessage(principal, authorized, operation, resource, host)
+    logAuditMessage(session.principal, authorized, operation, resource, host)
     authorized
   }
 
diff --git a/core/src/test/scala/unit/kafka/security/auth/KerberosPrincipalToLocalTest.scala b/core/src/test/scala/unit/kafka/security/auth/KerberosPrincipalToLocalTest.scala
new file mode 100644
index 00000000000..1aaabd238e0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/KerberosPrincipalToLocalTest.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.security.auth
+
+import kafka.security.auth.KerberosPrincipalToLocal
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.{Test, Assert}
+import org.scalatest.junit.{JUnitSuite}
+import java.security.Principal
+
+class KerberosPrincipalToLocalTest extends JUnitSuite   {
+
+  /** Every Kerberos name form (with or without host and realm) must map to the bare user name. */
+  @Test
+  def testToLocal(): Unit = {
+    val user: String = "test"
+    val principalToLocal: KerberosPrincipalToLocal = new KerberosPrincipalToLocal()
+    val kerberosNames = Seq("test/testhost@testRealm.com", "test/testhost", "test@testRealm.com", "test")
+
+    kerberosNames.foreach { name =>
+      val local: KafkaPrincipal = principalToLocal.toLocal(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name))
+      //toLocal returns a KafkaPrincipal, which never equals a String, so the assertion
+      //has to compare the principal's name with the expected user name.
+      Assert.assertEquals(s"Unexpected local user for $name", user, local.getName)
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/security/auth/PrincipalToUnixGroupTest.scala b/core/src/test/scala/unit/kafka/security/auth/PrincipalToUnixGroupTest.scala
new file mode 100644
index 00000000000..f8edbd40833
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/PrincipalToUnixGroupTest.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.security.auth
+
+import kafka.security.auth.PrincipalToUnixGroup
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.{Assert, Test}
+import org.scalatest.junit.JUnitSuite
+
+class PrincipalToUnixGroupTest extends JUnitSuite   {
+
+  /** Exercises group lookup for the OS user running the test, a non-existent user, and a null principal. */
+  @Test
+  def testToGroups(): Unit = {
+    val principalToGroup = new PrincipalToUnixGroup
+    principalToGroup.configure(null)
+
+    val currentUser: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name"))
+    Assert.assertTrue("Should have returned > 0 groups for the current user", principalToGroup.toGroups(currentUser).size > 0)
+
+    val unknownUser: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "unknown-user")
+    Assert.assertTrue("Should have returned 0 groups for an unknown user", principalToGroup.toGroups(unknownUser).size == 0)
+
+    Assert.assertTrue("Should have returned 0 groups for a null principal", principalToGroup.toGroups(null).size == 0)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 6238f6dfcee..d140cbefc85 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -21,7 +21,7 @@ import java.util.UUID
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
 import kafka.server.KafkaConfig
-import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.{TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
@@ -117,6 +117,60 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertFalse("deny should take precedence over allow.", simpleAclAuthorizer.authorize(session, Read, resource))
   }
 
+
+  @Test
+  def testPrincipalToLocalPlugin() {
+    //Build an authorizer that uses the Kerberos principal-to-local plugin.
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(SimpleAclAuthorizer.PrincipalToLocalProp, classOf[KerberosPrincipalToLocal].getName)
+    val cfg = KafkaConfig.fromProps(props)
+    val testAuthorizer: SimpleAclAuthorizer = new SimpleAclAuthorizer
+    testAuthorizer.configure(cfg.originals)
+
+    val host = "random-host"
+    val kerberosPrincipal = s"$username/$host@EXAMPLE.COM"
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, kerberosPrincipal)
+    val session = new Session(user, host)
+
+    //The ACL names only the short user name, so the session's full Kerberos name matches it only after translation.
+    val allowAll = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, host, All)
+    val acls = Set[Acl](allowAll)
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    assertTrue(s"Principal to local should have translated $kerberosPrincipal to $username which should have in turn allowed access.",
+      testAuthorizer.authorize(session, Read, resource))
+  }
+
+  @Test
+  def testGroupAcls() {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(SimpleAclAuthorizer.PrincipalToLocalProp, classOf[KerberosPrincipalToLocal].getName)
+    props.put(SimpleAclAuthorizer.PrincipalToGroupPlugin, classOf[PrincipalToUnixGroup].getName)
+
+    val cfg = KafkaConfig.fromProps(props)
+    val testAuthorizer: SimpleAclAuthorizer = new SimpleAclAuthorizer
+    testAuthorizer.configure(cfg.originals)
+
+    val host = "random-host"
+    val currentUser = System.getProperty("user.name")
+    val kerberosPrincipal = s"$currentUser/$host@EXAMPLE.COM"
+
+    //Resolve the OS user's groups independently so that the ACL below can name one of them.
+    val principalToGroup = new PrincipalToUnixGroup
+    principalToGroup.configure(cfg.originals())
+    val groups = principalToGroup.toGroups(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, currentUser))
+    assertTrue(s"Expected at least one group for $currentUser", groups.nonEmpty)
+
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, kerberosPrincipal)
+    val session = new Session(user, host)
+
+    val allowAll = new Acl(groups.head, Allow, host, All)
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAll), Set.empty[Acl])
+
+    assertTrue(s"Principal to group should have translated $currentUser to group ${groups.head} which should have in turn allowed access.",
+      testAuthorizer.authorize(session, Read, resource))
+  }
+
   @Test
   def testAllowAllAccess() {
     val allowAllAcl = Acl.AllowAllAcl


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add group support for authorizer acls
> -------------------------------------
>
>                 Key: KAFKA-2794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2794
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: security
>    Affects Versions: 0.9.0.0
>            Reporter: Parth Brahmbhatt
>            Assignee: Parth Brahmbhatt
>
> Currently, the out-of-the-box Kafka authorizer and ACLs only support individual principals/users. This is a two-part JIRA: first we add support for converting a Kerberos principal to a Unix user name, and then for mapping that Unix user name to its groups. 
> With this feature, the authorizer will be able to support group-level ACLs, so admins won't have to add support for individual users. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)