You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/12/03 00:19:21 UTC

[05/26] incubator-ranger git commit: RANGER-737: updated Ranger Kafka plugin for recent changes in Kafka authorizer

RANGER-737: updated Ranger Kafka plugin for recent changes in Kafka authorizer


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/e47756ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/e47756ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/e47756ce

Branch: refs/heads/tag-policy
Commit: e47756ced5b9307e4e0c29543847d9ba0f6fad2b
Parents: 0b725f0
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Thu Nov 19 11:16:35 2015 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Nov 19 23:09:42 2015 -0800

----------------------------------------------------------------------
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 68 +++++++++++++-------
 .../services/kafka/RangerServiceKafka.java      | 37 +++++++----
 .../kafka/client/ServiceKafkaClient.java        | 42 ++++++++----
 pom.xml                                         |  5 +-
 ranger-kafka-plugin-shim/.gitignore             |  1 +
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 65 +++++++++++++------
 6 files changed, 146 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index c5e955d..08ff928 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -20,14 +20,14 @@
 package org.apache.ranger.authorization.kafka.authorizer;
 
 import java.util.Date;
+import java.util.Map;
+
 import javax.security.auth.Subject;
 
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
-import kafka.security.auth.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import kafka.security.auth.*;
 import kafka.server.KafkaConfig;
 import kafka.common.security.LoginManager;
 import kafka.network.RequestChannel.Session;
@@ -73,11 +73,10 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	/*
 	 * (non-Javadoc)
 	 * 
-	 * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+	 * @see kafka.security.auth.Authorizer#configure(java.util.Map)
 	 */
 	@Override
