You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/12/03 00:19:21 UTC
[05/26] incubator-ranger git commit: RANGER-737: updated Ranger Kakfa
plugin for recent changes in Kafka authorizer
RANGER-737: updated Ranger Kakfa plugin for recent changes in Kafka authorizer
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/e47756ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/e47756ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/e47756ce
Branch: refs/heads/tag-policy
Commit: e47756ced5b9307e4e0c29543847d9ba0f6fad2b
Parents: 0b725f0
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Thu Nov 19 11:16:35 2015 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Nov 19 23:09:42 2015 -0800
----------------------------------------------------------------------
.../kafka/authorizer/RangerKafkaAuthorizer.java | 68 +++++++++++++-------
.../services/kafka/RangerServiceKafka.java | 37 +++++++----
.../kafka/client/ServiceKafkaClient.java | 42 ++++++++----
pom.xml | 5 +-
ranger-kafka-plugin-shim/.gitignore | 1 +
.../kafka/authorizer/RangerKafkaAuthorizer.java | 65 +++++++++++++------
6 files changed, 146 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index c5e955d..08ff928 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -20,14 +20,14 @@
package org.apache.ranger.authorization.kafka.authorizer;
import java.util.Date;
+import java.util.Map;
+
import javax.security.auth.Subject;
import kafka.security.auth.Acl;
import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
-import kafka.security.auth.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import kafka.security.auth.*;
import kafka.server.KafkaConfig;
import kafka.common.security.LoginManager;
import kafka.network.RequestChannel.Session;
@@ -73,11 +73,10 @@ public class RangerKafkaAuthorizer implements Authorizer {
/*
* (non-Javadoc)
*
- * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+ * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
*/
@Override
- public void initialize(KafkaConfig kafkaConfig) {
-
+ public void configure(Map<String, ?> configs) {
if (rangerPlugin == null) {
try {
Subject subject = LoginManager.subject();
@@ -110,7 +109,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
// TODO: If resource type if consumer group, then allow it by default
- if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+ if (resource.resourceType().equals(Group$.MODULE$)) {
return true;
}
@@ -124,6 +123,11 @@ public class RangerKafkaAuthorizer implements Authorizer {
.getGroupsForRequestUser(userName);
String ip = session.host();
+ // skip leading slash
+ if(StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
+ ip = ip.substring(1);
+ }
+
Date eventTime = StringUtil.getUTCDate();
String accessType = mapToRangerAccessType(operation);
boolean validationFailed = false;
@@ -152,12 +156,12 @@ public class RangerKafkaAuthorizer implements Authorizer {
rangerRequest.setAction(action);
rangerRequest.setRequestData(resource.name());
- if (resource.resourceType().equals(ResourceType.TOPIC)) {
+ if (resource.resourceType().equals(Topic$.MODULE$)) {
rangerResource.setValue(KEY_TOPIC, resource.name());
- } else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
+ } else if (resource.resourceType().equals(Cluster$.MODULE$)) {
// CLUSTER should go as null
// rangerResource.setValue(KEY_CLUSTER, resource.name());
- } else if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+ } else if (resource.resourceType().equals(Group$.MODULE$)) {
rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
} else {
logger.fatal("Unsupported resourceType=" + resource.resourceType());
@@ -201,7 +205,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
*/
@Override
public void addAcls(Set<Acl> acls, Resource resource) {
- logger.error("addAcls() is not supported by Ranger for Kafka");
+ logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
}
/*
@@ -213,7 +217,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
*/
@Override
public boolean removeAcls(Set<Acl> acls, Resource resource) {
- logger.error("removeAcls() is not supported by Ranger for Kafka");
+ logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
return false;
}
@@ -225,7 +229,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
*/
@Override
public boolean removeAcls(Resource resource) {
- logger.error("removeAcls() is not supported by Ranger for Kafka");
+ logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
return false;
}
@@ -237,7 +241,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
@Override
public Set<Acl> getAcls(Resource resource) {
Set<Acl> aclList = new HashSet<Acl>();
- logger.error("getAcls() is not supported by Ranger for Kafka");
+ logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
return aclList;
}
@@ -246,12 +250,24 @@ public class RangerKafkaAuthorizer implements Authorizer {
* (non-Javadoc)
*
* @see
- * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
- * )
+ * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal)
*/
@Override
- public Set<Acl> getAcls(KafkaPrincipal principal) {
- Set<Acl> aclList = new HashSet<Acl>();
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
+ scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
+ logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
+ return aclList;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * kafka.security.auth.Authorizer#getAcls()
+ */
+ @Override
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+ scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
logger.error("getAcls() is not supported by Ranger for Kafka");
return aclList;
}
@@ -261,16 +277,20 @@ public class RangerKafkaAuthorizer implements Authorizer {
* @return
*/
private String mapToRangerAccessType(Operation operation) {
- if (operation.equals(Operation.READ)) {
+ if (operation.equals(Read$.MODULE$)) {
return ACCESS_TYPE_READ;
- } else if (operation.equals(Operation.WRITE)) {
+ } else if (operation.equals(Write$.MODULE$)) {
return ACCESS_TYPE_WRITE;
- } else if (operation.equals(Operation.ALTER)) {
+ } else if (operation.equals(Alter$.MODULE$)) {
return ACCESS_TYPE_CONFIGURE;
- } else if (operation.equals(Operation.DESCRIBE)) {
+ } else if (operation.equals(Describe$.MODULE$)) {
return ACCESS_TYPE_DESCRIBE;
- } else if (operation.equals(Operation.CLUSTER_ACTION)) {
+ } else if (operation.equals(ClusterAction$.MODULE$)) {
return ACCESS_TYPE_KAFKA_ADMIN;
+ } else if (operation.equals(Create$.MODULE$)) {
+ return ACCESS_TYPE_CREATE;
+ } else if (operation.equals(Delete$.MODULE$)) {
+ return ACCESS_TYPE_DELETE;
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
index ea6d316..8a82b2f 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class RangerServiceKafka extends RangerBaseService {
-
private static final Log LOG = LogFactory.getLog(RangerServiceKafka.class);
public RangerServiceKafka() {
@@ -46,33 +45,45 @@ public class RangerServiceKafka extends RangerBaseService {
@Override
public HashMap<String, Object> validateConfig() throws Exception {
HashMap<String, Object> ret = new HashMap<String, Object>();
- String serviceName = getServiceName();
+
if (LOG.isDebugEnabled()) {
- LOG.debug("==> RangerServiceKafka.validateConfig Service: ("
- + serviceName + " )");
+ LOG.debug("==> RangerServiceKafka.validateConfig(" + serviceName + ")");
}
+
if (configs != null) {
try {
- ret = ServiceKafkaConnectionMgr.testConnection(serviceName,
- configs);
+ ret = ServiceKafkaConnectionMgr.testConnection(serviceName, configs);
} catch (Exception e) {
LOG.error("<== RangerServiceKafka.validateConfig Error:" + e);
throw e;
}
}
+
if (LOG.isDebugEnabled()) {
- LOG.debug("<== RangerServiceKafka.validateConfig Response : (" + ret
- + " )");
+ LOG.debug("<== RangerServiceKafka.validateConfig(" + serviceName + "): ret=" + ret);
}
+
return ret;
}
@Override
- public List<String> lookupResource(ResourceLookupContext context)
- throws Exception {
+ public List<String> lookupResource(ResourceLookupContext context) throws Exception {
+ List<String> ret = null;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> RangerServiceKafka.lookupResource(" + serviceName + ")");
+ }
- ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr
- .getKafkaClient(serviceName, configs);
- return serviceKafkaClient.getResources(context);
+ if(configs != null) {
+ ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs);
+
+ ret = serviceKafkaClient.getResources(context);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== RangerServiceKafka.lookupResource(" + serviceName + "): ret=" + ret);
+ }
+
+ return ret;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 0698bf6..f5c04fe 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -28,8 +28,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZkUtils$;
+import org.apache.kafka.common.security.JaasUtils;
+import org.I0Itec.zkclient.*;
import org.apache.log4j.Logger;
import org.apache.ranger.plugin.client.BaseClient;
import org.apache.ranger.plugin.service.ResourceLookupContext;
@@ -79,31 +80,48 @@ public class ServiceKafkaClient {
return responseData;
}
- public List<String> getTopicList(List<String> ignoreTopicList)
- throws Exception {
+ private List<String> getTopicList(List<String> ignoreTopicList) throws Exception {
+ List<String> ret = new ArrayList<String>();
- List<String> list = new ArrayList<String>();
+ int sessionTimeout = 5000;
+ int connectionTimeout = 10000;
+ ZkClient zkClient = null;
+ ZkConnection zkConnection = null;
- ZkClient zkClient = new ZkClient(zookeeperConnect);
try {
- Seq<String> topicList = ZkUtils.getChildrenParentMayNotExist(
- zkClient, ZkUtils.BrokerTopicsPath());
+ zkClient = ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, connectionTimeout);
+ zkConnection = new ZkConnection(zookeeperConnect, sessionTimeout);
+
+ boolean zkSecurityEnabled = JaasUtils.isZkSecurityEnabled();
+ ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, true);
+ Seq<String> topicList = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath());
Iterator<String> iter = topicList.iterator();
while (iter.hasNext()) {
String topic = iter.next();
if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
- list.add(topic);
+ ret.add(topic);
}
}
} finally {
try {
- zkClient.close();
+ if(zkClient != null) {
+ zkClient.close();
+ }
} catch (Exception ex) {
- LOG.error("Error closing zookeeper", ex);
+ LOG.error("Error closing zkClient", ex);
+ }
+
+ try {
+ if(zkConnection != null) {
+ zkConnection.close();
+ }
+
+ } catch(Exception ex) {
+ LOG.error("Error closing zkConnection", ex);
}
}
- return list;
+ return ret;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d60fca4..1b183b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
<jersey-client.version>2.6</jersey-client.version>
<junit.version>4.11</junit.version>
<kafka.version>0.8.2.0</kafka.version>
- <!-- <kafka.version>0.8.2.2.3.2.0-2950</kafka.version> -->
+ <!-- <kafka.version>0.8.2.2.3.4.0-3288</kafka.version> -->
<mockito.version>1.8.4</mockito.version>
<hamcrest-version>1.3</hamcrest-version>
<knox.gateway.version>0.6.0</knox.gateway.version>
@@ -233,7 +233,8 @@
<profile>
<id>kafka-security</id>
<modules>
- <module>plugin-kafka</module>
+ <module>plugin-kafka</module>
+ <module>ranger-kafka-plugin-shim</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/.gitignore
----------------------------------------------------------------------
diff --git a/ranger-kafka-plugin-shim/.gitignore b/ranger-kafka-plugin-shim/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/ranger-kafka-plugin-shim/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index d39cac2..0937835 100644
--- a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,6 +19,8 @@
package org.apache.ranger.authorization.kafka.authorizer;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
@@ -27,10 +29,9 @@ import scala.collection.immutable.Set;
import kafka.network.RequestChannel.Session;
import kafka.security.auth.Acl;
import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
-import kafka.server.KafkaConfig;
//public class RangerKafkaAuthorizer extends Authorizer {
@@ -82,31 +83,30 @@ public class RangerKafkaAuthorizer implements Authorizer {
LOG.debug("<== RangerKafkaAuthorizer.init()");
}
}
-
-
+
@Override
- public void initialize(KafkaConfig kafkaConfig) {
+ public void configure(Map<String, ?> configs) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.initialize()");
+ LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
}
try {
activatePluginClassLoader();
- rangerKakfaAuthorizerImpl.initialize(kafkaConfig);
+ rangerKakfaAuthorizerImpl.configure(configs);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.initialize()");
+ LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
}
}
@Override
public boolean authorize(Session session, Operation operation,Resource resource) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.authorize()");
+ LOG.debug("==> RangerKafkaAuthorizer.authorize(Session, Operation, Resource)");
}
boolean ret = false;
@@ -120,7 +120,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.authorize()");
+ LOG.debug("<== RangerKafkaAuthorizer.authorize(Session, Operation, Resource)");
}
return ret;
@@ -129,7 +129,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
@Override
public void addAcls(Set<Acl> acls, Resource resource) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.addAcls()");
+ LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
}
try {
@@ -141,14 +141,14 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.addAcls()");
+ LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
}
}
@Override
public boolean removeAcls(Set<Acl> acls, Resource resource) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.removeAcls()");
+ LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
}
boolean ret = false;
try {
@@ -160,7 +160,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.removeAcls()");
+ LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
}
return ret;
@@ -169,7 +169,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
@Override
public boolean removeAcls(Resource resource) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.removeAcls()");
+ LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
}
boolean ret = false;
try {
@@ -181,7 +181,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.removeAcls()");
+ LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
}
return ret;
@@ -190,7 +190,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
@Override
public Set<Acl> getAcls(Resource resource) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+ LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
}
Set<Acl> ret = null;
@@ -204,19 +204,19 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
+ LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
}
return ret;
}
@Override
- public Set<Acl> getAcls(KafkaPrincipal principal) {
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+ LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
}
- Set<Acl> ret = null;
+ scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
try {
activatePluginClassLoader();
@@ -227,6 +227,29 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
if(LOG.isDebugEnabled()) {
+ LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
+ }
+
+ return ret;
+ }
+
+ @Override
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+ }
+
+ scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
+
+ try {
+ activatePluginClassLoader();
+
+ ret = rangerKakfaAuthorizerImpl.getAcls();
+ } finally {
+ deactivatePluginClassLoader();
+ }
+
+ if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
}