You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by rm...@apache.org on 2022/04/07 00:07:07 UTC

[ranger] branch master updated: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504

This is an automated email from the ASF dual-hosted git repository.

rmani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 4107b274a RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504
4107b274a is described below

commit 4107b274afcba79ed14e180944eab3affb1e577e
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Fri Feb 25 09:47:35 2022 +0100

    RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504
    
    kafka.security.auth.Authorizer has been deprecated since December 2019, and
    it's removed in Apache Kafka 3.0
    
    See the KIP for more details:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
    
    Co-authored-by: Chia-Ping Tsai <ch...@gmail.com>
    Change-Id: Id7115fcac355b1c794ec82201bb6bc6bdf9c5b05
    Signed-off-by: Ramesh Mani <rm...@cloudera.com>
---
 plugin-kafka/pom.xml                               |   6 +
 .../kafka/authorizer/RangerKafkaAuthorizer.java    | 586 +++++++++++----------
 ranger-kafka-plugin-shim/pom.xml                   |  12 +-
 .../kafka/authorizer/RangerKafkaAuthorizer.java    | 435 ++++++---------
 4 files changed, 478 insertions(+), 561 deletions(-)

diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index d95f591fe..75db9e9de 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -47,10 +47,16 @@
             <artifactId>credentialbuilder</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_${scala.binary.version}</artifactId>
             <version>${kafka.version}</version>
+            <scope>test</scope>
             <exclusions>
                 <exclusion>
                    <groupId>io.netty</groupId>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 97a2f2ec7..64f622586 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,21 +19,38 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import scala.collection.immutable.HashSet;
