You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/05/17 23:46:32 UTC
incubator-ranger git commit: RANGER-246 - Kafka authorization plugin
Repository: incubator-ranger
Updated Branches:
refs/heads/master 5b9851383 -> a5f8531a1
RANGER-246 - Kafka authorization plugin
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a5f8531a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a5f8531a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a5f8531a
Branch: refs/heads/master
Commit: a5f8531a17558cfc75e2ad216816f272705898cf
Parents: 5b98513
Author: Don Bosco Durai <bo...@apache.org>
Authored: Sun May 17 13:59:10 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Sun May 17 14:44:51 2015 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../service-defs/ranger-servicedef-kafka.json | 12 +-
plugin-kafka/pom.xml | 103 +++++-----
.../kafka/authorizer/RangerKafkaAuthorizer.java | 201 ++++++++++++++++++-
.../kafka/client/ServiceKafkaClient.java | 2 -
pom.xml | 11 +-
6 files changed, 259 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a5f8531a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index dd4e2c2..7f41f0c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,4 @@
.project
/target/
winpkg/target
+.DS_Store
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a5f8531a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index 9928c5d..d19b10c 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -59,15 +59,9 @@
},
{
"itemId": 7,
- "name":"replicate",
- "label":"Replicate"
- },
- {
- "itemId": 8,
- "name":"connect",
- "label":"Connect"
+ "name":"kafka_admin",
+ "label":"Kafka Admin"
}
-
],
"configs":[
{
@@ -97,7 +91,7 @@
"name":"commonNameForCertificate",
"type":"string",
"mandatory":false,
- "label":"Common Name for Certificate"
+ "label":"Ranger Plugin SSL CName"
}
],
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a5f8531a/plugin-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index e9ea265..afee47d 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -1,56 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>security_plugins.ranger-kafka-plugin</groupId>
- <artifactId>ranger-kafka-plugin</artifactId>
- <name>KAFKA Security Plugin</name>
- <description>KAFKA Security Plugin</description>
- <packaging>jar</packaging>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <parent>
- <groupId>org.apache.ranger</groupId>
- <artifactId>ranger</artifactId>
- <version>0.5.0</version>
- <relativePath>..</relativePath>
- </parent>
- <dependencies>
- <dependency>
- <groupId>security_plugins.ranger-plugins-common</groupId>
- <artifactId>ranger-plugins-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>security_plugins.ranger-plugins-audit</groupId>
- <artifactId>ranger-plugins-audit</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.ranger</groupId>
- <artifactId>credentialbuilder</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${kafka.version}</version>
- </dependency>
- </dependencies>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>security_plugins.ranger-kafka-plugin</groupId>
+ <artifactId>ranger-kafka-plugin</artifactId>
+ <name>KAFKA Security Plugin</name>
+ <description>KAFKA Security Plugin</description>
+ <packaging>jar</packaging>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <parent>
+ <groupId>org.apache.ranger</groupId>
+ <artifactId>ranger</artifactId>
+ <version>0.5.0</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <dependencies>
+ <dependency>
+ <groupId>security_plugins.ranger-plugins-common</groupId>
+ <artifactId>ranger-plugins-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>security_plugins.ranger-plugins-audit</groupId>
+ <artifactId>ranger-plugins-audit</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ranger</groupId>
+ <artifactId>credentialbuilder</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a5f8531a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 40c2204..4689957 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,14 +19,208 @@
package org.apache.ranger.authorization.kafka.authorizer;
+import java.util.Date;
+
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.KafkaPrincipal;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import kafka.security.auth.ResourceType;
+import kafka.server.KafkaConfig;
+import kafka.network.RequestChannel.Session;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import scala.collection.immutable.HashSet;
+import scala.collection.immutable.Set;
+
+public class RangerKafkaAuthorizer implements Authorizer {
+ private static final Log logger = LogFactory
+ .getLog(RangerKafkaAuthorizer.class);
+
+ public static final String KEY_TOPIC = "topic";
+ public static final String KEY_CLUSTER = "cluster";
+ public static final String KEY_CONSUMER_GROUP = "consumer_group";
+
+ public static final String ACCESS_TYPE_READ = "read";
+ public static final String ACCESS_TYPE_WRITE = "write";
+ public static final String ACCESS_TYPE_CREATE = "create";
+ public static final String ACCESS_TYPE_DELETE = "delete";
+ public static final String ACCESS_TYPE_ALTER = "alter";
+ public static final String ACCESS_TYPE_DESCRIBE = "describe";
+ public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
+
+ private static volatile RangerBasePlugin rangerPlugin = null;
+
+ public RangerKafkaAuthorizer() {
+ if (rangerPlugin == null) {
+ rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+ */
+ @Override
+ public void initialize(KafkaConfig kafkaConfig) {
+ rangerPlugin.init();
+ RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+
+ rangerPlugin.setResultProcessor(auditHandler);
+ }
+
+ // TODO: Fix this after Session is fixed
+ // @Override
+ public boolean authorize(Session session, Operation operation,
+ Resource resource) {
+
+ String userName = null;
+ java.util.Set<String> userGroups = getGroupsForUser(userName);
+ String ip = null;
+ Date eventTime = StringUtil.getUTCDate();
+ String accessType = mapToRangerAccessType(operation);
+ if (accessType == null) {
+ logger.fatal("Unsupported access type. session=" + session
+ + ", operation=" + operation + ", resource=" + resource);
+ return false;
+ }
+ String action = accessType;
+
+ RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+ rangerRequest.setUser(userName);
+ rangerRequest.setUserGroups(userGroups);
+ rangerRequest.setClientIPAddress(ip);
+ rangerRequest.setAccessTime(eventTime);
+
+ RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+
+ if (resource.resourceType().equals(ResourceType.TOPIC)) {
+ rangerResource.setValue(KEY_TOPIC, resource.name());
+ } else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
+ rangerResource.setValue(KEY_CLUSTER, resource.name());
+ } else if (resource.resourceType().equals(ResourceType.GROUP)) {
+ rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
+ } else {
+ logger.fatal("Unsupported resourceType=" + resource.resourceType());
+ return false;
+ }
+
+ rangerRequest.setResource(rangerResource);
+ rangerRequest.setAccessType(accessType);
+ rangerRequest.setAction(action);
+ rangerRequest.setRequestData(resource.name());
+
+ RangerAccessResult result = rangerPlugin.isAccessAllowed(rangerRequest);
+ return result.getIsAllowed();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
+ * kafka.security.auth.Resource)
+ */
+ @Override
+ public void addAcls(Set<Acl> acls, Resource resource) {
+ logger.error("addAcls() is not supported by Ranger for Kafka");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
+ * kafka.security.auth.Resource)
+ */
+ @Override
+ public boolean removeAcls(Set<Acl> acls, Resource resource) {
+ logger.error("removeAcls() is not supported by Ranger for Kafka");
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
+ */
+ @Override
+ public boolean removeAcls(Resource resource) {
+ logger.error("removeAcls() is not supported by Ranger for Kafka");
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
+ */
+ @Override
+ public Set<Acl> getAcls(Resource resource) {
+ Set<Acl> aclList = new HashSet<Acl>();
+ logger.error("getAcls() is not supported by Ranger for Kafka");
-public class RangerKafkaAuthorizer /*KafkaAuthorizationPlugin*/ {
+ return aclList;
+ }
- private static final Log LOG = LogFactory.getLog(RangerKafkaAuthorizer.class);
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
+ * )
+ */
+ @Override
+ public Set<Acl> getAcls(KafkaPrincipal principal) {
+ Set<Acl> aclList = new HashSet<Acl>();
+ logger.error("getAcls() is not supported by Ranger for Kafka");
+ return aclList;
+ }
- //private static volatile RangerKafkaPlugin kafkaPlugin = null;
+ /**
+ * @param userName
+ * @return
+ */
+ private java.util.Set<String> getGroupsForUser(String userName) {
+ if (userName == null) {
+ return null;
+ }
+ // TODO: Need to implement this method
+ return null;
+ }
+ /**
+ * @param operation
+ * @return
+ */
+ private String mapToRangerAccessType(Operation operation) {
+ if (operation.equals(Operation.READ)) {
+ return ACCESS_TYPE_READ;
+ } else if (operation.equals(Operation.WRITE)) {
+ return ACCESS_TYPE_WRITE;
+ } else if (operation.equals(Operation.CREATE)) {
+ return ACCESS_TYPE_CREATE;
+ } else if (operation.equals(Operation.DELETE)) {
+ return ACCESS_TYPE_DELETE;
+ } else if (operation.equals(Operation.ALTER)) {
+ return ACCESS_TYPE_ALTER;
+ } else if (operation.equals(Operation.DESCRIBE)) {
+ return ACCESS_TYPE_DESCRIBE;
+ } else if (operation.equals(Operation.CLUSTER_ACTION)) {
+ return ACCESS_TYPE_KAFKA_ADMIN;
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a5f8531a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index a62bd95..5cca619 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -22,10 +22,8 @@ package org.apache.ranger.services.kafka.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a5f8531a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0b5608a..d26fe5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,8 +87,8 @@
<module>hive-agent</module>
<module>knox-agent</module>
<module>storm-agent</module>
- <module>plugin-kafka</module>
<!-- <module>plugin-solr</module> -->
+ <!-- <module>plugin-kafka</module> -->
<module>plugin-yarn</module>
<module>ranger_solrj</module>
<module>security-admin</module>
@@ -148,7 +148,8 @@
<jersey-bundle.version>1.17.1</jersey-bundle.version>
<jersey-client.version>2.6</jersey-client.version>
<junit.version>4.11</junit.version>
- <kafka.version>0.8.2.0</kafka.version>
+ <!-- <kafka.version>0.8.2.0</kafka.version> -->
+ <kafka.version>0.8.2.2.3.0.0-1860</kafka.version>
<mockito.version>1.8.4</mockito.version>
<hamcrest-version>1.3</hamcrest-version>
<knox.gateway.version>0.5.0</knox.gateway.version>
@@ -224,6 +225,12 @@
<module>plugin-solr</module>
</modules>
</profile>
+ <profile>
+ <id>kafka-security</id>
+ <modules>
+ <module>plugin-kafka</module>
+ </modules>
+ </profile>
</profiles>
<distributionManagement>
<repository>