You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ranger.apache.org by GitBox <gi...@apache.org> on 2022/02/25 13:53:57 UTC

[GitHub] [ranger] akatona84 opened a new pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

akatona84 opened a new pull request #133:
URL: https://github.com/apache/ranger/pull/133


   
   kafka.security.auth.Authorizer has been deprecated since December 2019, and
   it's removed in Apache Kafka 3.0
   
   See the KIP for more details:
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
   
   Co-authored-by: Chia-Ping Tsai <ch...@gmail.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] akatona84 commented on pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
akatona84 commented on pull request #133:
URL: https://github.com/apache/ranger/pull/133#issuecomment-1056837821


   Thank you, @urbandan . Now I'm going to just rebase to see if the checks would work (some snapshot dependency was not available)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] urbandan commented on a change in pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
urbandan commented on a change in pull request #133:
URL: https://github.com/apache/ranger/pull/133#discussion_r817599735



##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done

Review comment:
       sounds ok to me




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] akatona84 commented on pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
akatona84 commented on pull request #133:
URL: https://github.com/apache/ranger/pull/133#issuecomment-1070450363


   rabased, maybe CI will succeed now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] akatona84 commented on a change in pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
akatona84 commented on a change in pull request #133:
URL: https://github.com/apache/ranger/pull/133#discussion_r816786753



##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();

Review comment:
       yeah, plugin will be made once, no need to re-init imo.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] akatona84 commented on a change in pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
akatona84 commented on a change in pull request #133:
URL: https://github.com/apache/ranger/pull/133#discussion_r816786117



##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done

Review comment:
       throwing illegalstate?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] urbandan commented on a change in pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
urbandan commented on a change in pull request #133:
URL: https://github.com/apache/ranger/pull/133#discussion_r816093117



##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");

Review comment:
       nit: loggers should be on top

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {

Review comment:
       nit: "get" in these method names are kind of misleading, as they create new objects - createRangerAccessResource? rangerAccessResourceFor?

##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done
+      logger.error("Error Enabling RangerKafkaPlugin", e);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.init()");
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    logger.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.configure(configs);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
+    logger.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    Map<Endpoint, ? extends CompletionStage<Void>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.start(authorizerServerInfo);
+
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    logger.debug("==> RangerKafkaAuthorizer.close()");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.close();
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.close()");
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    logger.debug("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext={}, List<Action>={})", requestContext, actions);
+
+    List<AuthorizationResult> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.authorize(requestContext, actions);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.authorize: {}", ret);
+
+    return ret;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    List<? extends CompletionStage<AclCreateResult>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.createAcls(requestContext, aclBindings);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    return ret;

Review comment:
       nit: ret is not logged, this could be reworked to return from the try, and not use a variable

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();

Review comment:
       I understand that this is not new code, and not too familiar with the contract of init, but still - does it make sense to init the plugin on each configure call?
   If there is a single instance, it would make more sense to only init after the plugin was created

##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done

Review comment:
       An exception should be throw here, otherwise we fail silently and let the broker proceed - this will probably cause an NPE at some other point

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();
+    auditHandler = new RangerKafkaAuditHandler();
+    rangerPlugin.setResultProcessor(auditHandler);
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    boolean isInvalid = false;
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type. operation=" + action.operation());
+        isInvalid = true;
+      }
+      String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+      if (resourceTypeKey == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type. resourceType=" + action.resourcePattern().resourceType());
+        isInvalid = true;
+      }
+      RangerAccessRequestImpl rangerAccessRequest = getRangerAccessRequest(
+          userName,
+          userGroups,
+          ip,
+          eventTime,
+          resourceTypeKey,
+          action.resourcePattern().name(),
+          accessType);
+      rangerRequests.add(rangerAccessRequest);
+    }
+
+    if (isInvalid) {
+      MiscUtil.logErrorMessageByInterval(logger, "Validation failed, requestContext=" + requestContext + ", actions=" + actions);
+      return denyAll(actions);
+    }
+
+    List<AuthorizationResult> authorizationResults = mapResults(actions, callRangerPlugin(rangerRequests));
+    logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
+    return authorizationResults;
+  }
+
+  private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+    Collection<RangerAccessResult> results = null;

Review comment:
       remove this var by returning from try and returning null from catch