-import scala.collection.immutable.Set;
-import kafka.security.auth.*;
-import kafka.network.RequestChannel.Session;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
@@ -43,286 +60,277 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger logger = LoggerFactory
-			.getLogger(RangerKafkaAuthorizer.class);
-	private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-	public static final String KEY_TOPIC = "topic";
-	public static final String KEY_CLUSTER = "cluster";
-	public static final String KEY_CONSUMER_GROUP = "consumergroup";
-	public static final String KEY_TRANSACTIONALID = "transactionalid";
-	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-	public static final String ACCESS_TYPE_READ = "consume";
-	public static final String ACCESS_TYPE_WRITE = "publish";
-	public static final String ACCESS_TYPE_CREATE = "create";
-	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_CONFIGURE = "configure";
-	public static final String ACCESS_TYPE_DESCRIBE = "describe";
-	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
-	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
-
-	private static volatile RangerBasePlugin rangerPlugin = null;
-	RangerKafkaAuditHandler auditHandler = null;
-
-	public RangerKafkaAuthorizer() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-	 */
-	@Override
-	public void configure(Map<String, ?> configs) {
-		RangerBasePlugin me = rangerPlugin;
-		if (me == null) {
-			synchronized(RangerKafkaAuthorizer.class) {
-				me = rangerPlugin;
-				if (me == null) {
-					try {
-						// Possible to override JAAS configuration which is used by Ranger, otherwise
-						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
-						// if it's not defined, then it reverts to 'KafkaServer' configuration.
-						final Object jaasContext = configs.get("ranger.jaas.context");
-						final String listenerName = (jaasContext instanceof String
-								&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-										: SecurityProtocol.SASL_PLAINTEXT.name();
-						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
-						MiscUtil.setUGIFromJAASConfig(context.name());
-						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
-					} catch (Throwable t) {
-						logger.error("Error getting principal.", t);
-					}
-					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-				}
-			}
-		}
-		logger.info("Calling plugin.init()");
-		rangerPlugin.init();
-		auditHandler = new RangerKafkaAuditHandler();
-		rangerPlugin.setResultProcessor(auditHandler);
-	}
-
-	@Override
-	public void close() {
-		logger.info("close() called on authorizer.");
-		try {
-			if (rangerPlugin != null) {
-				rangerPlugin.cleanup();
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing RangerPlugin.", t);
-		}
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,
-			Resource resource) {
-
-		if (rangerPlugin == null) {
-			MiscUtil.logErrorMessageByInterval(logger,
-					"Authorizer is still not initialized");
-			return false;
-		}
-
-		RangerPerfTracer perf = null;
-
-		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-		}
-		String userName = null;
-		if (session.principal() != null) {
-			userName = session.principal().getName();
-		}
-		java.util.Set<String> userGroups = MiscUtil
-				.getGroupsForRequestUser(userName);
-		String ip = session.clientAddress().getHostAddress();
-
-		// skip leading slash
-		if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-			ip = ip.substring(1);
-		}
-
-		Date eventTime = new Date();
-		String accessType = mapToRangerAccessType(operation);
-		boolean validationFailed = false;
-		String validationStr = "";
-
-		if (accessType == null) {
-			if (MiscUtil.logErrorMessageByInterval(logger,
-					"Unsupported access type. operation=" + operation)) {
-				logger.error("Unsupported access type. session=" + session
-						+ ", operation=" + operation + ", resource=" + resource);
-			}
-			validationFailed = true;
-			validationStr += "Unsupported access type. operation=" + operation;
-		}
-		String action = accessType;
-
-		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-		rangerRequest.setUser(userName);
-		rangerRequest.setUserGroups(userGroups);
-		rangerRequest.setClientIPAddress(ip);
-		rangerRequest.setAccessTime(eventTime);
-
-		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
-
-		if (resource.resourceType().equals(Topic$.MODULE$)) {
-			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
-		} else if (resource.resourceType().equals(Group$.MODULE$)) {
-			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
-		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
-		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
-			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
-		} else {
-			logger.error("Unsupported resourceType=" + resource.resourceType());
-			validationFailed = true;
-		}
-
-		boolean returnValue = false;
-		if (validationFailed) {
-			MiscUtil.logErrorMessageByInterval(logger, validationStr
-					+ ", request=" + rangerRequest);
-		} else {
-
-			try {
-				RangerAccessResult result = rangerPlugin
-						.isAccessAllowed(rangerRequest);
-				if (result == null) {
-					logger.error("Ranger Plugin returned null. Returning false");
-				} else {
-					returnValue = result.getIsAllowed();
-				}
-			} catch (Throwable t) {
-				logger.error("Error while calling isAccessAllowed(). request="
-						+ rangerRequest, t);
-			} finally {
-				auditHandler.flushAudit();
-			}
-		}
-		RangerPerfTracer.log(perf);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("rangerRequest=" + rangerRequest + ", return="
-					+ returnValue);
-		}
-		return returnValue;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-	 * kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public boolean removeAcls(Resource resource) {
-		logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-	 */
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		Set<Acl> aclList = new HashSet<Acl>();
-		logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see
-	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-	 * )
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-			KafkaPrincipal principal) {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see kafka.security.auth.Authorizer#getAcls()
-	 */
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-		logger.error("getAcls() is not supported by Ranger for Kafka");
-		return aclList;
-	}
-
-	/**
-	 * @param operation
-	 * @return
-	 */
-	private String mapToRangerAccessType(Operation operation) {
-		if (operation.equals(Read$.MODULE$)) {
-			return ACCESS_TYPE_READ;
-		} else if (operation.equals(Write$.MODULE$)) {
-			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Alter$.MODULE$)) {
-			return ACCESS_TYPE_CONFIGURE;
-		} else if (operation.equals(Describe$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE;
-		} else if (operation.equals(ClusterAction$.MODULE$)) {
-			return ACCESS_TYPE_CLUSTER_ACTION;
-		} else if (operation.equals(Create$.MODULE$)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Delete$.MODULE$)) {
-			return ACCESS_TYPE_DELETE;
-		} else if (operation.equals(DescribeConfigs$.MODULE$)) {
-			return ACCESS_TYPE_DESCRIBE_CONFIGS;
-		} else if (operation.equals(AlterConfigs$.MODULE$)) {
-			return ACCESS_TYPE_ALTER_CONFIGS;
-		} else if (operation.equals(IdempotentWrite$.MODULE$)) {
-			return ACCESS_TYPE_IDEMPOTENT_WRITE;
-		}
-		return null;
-	}
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl createRangerAccessResource(String resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl createRangerAccessRequest(String userName,
+                                                                   Set<String> userGroups,
+                                                                   String ip,
+                                                                   Date eventTime,
+                                                                   String resourceTypeKey,
+                                                                   String resourceName,
+                                                                   String accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  private static String toString(AuthorizableRequestContext requestContext) {
+    return requestContext == null ? null :
+        String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}",
+            requestContext.principal(), requestContext.clientAddress(), requestContext.clientId());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+          logger.info("Calling plugin.init()");
+          rangerPlugin.init();
+          auditHandler = new RangerKafkaAuditHandler();
+          rangerPlugin.setResultProcessor(auditHandler);
+        }
+      }
+    }
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+    return serverInfo.endpoints().stream()
+        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (rangerPlugin == null) {
+      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+      return denyAll(actions);
+    }
+
+    RangerPerfTracer perf = null;
+    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    }
+    try {
+      return wrappedAuthorization(requestContext, actions);
+    } finally {
+      RangerPerfTracer.log(perf);
+    }
+  }
+
+  private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+    if (CollectionUtils.isEmpty(actions)) {
+      return Collections.emptyList();
+    }
+    String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+    String hostAddress = requestContext.clientAddress() == null ? null : requestContext.clientAddress().getHostAddress();
+    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+    Date eventTime = new Date();
+
+    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+    for (Action action : actions) {
+      String accessType = mapToRangerAccessType(action.operation());
+      if (accessType == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type, requestContext=" + toString(requestContext) +
+            ", actions=" + actions + ", operation=" + action.operation());
+        return denyAll(actions);
+      }
+      String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+      if (resourceTypeKey == null) {
+        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type, requestContext=" + toString(requestContext) +
+            ", actions=" + actions + ", resourceType=" + action.resourcePattern().resourceType());
+        return denyAll(actions);
+      }
+
+      RangerAccessRequestImpl rangerAccessRequest = createRangerAccessRequest(
+          userName,
+          userGroups,
+          ip,
+          eventTime,
+          resourceTypeKey,
+          action.resourcePattern().name(),
+          accessType);
+      rangerRequests.add(rangerAccessRequest);
+    }
+
+    Collection<RangerAccessResult> results = callRangerPlugin(rangerRequests);
+
+    List<AuthorizationResult> authorizationResults = mapResults(actions, results);
+
+    logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
+    return authorizationResults;
+  }
+
+  private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+    try {
+      return rangerPlugin.isAccessAllowed(rangerRequests);
+    } catch (Throwable t) {
+      logger.error("Error while calling isAccessAllowed(). requests={}", rangerRequests, t);
+      return null;
+    } finally {
+      auditHandler.flushAudit();
+    }
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.error("createAcls is not supported by Ranger for Kafka");
+
+    return aclBindings.stream()
+        .map(ab -> {
+          CompletableFuture<AclCreateResult> completableFuture = new CompletableFuture<>();
+          completableFuture.completeExceptionally(new UnsupportedOperationException("createAcls is not supported by Ranger for Kafka"));
+          return completableFuture;
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+    logger.error("deleteAcls is not supported by Ranger for Kafka");
+    return aclBindingFilters.stream()
+        .map(ab -> {
+          CompletableFuture<AclDeleteResult> completableFuture = new CompletableFuture<>();
+          completableFuture.completeExceptionally(new UnsupportedOperationException("deleteAcls is not supported by Ranger for Kafka"));
+          return completableFuture;
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public Iterable<AclBinding> acls(AclBindingFilter filter) {
+    logger.error("(getting) acls is not supported by Ranger for Kafka");
+    throw new UnsupportedOperationException("(getting) acls is not supported by Ranger for Kafka");
+  }
 }
diff --git a/ranger-kafka-plugin-shim/pom.xml b/ranger-kafka-plugin-shim/pom.xml
index 3264138a8..e0722c0ae 100644
--- a/ranger-kafka-plugin-shim/pom.xml
+++ b/ranger-kafka-plugin-shim/pom.xml
@@ -54,18 +54,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_${scala.binary.version}</artifactId>
+            <artifactId>kafka-clients</artifactId>
             <version>${kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                   <groupId>io.netty</groupId>
-                   <artifactId>netty-handler</artifactId>
-                </exclusion>
-                <exclusion>
-                   <groupId>io.netty</groupId>
-                   <artifactId>netty-transport-native-epoll</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
     </dependencies>
 </project>
diff --git a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index b84b765c2..0b64f3e1d 100644
--- a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,272 +19,185 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
 
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
-	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-	private Authorizer              rangerKakfaAuthorizerImpl = null;
-	private RangerPluginClassLoader rangerPluginClassLoader   = null;
-	
-	public RangerKafkaAuthorizer() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-
-		this.init();
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-		}
-	}
-	
-	private void init(){
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.init()");
-		}
-
-		try {
-			
-			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-			
-			@SuppressWarnings("unchecked")
-			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl = cls.newInstance();
-		} catch (Exception e) {
-			// check what need to be done
-			LOG.error("Error Enabling RangerKafkaPlugin", e);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.init()");
-		}
-	}
-
-	@Override
-	public void configure(Map<String, ?> configs) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.configure(configs);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
-		}
-	}
-
-	@Override
-	public void close() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.close()");
-		}
-
-		try {
-			activatePluginClassLoader();
-			
-			rangerKakfaAuthorizerImpl.close();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.close()");
-		}
-		
-	}
-
-	@Override
-	public boolean authorize(Session session, Operation operation,Resource resource) {	
-		if(LOG.isDebugEnabled()) {
-			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
-		}
-
-		boolean ret = false;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public void addAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-
-		try {
-			activatePluginClassLoader();
-
-			rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
-		}
-	}
-
-	@Override
-	public boolean removeAcls(Set<Acl> acls, Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-		}
-		
-		return ret;
-	}
-
-	@Override
-	public boolean removeAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-		boolean ret = false;
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public Set<Acl> getAcls(Resource resource) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-		
-		Set<Acl> ret = null;
-		
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-		}
-
-		return ret;
-	}
-
-	@Override
-	public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-		}
-
-		scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-		try {
-			activatePluginClassLoader();
-
-			ret = rangerKakfaAuthorizerImpl.getAcls();
-		} finally {
-			deactivatePluginClassLoader();
-		}
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-		}
-
-		return ret;
-	}
-	
-	private void activatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.activate();
-		}
-	}
-
-	private void deactivatePluginClassLoader() {
-		if(rangerPluginClassLoader != null) {
-			rangerPluginClassLoader.deactivate();
-		}
-	}
-		
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
+
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private static String toString(AuthorizableRequestContext requestContext) {
+    return requestContext == null ? null :
+        String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}",
+            requestContext.principal(), requestContext.clientAddress(), requestContext.clientId());
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      logger.error("Error Enabling RangerKafkaPlugin", e);
+      throw new IllegalStateException("Error Enabling RangerKafkaPlugin", e);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.init()");
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    logger.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.configure(configs);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
+  }
+
+  @Override
+  public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
+    logger.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+    try {
+      activatePluginClassLoader();
+
+      return rangerKafkaAuthorizerImpl.start(authorizerServerInfo);
+    } finally {
+      deactivatePluginClassLoader();
+      logger.debug("<== RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    logger.debug("==> RangerKafkaAuthorizer.close()");
+
+    try {
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl.close();
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.close()");
+  }
+
+  @Override
+  public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+    logger.debug("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext={}, List<Action>={})", toString(requestContext), actions);
+
+    List<AuthorizationResult> ret;
+
+    try {
+      activatePluginClassLoader();
+
+      ret = rangerKafkaAuthorizerImpl.authorize(requestContext, actions);
+    } finally {
+      deactivatePluginClassLoader();
+    }
+
+    logger.debug("<== RangerKafkaAuthorizer.authorize: {}", ret);
+
+    return ret;
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+    logger.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+    try {
+      activatePluginClassLoader();
+
+      return rangerKafkaAuthorizerImpl.createAcls(requestContext, aclBindings);
+    } finally {
+      deactivatePluginClassLoader();
+      logger.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+    }
+  }
+
+  @Override
+  public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+    logger.debug("==> RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+
+    try {
+      activatePluginClassLoader();
+
+      return rangerKafkaAuthorizerImpl.deleteAcls(requestContext, aclBindingFilters);
+    } finally {
+      deactivatePluginClassLoader();
+      logger.debug("<== RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+    }
+  }
+
+  @Override
+  public Iterable<AclBinding> acls(AclBindingFilter filter) {
+    logger.debug("==> RangerKafkaAuthorizer.acls(AclBindingFilter)");
+
+    try {
+      activatePluginClassLoader();
+
+      return rangerKafkaAuthorizerImpl.acls(filter);
+    } finally {
+      deactivatePluginClassLoader();
+      logger.debug("<== RangerKafkaAuthorizer.acls(AclBindingFilter)");
+    }
+  }
+
+  private void activatePluginClassLoader() {
+    if (rangerPluginClassLoader != null) {
+      rangerPluginClassLoader.activate();
+    }
+  }
+
+  private void deactivatePluginClassLoader() {
+    if (rangerPluginClassLoader != null) {
+      rangerPluginClassLoader.deactivate();
+    }
+  }
 }