-	public void initialize(KafkaConfig kafkaConfig) {
-
+	public void configure(Map<String, ?> configs) {
 		if (rangerPlugin == null) {
 			try {
 				Subject subject = LoginManager.subject();
@@ -110,7 +109,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		// TODO: If resource type if consumer group, then allow it by default
-		if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+		if (resource.resourceType().equals(Group$.MODULE$)) {
 			return true;
 		}
 
@@ -124,6 +123,11 @@ public class RangerKafkaAuthorizer implements Authorizer {
 				.getGroupsForRequestUser(userName);
 		String ip = session.host();
 
+		// skip leading slash
+		if(StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
+			ip = ip.substring(1);
+		}
+
 		Date eventTime = StringUtil.getUTCDate();
 		String accessType = mapToRangerAccessType(operation);
 		boolean validationFailed = false;
@@ -152,12 +156,12 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		rangerRequest.setAction(action);
 		rangerRequest.setRequestData(resource.name());
 
-		if (resource.resourceType().equals(ResourceType.TOPIC)) {
+		if (resource.resourceType().equals(Topic$.MODULE$)) {
 			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
+		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
 			// CLUSTER should go as null
 			// rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+		} else if (resource.resourceType().equals(Group$.MODULE$)) {
 			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
 		} else {
 			logger.fatal("Unsupported resourceType=" + resource.resourceType());
@@ -201,7 +205,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	 */
 	@Override
 	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls() is not supported by Ranger for Kafka");
+		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
 	}
 
 	/*
@@ -213,7 +217,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	 */
 	@Override
 	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls() is not supported by Ranger for Kafka");
+		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
 		return false;
 	}
 
@@ -225,7 +229,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	 */
 	@Override
 	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls() is not supported by Ranger for Kafka");
+		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
 		return false;
 	}
 
@@ -237,7 +241,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	@Override
 	public Set<Acl> getAcls(Resource resource) {
 		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
+		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
 
 		return aclList;
 	}
@@ -246,12 +250,24 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	 * (non-Javadoc)
 	 * 
 	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
+	 * kafka.security.auth.Authorizer#getAcls(org.apache.kafka.common.security.auth.KafkaPrincipal)
 	 */
 	@Override
-	public Set<Acl> getAcls(KafkaPrincipal principal) {
-		Set<Acl> aclList = new HashSet<Acl>();
+	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
+		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
+		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
+		return aclList;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#getAcls()
+	 */
+	@Override
+	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
 		logger.error("getAcls() is not supported by Ranger for Kafka");
 		return aclList;
 	}
@@ -261,16 +277,20 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	 * @return
 	 */
 	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Operation.READ)) {
+		if (operation.equals(Read$.MODULE$)) {
 			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Operation.WRITE)) {
+		} else if (operation.equals(Write$.MODULE$)) {
 			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Operation.ALTER)) {
+		} else if (operation.equals(Alter$.MODULE$)) {
 			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Operation.DESCRIBE)) {
+		} else if (operation.equals(Describe$.MODULE$)) {
 			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(Operation.CLUSTER_ACTION)) {
+		} else if (operation.equals(ClusterAction$.MODULE$)) {
 			return ACCESS_TYPE_KAFKA_ADMIN;
+		} else if (operation.equals(Create$.MODULE$)) {
+			return ACCESS_TYPE_CREATE;
+		} else if (operation.equals(Delete$.MODULE$)) {
+			return ACCESS_TYPE_DELETE;
 		}
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
index ea6d316..8a82b2f 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class RangerServiceKafka extends RangerBaseService {
-
 	private static final Log LOG = LogFactory.getLog(RangerServiceKafka.class);
 
 	public RangerServiceKafka() {
@@ -46,33 +45,45 @@ public class RangerServiceKafka extends RangerBaseService {
 	@Override
 	public HashMap<String, Object> validateConfig() throws Exception {
 		HashMap<String, Object> ret = new HashMap<String, Object>();
-		String serviceName = getServiceName();
+
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerServiceKafka.validateConfig Service: ("
-					+ serviceName + " )");
+			LOG.debug("==> RangerServiceKafka.validateConfig(" + serviceName + ")");
 		}
+
 		if (configs != null) {
 			try {
-				ret = ServiceKafkaConnectionMgr.testConnection(serviceName,
-						configs);
+				ret = ServiceKafkaConnectionMgr.testConnection(serviceName, configs);
 			} catch (Exception e) {
 				LOG.error("<== RangerServiceKafka.validateConfig Error:" + e);
 				throw e;
 			}
 		}
+
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerServiceKafka.validateConfig Response : (" + ret
-					+ " )");
+			LOG.debug("<== RangerServiceKafka.validateConfig(" + serviceName + "): ret=" + ret);
 		}
+
 		return ret;
 	}
 
 	@Override
-	public List<String> lookupResource(ResourceLookupContext context)
-			throws Exception {
+	public List<String> lookupResource(ResourceLookupContext context) throws Exception {
+		List<String> ret = null;
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerServiceKafka.lookupResource(" + serviceName + ")");
+		}
 
-		ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr
-				.getKafkaClient(serviceName, configs);
-		return serviceKafkaClient.getResources(context);
+		if(configs != null) {
+			ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs);
+
+			ret = serviceKafkaClient.getResources(context);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerServiceKafka.lookupResource(" + serviceName + "): ret=" + ret);
+		}
+
+		return ret;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 0698bf6..f5c04fe 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -28,8 +28,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZkUtils$;
+import org.apache.kafka.common.security.JaasUtils;
+import org.I0Itec.zkclient.*;
 import org.apache.log4j.Logger;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
@@ -79,31 +80,48 @@ public class ServiceKafkaClient {
 		return responseData;
 	}
 
-	public List<String> getTopicList(List<String> ignoreTopicList)
-			throws Exception {
+	private List<String> getTopicList(List<String> ignoreTopicList) throws Exception {
+		List<String> ret = new ArrayList<String>();
 
-		List<String> list = new ArrayList<String>();
+		int          sessionTimeout    = 5000;
+        int          connectionTimeout = 10000;
+		ZkClient     zkClient          = null;
+		ZkConnection zkConnection      = null;
 
-		ZkClient zkClient = new ZkClient(zookeeperConnect);
 		try {
-			Seq<String> topicList = ZkUtils.getChildrenParentMayNotExist(
-					zkClient, ZkUtils.BrokerTopicsPath());
+	        zkClient     = ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, connectionTimeout);
+	        zkConnection = new ZkConnection(zookeeperConnect, sessionTimeout);
+
+	        boolean      zkSecurityEnabled = JaasUtils.isZkSecurityEnabled();
+	        ZkUtils      zkUtils           = new ZkUtils(zkClient, zkConnection, true);
+	        Seq<String>  topicList         = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath());
 
 			Iterator<String> iter = topicList.iterator();
 			while (iter.hasNext()) {
 				String topic = iter.next();
 				if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
-					list.add(topic);
+					ret.add(topic);
 				}
 			}
 		} finally {
 			try {
-				zkClient.close();
+				if(zkClient != null) {
+					zkClient.close();
+				}
 			} catch (Exception ex) {
-				LOG.error("Error closing zookeeper", ex);
+				LOG.error("Error closing zkClient", ex);
+			}
+			
+			try {
+				if(zkConnection != null) {
+					zkConnection.close();
+				}
+				
+			} catch(Exception ex) {
+				LOG.error("Error closing zkConnection", ex);
 			}
 		}