##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done
+      logger.error("Error Enabling RangerKafkaPlugin", e);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.init()");
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    logger.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.configure(configs);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
+    logger.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    Map<Endpoint, ? extends CompletionStage<Void>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.start(authorizerServerInfo);
+
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    logger.debug("==> RangerKafkaAuthorizer.close()");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.close();
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.close()");
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    logger.debug("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext={}, List<Action>={})", requestContext, actions);
+
+    List<AuthorizationResult> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.authorize(requestContext, actions);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.authorize: {}", ret);
+
+    return ret;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    List<? extends CompletionStage<AclCreateResult>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.createAcls(requestContext, aclBindings);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    return ret;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+    logger.debug("==> RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+
+    List<? extends CompletionStage<AclDeleteResult>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.deleteAcls(requestContext, aclBindingFilters);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+
+    return ret;
+  }
+
+  @Override
+  public Iterable<AclBinding> acls(AclBindingFilter filter) {
+    logger.debug("==> RangerKafkaAuthorizer.acls(AclBindingFilter)");
+
+    Iterable<AclBinding> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.acls(filter);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.acls(AclBindingFilter)");
+
+    return ret;

Review comment:
       same nit about the variable

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();
+    auditHandler = new RangerKafkaAuditHandler();
+    rangerPlugin.setResultProcessor(auditHandler);
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    boolean isInvalid = false;
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type. operation=" + action.operation());
+        isInvalid = true;

Review comment:
       maybe a fast failure would be preferred here - no need to go through all actions?

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();
+    auditHandler = new RangerKafkaAuditHandler();
+    rangerPlugin.setResultProcessor(auditHandler);
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    boolean isInvalid = false;
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type. operation=" + action.operation());
+        isInvalid = true;
+      }
+      String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+      if (resourceTypeKey == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type. resourceType=" + action.resourcePattern().resourceType());
+        isInvalid = true;
+      }
+      RangerAccessRequestImpl rangerAccessRequest = getRangerAccessRequest(
+          userName,
+          userGroups,
+          ip,
+          eventTime,
+          resourceTypeKey,
+          action.resourcePattern().name(),
+          accessType);
+      rangerRequests.add(rangerAccessRequest);
+    }
+
+    if (isInvalid) {
+      MiscUtil.logErrorMessageByInterval(logger, "Validation failed, requestContext=" + requestContext + ", actions=" + actions);
+      return denyAll(actions);
+    }
+
+    List<AuthorizationResult> authorizationResults = mapResults(actions, callRangerPlugin(rangerRequests));

Review comment:
       nit: put the callRangerPlugin call on a separate line and assign to a variable - the most important call is kind of hidden as a function param, hard to read

##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done
+      logger.error("Error Enabling RangerKafkaPlugin", e);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.init()");
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    logger.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.configure(configs);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
+    logger.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    Map<Endpoint, ? extends CompletionStage<Void>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.start(authorizerServerInfo);
+
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    logger.debug("==> RangerKafkaAuthorizer.close()");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.close();
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.close()");
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    logger.debug("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext={}, List<Action>={})", requestContext, actions);
+
+    List<AuthorizationResult> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.authorize(requestContext, actions);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.authorize: {}", ret);
+
+    return ret;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    List<? extends CompletionStage<AclCreateResult>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.createAcls(requestContext, aclBindings);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    return ret;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+    logger.debug("==> RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+
+    List<? extends CompletionStage<AclDeleteResult>> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.deleteAcls(requestContext, aclBindingFilters);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+
+    return ret;

Review comment:
       same nit about eliminating the variable

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();
+    auditHandler = new RangerKafkaAuditHandler();
+    rangerPlugin.setResultProcessor(auditHandler);
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    boolean isInvalid = false;
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type. operation=" + action.operation());
+        isInvalid = true;
+      }
+      String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+      if (resourceTypeKey == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type. resourceType=" + action.resourcePattern().resourceType());
+        isInvalid = true;
+      }
+      RangerAccessRequestImpl rangerAccessRequest = getRangerAccessRequest(
+          userName,
+          userGroups,
+          ip,
+          eventTime,
+          resourceTypeKey,
+          action.resourcePattern().name(),
+          accessType);
+      rangerRequests.add(rangerAccessRequest);
+    }
+
+    if (isInvalid) {
+      MiscUtil.logErrorMessageByInterval(logger, "Validation failed, requestContext=" + requestContext + ", actions=" + actions);
+      return denyAll(actions);
+    }
+
+    List<AuthorizationResult> authorizationResults = mapResults(actions, callRangerPlugin(rangerRequests));
+    logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
+    return authorizationResults;
+  }
+
+  private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+    Collection<RangerAccessResult> results = null;
+    try {
+      results = rangerPlugin.isAccessAllowed(rangerRequests);
+    } catch (Throwable t) {
+      logger.error("Error while calling isAccessAllowed(). requests=" + rangerRequests, t);
+    } finally {
+      auditHandler.flushAudit();
+    }
+    return results;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.error("createAcls is not supported by Ranger for Kafka");
+    return Collections.emptyList();

