You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by rm...@apache.org on 2022/04/07 00:07:07 UTC
[ranger] branch master updated: RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504
This is an automated email from the ASF dual-hosted git repository.
rmani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new 4107b274a RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504
4107b274a is described below
commit 4107b274afcba79ed14e180944eab3affb1e577e
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Fri Feb 25 09:47:35 2022 +0100
RANGER-3231: Ranger-Kafka-Plugin implementing Kafka Authorizer from KIP-504
kafka.security.auth.Authorizer has been deprecated since December 2019, and
it's removed in Apache Kafka 3.0
See the KIP for more details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
Co-authored-by: Chia-Ping Tsai <ch...@gmail.com>
Change-Id: Id7115fcac355b1c794ec82201bb6bc6bdf9c5b05
Signed-off-by: Ramesh Mani <rm...@cloudera.com>
---
plugin-kafka/pom.xml | 6 +
.../kafka/authorizer/RangerKafkaAuthorizer.java | 586 +++++++++++----------
ranger-kafka-plugin-shim/pom.xml | 12 +-
.../kafka/authorizer/RangerKafkaAuthorizer.java | 435 ++++++---------
4 files changed, 478 insertions(+), 561 deletions(-)
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index d95f591fe..75db9e9de 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -47,10 +47,16 @@
<artifactId>credentialbuilder</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
+ <scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 97a2f2ec7..64f622586 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,21 +19,38 @@
package org.apache.ranger.authorization.kafka.authorizer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
-import scala.collection.immutable.HashSet;
-import scala.collection.immutable.Set;
-import kafka.security.auth.*;
-import kafka.network.RequestChannel.Session;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
@@ -43,286 +60,277 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RangerKafkaAuthorizer implements Authorizer {
- private static final Logger logger = LoggerFactory
- .getLogger(RangerKafkaAuthorizer.class);
- private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
- public static final String KEY_TOPIC = "topic";
- public static final String KEY_CLUSTER = "cluster";
- public static final String KEY_CONSUMER_GROUP = "consumergroup";
- public static final String KEY_TRANSACTIONALID = "transactionalid";
- public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
- public static final String ACCESS_TYPE_READ = "consume";
- public static final String ACCESS_TYPE_WRITE = "publish";
- public static final String ACCESS_TYPE_CREATE = "create";
- public static final String ACCESS_TYPE_DELETE = "delete";
- public static final String ACCESS_TYPE_CONFIGURE = "configure";
- public static final String ACCESS_TYPE_DESCRIBE = "describe";
- public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
- public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
- public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
- public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
-
- private static volatile RangerBasePlugin rangerPlugin = null;
- RangerKafkaAuditHandler auditHandler = null;
-
- public RangerKafkaAuthorizer() {
- }
-
- /*
- * (non-Javadoc)
- *
- * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
- */
- @Override
- public void configure(Map<String, ?> configs) {
- RangerBasePlugin me = rangerPlugin;
- if (me == null) {
- synchronized(RangerKafkaAuthorizer.class) {
- me = rangerPlugin;
- if (me == null) {
- try {
- // Possible to override JAAS configuration which is used by Ranger, otherwise
- // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
- // if it's not defined, then it reverts to 'KafkaServer' configuration.
- final Object jaasContext = configs.get("ranger.jaas.context");
- final String listenerName = (jaasContext instanceof String
- && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
- : SecurityProtocol.SASL_PLAINTEXT.name();
- final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
- JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
- MiscUtil.setUGIFromJAASConfig(context.name());
- logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
- } catch (Throwable t) {
- logger.error("Error getting principal.", t);
- }
- me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
- }
- }
- }
- logger.info("Calling plugin.init()");
- rangerPlugin.init();
- auditHandler = new RangerKafkaAuditHandler();
- rangerPlugin.setResultProcessor(auditHandler);
- }
-
- @Override
- public void close() {
- logger.info("close() called on authorizer.");
- try {
- if (rangerPlugin != null) {
- rangerPlugin.cleanup();
- }
- } catch (Throwable t) {
- logger.error("Error closing RangerPlugin.", t);
- }
- }
-
- @Override
- public boolean authorize(Session session, Operation operation,
- Resource resource) {
-
- if (rangerPlugin == null) {
- MiscUtil.logErrorMessageByInterval(logger,
- "Authorizer is still not initialized");
- return false;
- }
-
- RangerPerfTracer perf = null;
-
- if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
- perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
- }
- String userName = null;
- if (session.principal() != null) {
- userName = session.principal().getName();
- }
- java.util.Set<String> userGroups = MiscUtil
- .getGroupsForRequestUser(userName);
- String ip = session.clientAddress().getHostAddress();
-
- // skip leading slash
- if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
- ip = ip.substring(1);
- }
-
- Date eventTime = new Date();
- String accessType = mapToRangerAccessType(operation);
- boolean validationFailed = false;
- String validationStr = "";
-
- if (accessType == null) {
- if (MiscUtil.logErrorMessageByInterval(logger,
- "Unsupported access type. operation=" + operation)) {
- logger.error("Unsupported access type. session=" + session
- + ", operation=" + operation + ", resource=" + resource);
- }
- validationFailed = true;
- validationStr += "Unsupported access type. operation=" + operation;
- }
- String action = accessType;
-
- RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
- rangerRequest.setUser(userName);
- rangerRequest.setUserGroups(userGroups);
- rangerRequest.setClientIPAddress(ip);
- rangerRequest.setAccessTime(eventTime);
-
- RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
- rangerRequest.setResource(rangerResource);
- rangerRequest.setAccessType(accessType);
- rangerRequest.setAction(action);
- rangerRequest.setRequestData(resource.name());
-
- if (resource.resourceType().equals(Topic$.MODULE$)) {
- rangerResource.setValue(KEY_TOPIC, resource.name());
- } else if (resource.resourceType().equals(Cluster$.MODULE$)) {
- rangerResource.setValue(KEY_CLUSTER, resource.name());
- } else if (resource.resourceType().equals(Group$.MODULE$)) {
- rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
- } else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
- rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
- } else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
- rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
- } else {
- logger.error("Unsupported resourceType=" + resource.resourceType());
- validationFailed = true;
- }
-
- boolean returnValue = false;
- if (validationFailed) {
- MiscUtil.logErrorMessageByInterval(logger, validationStr
- + ", request=" + rangerRequest);
- } else {
-
- try {
- RangerAccessResult result = rangerPlugin
- .isAccessAllowed(rangerRequest);
- if (result == null) {
- logger.error("Ranger Plugin returned null. Returning false");
- } else {
- returnValue = result.getIsAllowed();
- }
- } catch (Throwable t) {
- logger.error("Error while calling isAccessAllowed(). request="
- + rangerRequest, t);
- } finally {
- auditHandler.flushAudit();
- }
- }
- RangerPerfTracer.log(perf);
-
- if (logger.isDebugEnabled()) {
- logger.debug("rangerRequest=" + rangerRequest + ", return="
- + returnValue);
- }
- return returnValue;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
- * kafka.security.auth.Resource)
- */
- @Override
- public void addAcls(Set<Acl> acls, Resource resource) {
- logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
- * kafka.security.auth.Resource)
- */
- @Override
- public boolean removeAcls(Set<Acl> acls, Resource resource) {
- logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka");
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
- */
- @Override
- public boolean removeAcls(Resource resource) {
- logger.error("removeAcls(Resource) is not supported by Ranger for Kafka");
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
- */
- @Override
- public Set<Acl> getAcls(Resource resource) {
- Set<Acl> aclList = new HashSet<Acl>();
- logger.error("getAcls(Resource) is not supported by Ranger for Kafka");
-
- return aclList;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
- * )
- */
- @Override
- public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
- KafkaPrincipal principal) {
- scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
- logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
- return aclList;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see kafka.security.auth.Authorizer#getAcls()
- */
- @Override
- public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
- scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
- logger.error("getAcls() is not supported by Ranger for Kafka");
- return aclList;
- }
-
- /**
- * @param operation
- * @return
- */
- private String mapToRangerAccessType(Operation operation) {
- if (operation.equals(Read$.MODULE$)) {
- return ACCESS_TYPE_READ;
- } else if (operation.equals(Write$.MODULE$)) {
- return ACCESS_TYPE_WRITE;
- } else if (operation.equals(Alter$.MODULE$)) {
- return ACCESS_TYPE_CONFIGURE;
- } else if (operation.equals(Describe$.MODULE$)) {
- return ACCESS_TYPE_DESCRIBE;
- } else if (operation.equals(ClusterAction$.MODULE$)) {
- return ACCESS_TYPE_CLUSTER_ACTION;
- } else if (operation.equals(Create$.MODULE$)) {
- return ACCESS_TYPE_CREATE;
- } else if (operation.equals(Delete$.MODULE$)) {
- return ACCESS_TYPE_DELETE;
- } else if (operation.equals(DescribeConfigs$.MODULE$)) {
- return ACCESS_TYPE_DESCRIBE_CONFIGS;
- } else if (operation.equals(AlterConfigs$.MODULE$)) {
- return ACCESS_TYPE_ALTER_CONFIGS;
- } else if (operation.equals(IdempotentWrite$.MODULE$)) {
- return ACCESS_TYPE_IDEMPOTENT_WRITE;
- }
- return null;
- }
+ private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+ private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
+
+ public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+ public static final String KEY_TOPIC = "topic";
+ public static final String KEY_CLUSTER = "cluster";
+ public static final String KEY_CONSUMER_GROUP = "consumergroup";
+ public static final String KEY_TRANSACTIONALID = "transactionalid";
+ public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+ public static final String ACCESS_TYPE_READ = "consume";
+ public static final String ACCESS_TYPE_WRITE = "publish";
+ public static final String ACCESS_TYPE_CREATE = "create";
+ public static final String ACCESS_TYPE_DELETE = "delete";
+ public static final String ACCESS_TYPE_CONFIGURE = "configure";
+ public static final String ACCESS_TYPE_DESCRIBE = "describe";
+ public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+ public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+ public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+ private static volatile RangerBasePlugin rangerPlugin = null;
+ RangerKafkaAuditHandler auditHandler = null;
+
+ public RangerKafkaAuthorizer() {
+ }
+
+ private static String mapToRangerAccessType(AclOperation operation) {
+ switch (operation) {
+ case READ:
+ return ACCESS_TYPE_READ;
+ case WRITE:
+ return ACCESS_TYPE_WRITE;
+ case ALTER:
+ return ACCESS_TYPE_CONFIGURE;
+ case DESCRIBE:
+ return ACCESS_TYPE_DESCRIBE;
+ case CLUSTER_ACTION:
+ return ACCESS_TYPE_CLUSTER_ACTION;
+ case CREATE:
+ return ACCESS_TYPE_CREATE;
+ case DELETE:
+ return ACCESS_TYPE_DELETE;
+ case DESCRIBE_CONFIGS:
+ return ACCESS_TYPE_DESCRIBE_CONFIGS;
+ case ALTER_CONFIGS:
+ return ACCESS_TYPE_ALTER_CONFIGS;
+ case IDEMPOTENT_WRITE:
+ return ACCESS_TYPE_IDEMPOTENT_WRITE;
+ case UNKNOWN:
+ case ANY:
+ case ALL:
+ default:
+ return null;
+ }
+ }
+
+ private static String mapToResourceType(ResourceType resourceType) {
+ switch (resourceType) {
+ case TOPIC:
+ return KEY_TOPIC;
+ case CLUSTER:
+ return KEY_CLUSTER;
+ case GROUP:
+ return KEY_CONSUMER_GROUP;
+ case TRANSACTIONAL_ID:
+ return KEY_TRANSACTIONALID;
+ case DELEGATION_TOKEN:
+ return KEY_DELEGATIONTOKEN;
+ case ANY:
+ case UNKNOWN:
+ default:
+ return null;
+ }
+ }
+
+ private static RangerAccessResourceImpl createRangerAccessResource(String resourceTypeKey, String resourceName) {
+ RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+ rangerResource.setValue(resourceTypeKey, resourceName);
+ return rangerResource;
+ }
+
+ private static RangerAccessRequestImpl createRangerAccessRequest(String userName,
+ Set<String> userGroups,
+ String ip,
+ Date eventTime,
+ String resourceTypeKey,
+ String resourceName,
+ String accessType) {
+ RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+ rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, resourceName));
+ rangerRequest.setUser(userName);
+ rangerRequest.setUserGroups(userGroups);
+ rangerRequest.setClientIPAddress(ip);
+ rangerRequest.setAccessTime(eventTime);
+ rangerRequest.setAccessType(accessType);
+ rangerRequest.setAction(accessType);
+ rangerRequest.setRequestData(resourceName);
+ return rangerRequest;
+ }
+
+ private static List<AuthorizationResult> denyAll(List<Action> actions) {
+ return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
+ }
+
+ private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) {
+ if (CollectionUtils.isEmpty(results)) {
+ logger.error("Ranger Plugin returned null or empty. Returning Denied for all");
+ return denyAll(actions);
+ }
+ return results.stream()
+ .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+ .collect(Collectors.toList());
+ }
+
+ private static String toString(AuthorizableRequestContext requestContext) {
+ return requestContext == null ? null :
+ String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}",
+ requestContext.principal(), requestContext.clientAddress(), requestContext.clientId());
+ }
+
+ @Override
+ public void close() {
+ logger.info("close() called on authorizer.");
+ try {
+ if (rangerPlugin != null) {
+ rangerPlugin.cleanup();
+ }
+ } catch (Throwable t) {
+ logger.error("Error closing RangerPlugin.", t);
+ }
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ RangerBasePlugin me = rangerPlugin;
+ if (me == null) {
+ synchronized (RangerKafkaAuthorizer.class) {
+ me = rangerPlugin;
+ if (me == null) {
+ try {
+ // Possible to override JAAS configuration which is used by Ranger, otherwise
+ // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
+ // if it's not defined, then it reverts to 'KafkaServer' configuration.
+ final Object jaasContext = configs.get("ranger.jaas.context");
+ final String listenerName = (jaasContext instanceof String
+ && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
+ : SecurityProtocol.SASL_PLAINTEXT.name();
+ final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+ JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
+ MiscUtil.setUGIFromJAASConfig(context.name());
+ UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+ logger.info("LoginUser={}", loginUser);
+ } catch (Throwable t) {
+ logger.error("Error getting principal.", t);
+ }
+ rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+ logger.info("Calling plugin.init()");
+ rangerPlugin.init();
+ auditHandler = new RangerKafkaAuditHandler();
+ rangerPlugin.setResultProcessor(auditHandler);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
+ return serverInfo.endpoints().stream()
+ .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b));
+ }
+
+ @Override
+ public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+ if (rangerPlugin == null) {
+ MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized");
+ return denyAll(actions);
+ }
+
+ RangerPerfTracer perf = null;
+ if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+ }
+ try {
+ return wrappedAuthorization(requestContext, actions);
+ } finally {
+ RangerPerfTracer.log(perf);
+ }
+ }
+
+ private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) {
+ if (CollectionUtils.isEmpty(actions)) {
+ return Collections.emptyList();
+ }
+ String userName = requestContext.principal() == null ? null : requestContext.principal().getName();
+ Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
+ String hostAddress = requestContext.clientAddress() == null ? null : requestContext.clientAddress().getHostAddress();
+ String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+ Date eventTime = new Date();
+
+ List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+ for (Action action : actions) {
+ String accessType = mapToRangerAccessType(action.operation());
+ if (accessType == null) {
+ MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type, requestContext=" + toString(requestContext) +
+ ", actions=" + actions + ", operation=" + action.operation());
+ return denyAll(actions);
+ }
+ String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType());
+ if (resourceTypeKey == null) {
+ MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type, requestContext=" + toString(requestContext) +
+ ", actions=" + actions + ", resourceType=" + action.resourcePattern().resourceType());
+ return denyAll(actions);
+ }
+
+ RangerAccessRequestImpl rangerAccessRequest = createRangerAccessRequest(
+ userName,
+ userGroups,
+ ip,
+ eventTime,
+ resourceTypeKey,
+ action.resourcePattern().name(),
+ accessType);
+ rangerRequests.add(rangerAccessRequest);
+ }
+
+ Collection<RangerAccessResult> results = callRangerPlugin(rangerRequests);
+
+ List<AuthorizationResult> authorizationResults = mapResults(actions, results);
+
+ logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults);
+ return authorizationResults;
+ }
+
+ private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+ try {
+ return rangerPlugin.isAccessAllowed(rangerRequests);
+ } catch (Throwable t) {
+ logger.error("Error while calling isAccessAllowed(). requests={}", rangerRequests, t);
+ return null;
+ } finally {
+ auditHandler.flushAudit();
+ }
+ }
+
+ @Override
+ public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+ logger.error("createAcls is not supported by Ranger for Kafka");
+
+ return aclBindings.stream()
+ .map(ab -> {
+ CompletableFuture<AclCreateResult> completableFuture = new CompletableFuture<>();
+ completableFuture.completeExceptionally(new UnsupportedOperationException("createAcls is not supported by Ranger for Kafka"));
+ return completableFuture;
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+ logger.error("deleteAcls is not supported by Ranger for Kafka");
+ return aclBindingFilters.stream()
+ .map(ab -> {
+ CompletableFuture<AclDeleteResult> completableFuture = new CompletableFuture<>();
+ completableFuture.completeExceptionally(new UnsupportedOperationException("deleteAcls is not supported by Ranger for Kafka"));
+ return completableFuture;
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Iterable<AclBinding> acls(AclBindingFilter filter) {
+ logger.error("(getting) acls is not supported by Ranger for Kafka");
+ throw new UnsupportedOperationException("(getting) acls is not supported by Ranger for Kafka");
+ }
}
diff --git a/ranger-kafka-plugin-shim/pom.xml b/ranger-kafka-plugin-shim/pom.xml
index 3264138a8..e0722c0ae 100644
--- a/ranger-kafka-plugin-shim/pom.xml
+++ b/ranger-kafka-plugin-shim/pom.xml
@@ -54,18 +54,8 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport-native-epoll</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</project>
diff --git a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index b84b765c2..0b64f3e1d 100644
--- a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,272 +19,185 @@
package org.apache.ranger.authorization.kafka.authorizer;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+ private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+ private static final String RANGER_PLUGIN_TYPE = "kafka";
+ private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
- private static final Logger LOG = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
- private static final String RANGER_PLUGIN_TYPE = "kafka";
- private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
- private Authorizer rangerKakfaAuthorizerImpl = null;
- private RangerPluginClassLoader rangerPluginClassLoader = null;
-
- public RangerKafkaAuthorizer() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
- }
-
- this.init();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
- }
- }
-
- private void init(){
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.init()");
- }
-
- try {
-
- rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-
- @SuppressWarnings("unchecked")
- Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
-
- activatePluginClassLoader();
-
- rangerKakfaAuthorizerImpl = cls.newInstance();
- } catch (Exception e) {
- // check what need to be done
- LOG.error("Error Enabling RangerKafkaPlugin", e);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.init()");
- }
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
- }
-
- try {
- activatePluginClassLoader();
-
- rangerKakfaAuthorizerImpl.configure(configs);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
- }
- }
-
- @Override
- public void close() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.close()");
- }
-
- try {
- activatePluginClassLoader();
-
- rangerKakfaAuthorizerImpl.close();
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.close()");
- }
-
- }
-
- @Override
- public boolean authorize(Session session, Operation operation,Resource resource) {
- if(LOG.isDebugEnabled()) {
- LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", session, operation, resource));
- }
-
- boolean ret = false;
-
- try {
- activatePluginClassLoader();
-
- ret = rangerKakfaAuthorizerImpl.authorize(session, operation, resource);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
- }
-
- return ret;
- }
-
- @Override
- public void addAcls(Set<Acl> acls, Resource resource) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
- }
-
- try {
- activatePluginClassLoader();
-
- rangerKakfaAuthorizerImpl.addAcls(acls, resource);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
- }
- }
-
- @Override
- public boolean removeAcls(Set<Acl> acls, Resource resource) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
- }
- boolean ret = false;
- try {
- activatePluginClassLoader();
-
- ret = rangerKakfaAuthorizerImpl.removeAcls(acls, resource);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
- }
-
- return ret;
- }
-
- @Override
- public boolean removeAcls(Resource resource) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)");
- }
- boolean ret = false;
- try {
- activatePluginClassLoader();
-
- ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)");
- }
-
- return ret;
- }
-
- @Override
- public Set<Acl> getAcls(Resource resource) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
- }
-
- Set<Acl> ret = null;
-
- try {
- activatePluginClassLoader();
-
- ret = rangerKakfaAuthorizerImpl.getAcls(resource);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
- }
-
- return ret;
- }
-
- @Override
- public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
- }
-
- scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
- try {
- activatePluginClassLoader();
-
- ret = rangerKakfaAuthorizerImpl.getAcls(principal);
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
- }
-
- return ret;
- }
-
- @Override
- public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
- }
-
- scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
- try {
- activatePluginClassLoader();
-
- ret = rangerKakfaAuthorizerImpl.getAcls();
- } finally {
- deactivatePluginClassLoader();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
- }
-
- return ret;
- }
-
- private void activatePluginClassLoader() {
- if(rangerPluginClassLoader != null) {
- rangerPluginClassLoader.activate();
- }
- }
-
- private void deactivatePluginClassLoader() {
- if(rangerPluginClassLoader != null) {
- rangerPluginClassLoader.deactivate();
- }
- }
-
+ private Authorizer rangerKafkaAuthorizerImpl = null;
+ private RangerPluginClassLoader rangerPluginClassLoader = null;
+
+ public RangerKafkaAuthorizer() {
+ logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+ this.init();
+
+ logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+ }
+
+ private static String toString(AuthorizableRequestContext requestContext) {
+ return requestContext == null ? null :
+ String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}",
+ requestContext.principal(), requestContext.clientAddress(), requestContext.clientId());
+ }
+
+ private void init() {
+ logger.debug("==> RangerKafkaAuthorizer.init()");
+
+ try {
+
+ rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+ @SuppressWarnings("unchecked")
+ Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+ activatePluginClassLoader();
+
+ rangerKafkaAuthorizerImpl = cls.newInstance();
+ } catch (Exception e) {
+ logger.error("Error Enabling RangerKafkaPlugin", e);
+ throw new IllegalStateException("Error Enabling RangerKafkaPlugin", e);
+ } finally {
+ deactivatePluginClassLoader();
+ }
+
+ logger.debug("<== RangerKafkaAuthorizer.init()");
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ logger.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
+
+ try {
+ activatePluginClassLoader();
+
+ rangerKafkaAuthorizerImpl.configure(configs);
+ } finally {
+ deactivatePluginClassLoader();
+ }
+
+ logger.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
+ }
+
+ @Override
+ public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
+ logger.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+
+ try {
+ activatePluginClassLoader();
+
+ return rangerKafkaAuthorizerImpl.start(authorizerServerInfo);
+ } finally {
+ deactivatePluginClassLoader();
+ logger.debug("<== RangerKafkaAuthorizer.start(AuthorizerServerInfo)");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("==> RangerKafkaAuthorizer.close()");
+
+ try {
+ activatePluginClassLoader();
+
+ rangerKafkaAuthorizerImpl.close();
+ } finally {
+ deactivatePluginClassLoader();
+ }
+
+ logger.debug("<== RangerKafkaAuthorizer.close()");
+ }
+
+ @Override
+ public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
+ logger.debug("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext={}, List<Action>={})", toString(requestContext), actions);
+
+ List<AuthorizationResult> ret;
+
+ try {
+ activatePluginClassLoader();
+
+ ret = rangerKafkaAuthorizerImpl.authorize(requestContext, actions);
+ } finally {
+ deactivatePluginClassLoader();
+ }
+
+ logger.debug("<== RangerKafkaAuthorizer.authorize: {}", ret);
+
+ return ret;
+ }
+
+ @Override
+ public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
+ logger.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+
+ try {
+ activatePluginClassLoader();
+
+ return rangerKafkaAuthorizerImpl.createAcls(requestContext, aclBindings);
+ } finally {
+ deactivatePluginClassLoader();
+ logger.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
+ }
+ }
+
+ @Override
+ public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
+ logger.debug("==> RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+
+ try {
+ activatePluginClassLoader();
+
+ return rangerKafkaAuthorizerImpl.deleteAcls(requestContext, aclBindingFilters);
+ } finally {
+ deactivatePluginClassLoader();
+ logger.debug("<== RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
+ }
+ }
+
+ @Override
+ public Iterable<AclBinding> acls(AclBindingFilter filter) {
+ logger.debug("==> RangerKafkaAuthorizer.acls(AclBindingFilter)");
+
+ try {
+ activatePluginClassLoader();
+
+ return rangerKafkaAuthorizerImpl.acls(filter);
+ } finally {
+ deactivatePluginClassLoader();
+ logger.debug("<== RangerKafkaAuthorizer.acls(AclBindingFilter)");
+ }
+ }
+
+ private void activatePluginClassLoader() {
+ if (rangerPluginClassLoader != null) {
+ rangerPluginClassLoader.activate();
+ }
+ }
+
+ private void deactivatePluginClassLoader() {
+ if (rangerPluginClassLoader != null) {
+ rangerPluginClassLoader.deactivate();
+ }
+ }
}