-		return list;
+		return ret;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d60fca4..1b183b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
 		<jersey-client.version>2.6</jersey-client.version>
 		<junit.version>4.11</junit.version>
 		<kafka.version>0.8.2.0</kafka.version>
-		<!-- <kafka.version>0.8.2.2.3.2.0-2950</kafka.version> -->
+		<!-- <kafka.version>0.8.2.2.3.4.0-3288</kafka.version> -->
 		<mockito.version>1.8.4</mockito.version>
 		<hamcrest-version>1.3</hamcrest-version>
 		<knox.gateway.version>0.6.0</knox.gateway.version>
@@ -233,7 +233,8 @@
       <profile>
           <id>kafka-security</id>
          <modules>
-        	 <module>plugin-kafka</module>         
+             <module>plugin-kafka</module>
+             <module>ranger-kafka-plugin-shim</module>
          </modules>
       </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/.gitignore
----------------------------------------------------------------------
diff --git a/ranger-kafka-plugin-shim/.gitignore b/ranger-kafka-plugin-shim/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/ranger-kafka-plugin-shim/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index d39cac2..0937835 100644
--- a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,6 +19,8 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
@@ -27,10 +29,9 @@ import scala.collection.immutable.Set;
 import kafka.network.RequestChannel.Session;
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import kafka.security.auth.Operation;
 import kafka.security.auth.Resource;
-import kafka.server.KafkaConfig;
 
 
 //public class RangerKafkaAuthorizer extends Authorizer {
@@ -82,31 +83,30 @@ public class RangerKafkaAuthorizer implements Authorizer {
 			LOG.debug("<== RangerKafkaAuthorizer.init()");
 		}
 	}
-	
-	
+
 	@Override
-	public void initialize(KafkaConfig kafkaConfig) {
+	public void configure(Map<String, ?> configs) {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.initialize()");
+			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
 		}
 
 		try {
 			activatePluginClassLoader();
 
-			rangerKakfaAuthorizerImpl.initialize(kafkaConfig);
+			rangerKakfaAuthorizerImpl.configure(configs);
 		} finally {
 			deactivatePluginClassLoader();
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.initialize()");
+			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
 		}
 	}
 
 	@Override
 	public boolean authorize(Session session, Operation operation,Resource resource) {	
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.authorize()");
+			LOG.debug("==> RangerKafkaAuthorizer.authorize(Session, Operation, Resource)");
 		}
 
 		boolean ret = false;
@@ -120,7 +120,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize()");
+			LOG.debug("<== RangerKafkaAuthorizer.authorize(Session, Operation, Resource)");
 		}
 		
 		return ret;
@@ -129,7 +129,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	@Override
 	public void addAcls(Set<Acl> acls, Resource resource) {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls()");
+			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
 		}
 
 		try {
@@ -141,14 +141,14 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls()");
+			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
 		}
 	}
 
 	@Override
 	public boolean removeAcls(Set<Acl> acls, Resource resource) {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls()");
+			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
 		}
 		boolean ret = false;
 		try {
@@ -160,7 +160,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls()");
+			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
 		}
 		
 		return ret;
@@ -169,7 +169,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	@Override
 	public boolean removeAcls(Resource resource) {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls()");
+			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
 		}
 		boolean ret = false;
 		try {
@@ -181,7 +181,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls()");
+			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
 		}
 
 		return ret;
@@ -190,7 +190,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	@Override
 	public Set<Acl> getAcls(Resource resource) {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
 		}
 		
 		Set<Acl> ret = null;
@@ -204,19 +204,19 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
+			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
 		}
 
 		return ret;
 	}
 
 	@Override
-	public Set<Acl> getAcls(KafkaPrincipal principal) {
+	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
 		}
 
-		Set<Acl> ret = null;
+		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
 
 		try {
 			activatePluginClassLoader();
@@ -227,6 +227,29 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 
 		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
+		}
+
+		return ret;
+	}
+
+	@Override
+	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+		}
+
+		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
+
+		try {
+			activatePluginClassLoader();
+
+			ret = rangerKakfaAuthorizerImpl.getAcls();
+		} finally {
+			deactivatePluginClassLoader();
+		}
+
+		if(LOG.isDebugEnabled()) {
 			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
 		}