Review comment:
       I think this should return a list of failed completion stages (create a CompletableFuture then call completeExceptionally with an InvalidRequestException? UnsupportedOperationException?)

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();
+    auditHandler = new RangerKafkaAuditHandler();
+    rangerPlugin.setResultProcessor(auditHandler);
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    boolean isInvalid = false;
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type. operation=" + action.operation());
+        isInvalid = true;
+      }
+      String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+      if (resourceTypeKey == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type. resourceType=" + action.resourcePattern().resourceType());
+        isInvalid = true;
+      }
+      RangerAccessRequestImpl rangerAccessRequest = getRangerAccessRequest(
+          userName,
+          userGroups,
+          ip,
+          eventTime,
+          resourceTypeKey,
+          action.resourcePattern().name(),
+          accessType);
+      rangerRequests.add(rangerAccessRequest);
+    }
+
+    if (isInvalid) {
+      MiscUtil.logErrorMessageByInterval(logger, "Validation failed, requestContext=" + requestContext + ", actions=" + actions);
+      return denyAll(actions);
+    }
+
+    List<AuthorizationResult> authorizationResults = mapResults(actions, callRangerPlugin(rangerRequests));
+    logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
+    return authorizationResults;
+  }
+
+  private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+    Collection<RangerAccessResult> results = null;
+    try {
+      results = rangerPlugin.isAccessAllowed(rangerRequests);
+    } catch (Throwable t) {
+      logger.error("Error while calling isAccessAllowed(). requests=" + rangerRequests, t);
+    } finally {
+      auditHandler.flushAudit();
+    }
+    return results;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.error("createAcls is not supported by Ranger for Kafka");
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+    logger.error("deleteAcls is not supported by Ranger for Kafka");
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Iterable<AclBinding> acls(AclBindingFilter filter) {
+    logger.error("(getting) acls is not supported by Ranger for Kafka");
+    return Collections.emptyList();

Review comment:
       return failed futures

##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String userName,
+                                                                Set<String> userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String resourceTypeKey,
+                                                                String resourceName,
+                                                                String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();
+    auditHandler = new RangerKafkaAuditHandler();
+    rangerPlugin.setResultProcessor(auditHandler);
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    boolean isInvalid = false;
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type. operation=" + action.operation());
+        isInvalid = true;
+      }
+      String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+      if (resourceTypeKey == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type. resourceType=" + action.resourcePattern().resourceType());
+        isInvalid = true;
+      }
+      RangerAccessRequestImpl rangerAccessRequest = getRangerAccessRequest(
+          userName,
+          userGroups,
+          ip,
+          eventTime,
+          resourceTypeKey,
+          action.resourcePattern().name(),
+          accessType);
+      rangerRequests.add(rangerAccessRequest);
+    }
+
+    if (isInvalid) {
+      MiscUtil.logErrorMessageByInterval(logger, "Validation failed, requestContext=" + requestContext + ", actions=" + actions);
+      return denyAll(actions);
+    }
+
+    List<AuthorizationResult> authorizationResults = mapResults(actions, callRangerPlugin(rangerRequests));
+    logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
+    return authorizationResults;
+  }
+
+  private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+    Collection<RangerAccessResult> results = null;
+    try {
+      results = rangerPlugin.isAccessAllowed(rangerRequests);
+    } catch (Throwable t) {
+      logger.error("Error while calling isAccessAllowed(). requests=" + rangerRequests, t);
+    } finally {
+      auditHandler.flushAudit();
+    }
+    return results;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.error("createAcls is not supported by Ranger for Kafka");
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+    logger.error("deleteAcls is not supported by Ranger for Kafka");
+    return Collections.emptyList();

Review comment:
       same note about returning failed futures




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ranger] akatona84 edited a comment on pull request #133: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

Posted by GitBox <gi...@apache.org>.
akatona84 edited a comment on pull request #133:
URL: https://github.com/apache/ranger/pull/133#issuecomment-1070450363


   rebased, maybe CI will succeed now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@ranger.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org