You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by sn...@apache.org on 2015/05/18 01:04:15 UTC

incubator-ranger git commit: RANGER-246 - Kafka authorization plugin (Bosco Durai via Selvamohan Neethiraj)

Repository: incubator-ranger
Updated Branches:
  refs/heads/master 103104129 -> f0c9216da


RANGER-246 - Kafka authorization plugin (Bosco Durai via Selvamohan Neethiraj)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f0c9216d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f0c9216d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f0c9216d

Branch: refs/heads/master
Commit: f0c9216dafbe81cbbe7ae9696c9f6de456ebfac6
Parents: 1031041
Author: sneethiraj <sn...@apache.org>
Authored: Sun May 17 18:40:00 2015 -0400
Committer: sneethiraj <sn...@apache.org>
Committed: Sun May 17 18:40:00 2015 -0400

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../service-defs/ranger-servicedef-kafka.json   |  12 +-
 plugin-kafka/pom.xml                            | 103 +++++-----
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 201 ++++++++++++++++++-
 .../kafka/client/ServiceKafkaClient.java        |   2 -
 pom.xml                                         |  13 +-
 6 files changed, 258 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index dd4e2c2..7f41f0c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,4 @@
 .project
 /target/
 winpkg/target
+.DS_Store

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index 9928c5d..d19b10c 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -59,15 +59,9 @@
 		},
 		{
 			"itemId": 7,
-			"name":"replicate",
-			"label":"Replicate"
-		},
-		{
-			"itemId": 8,
-			"name":"connect",
-			"label":"Connect"
+			"name":"kafka_admin",
+			"label":"Kafka Admin"
 		}
-		
 	],
 	"configs":[
 		{
@@ -97,7 +91,7 @@
 			"name":"commonNameForCertificate",
 			"type":"string",
 			"mandatory":false,
-			"label":"Common Name for Certificate"
+			"label":"Ranger Plugin SSL CName"
 		}
 		
 	],

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index e9ea265..afee47d 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -1,56 +1,51 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>security_plugins.ranger-kafka-plugin</groupId>
-  <artifactId>ranger-kafka-plugin</artifactId>
-  <name>KAFKA Security Plugin</name>
-  <description>KAFKA Security Plugin</description>
-  <packaging>jar</packaging>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-  <parent>
-     <groupId>org.apache.ranger</groupId>
-     <artifactId>ranger</artifactId>
-     <version>0.5.0</version>
-     <relativePath>..</relativePath>
-  </parent>
-  <dependencies>
-    <dependency>
-      <groupId>security_plugins.ranger-plugins-common</groupId>
-      <artifactId>ranger-plugins-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>security_plugins.ranger-plugins-audit</groupId>
-      <artifactId>ranger-plugins-audit</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.ranger</groupId>
-      <artifactId>credentialbuilder</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
-      <version>${kafka.version}</version>
-    </dependency>
-  </dependencies>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>security_plugins.ranger-kafka-plugin</groupId>
+	<artifactId>ranger-kafka-plugin</artifactId>
+	<name>KAFKA Security Plugin</name>
+	<description>KAFKA Security Plugin</description>
+	<packaging>jar</packaging>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<parent>
+		<groupId>org.apache.ranger</groupId>
+		<artifactId>ranger</artifactId>
+		<version>0.5.0</version>
+		<relativePath>..</relativePath>
+	</parent>
+	<dependencies>
+		<dependency>
+			<groupId>security_plugins.ranger-plugins-common</groupId>
+			<artifactId>ranger-plugins-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>security_plugins.ranger-plugins-audit</groupId>
+			<artifactId>ranger-plugins-audit</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.ranger</groupId>
+			<artifactId>credentialbuilder</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.10</artifactId>
+			<version>${kafka.version}</version>
+		</dependency>
+	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 40c2204..4689957 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,14 +19,208 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.util.Date;
+
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.KafkaPrincipal;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import kafka.security.auth.ResourceType;
+import kafka.server.KafkaConfig;
+import kafka.network.RequestChannel.Session;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import scala.collection.immutable.HashSet;
+import scala.collection.immutable.Set;
+
+public class RangerKafkaAuthorizer implements Authorizer {
+	private static final Log logger = LogFactory
+			.getLog(RangerKafkaAuthorizer.class);
+
+	public static final String KEY_TOPIC = "topic";
+	public static final String KEY_CLUSTER = "cluster";
+	public static final String KEY_CONSUMER_GROUP = "consumer_group";
+
+	public static final String ACCESS_TYPE_READ = "read";
+	public static final String ACCESS_TYPE_WRITE = "write";
+	public static final String ACCESS_TYPE_CREATE = "create";
+	public static final String ACCESS_TYPE_DELETE = "delete";
+	public static final String ACCESS_TYPE_ALTER = "alter";
+	public static final String ACCESS_TYPE_DESCRIBE = "describe";
+	public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
+
+	private static volatile RangerBasePlugin rangerPlugin = null;
+
+	public RangerKafkaAuthorizer() {
+		if (rangerPlugin == null) {
+			rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+	 */
+	@Override
+	public void initialize(KafkaConfig kafkaConfig) {
+		rangerPlugin.init();
+		RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+
+		rangerPlugin.setResultProcessor(auditHandler);
+	}
+
+	// TODO: Fix this after Session is fixed
+	// @Override
+	public boolean authorize(Session session, Operation operation,
+			Resource resource) {
+
+		String userName = null;
+		java.util.Set<String> userGroups = getGroupsForUser(userName);
+		String ip = null;
+		Date eventTime = StringUtil.getUTCDate();
+		String accessType = mapToRangerAccessType(operation);
+		if (accessType == null) {
+			logger.fatal("Unsupported access type. session=" + session
+					+ ", operation=" + operation + ", resource=" + resource);
+			return false;
+		}
+		String action = accessType;
+
+		RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+		rangerRequest.setUser(userName);
+		rangerRequest.setUserGroups(userGroups);
+		rangerRequest.setClientIPAddress(ip);
+		rangerRequest.setAccessTime(eventTime);
+
+		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+
+		if (resource.resourceType().equals(ResourceType.TOPIC)) {
+			rangerResource.setValue(KEY_TOPIC, resource.name());
+		} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
+			rangerResource.setValue(KEY_CLUSTER, resource.name());
+		} else if (resource.resourceType().equals(ResourceType.GROUP)) {
+			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
+		} else {
+			logger.fatal("Unsupported resourceType=" + resource.resourceType());
+			return false;
+		}
+
+		rangerRequest.setResource(rangerResource);
+		rangerRequest.setAccessType(accessType);
+		rangerRequest.setAction(action);
+		rangerRequest.setRequestData(resource.name());
+
+		RangerAccessResult result = rangerPlugin.isAccessAllowed(rangerRequest);
+		return result.getIsAllowed();
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
+	 * kafka.security.auth.Resource)
+	 */
+	@Override
+	public void addAcls(Set<Acl> acls, Resource resource) {
+		logger.error("addAcls() is not supported by Ranger for Kafka");
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
+	 * kafka.security.auth.Resource)
+	 */
+	@Override
+	public boolean removeAcls(Set<Acl> acls, Resource resource) {
+		logger.error("removeAcls() is not supported by Ranger for Kafka");
+		return false;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
+	 */
+	@Override
+	public boolean removeAcls(Resource resource) {
+		logger.error("removeAcls() is not supported by Ranger for Kafka");
+		return false;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
+	 */
+	@Override
+	public Set<Acl> getAcls(Resource resource) {
+		Set<Acl> aclList = new HashSet<Acl>();
+		logger.error("getAcls() is not supported by Ranger for Kafka");
 
-public class RangerKafkaAuthorizer /*KafkaAuthorizationPlugin*/ {
+		return aclList;
+	}
 
-	private static final Log LOG = LogFactory.getLog(RangerKafkaAuthorizer.class);
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
+	 * )
+	 */
+	@Override
+	public Set<Acl> getAcls(KafkaPrincipal principal) {
+		Set<Acl> aclList = new HashSet<Acl>();
+		logger.error("getAcls() is not supported by Ranger for Kafka");
+		return aclList;
+	}
 
-    //private static volatile RangerKafkaPlugin kafkaPlugin = null;
+	/**
+	 * @param userName
+	 * @return
+	 */
+	private java.util.Set<String> getGroupsForUser(String userName) {
+		if (userName == null) {
+			return null;
+		}
 
+		// TODO: Need to implement this method
+		return null;
+	}
 
+	/**
+	 * @param operation
+	 * @return
+	 */
+	private String mapToRangerAccessType(Operation operation) {
+		if (operation.equals(Operation.READ)) {
+			return ACCESS_TYPE_READ;
+		} else if (operation.equals(Operation.WRITE)) {
+			return ACCESS_TYPE_WRITE;
+		} else if (operation.equals(Operation.CREATE)) {
+			return ACCESS_TYPE_CREATE;
+		} else if (operation.equals(Operation.DELETE)) {
+			return ACCESS_TYPE_DELETE;
+		} else if (operation.equals(Operation.ALTER)) {
+			return ACCESS_TYPE_ALTER;
+		} else if (operation.equals(Operation.DESCRIBE)) {
+			return ACCESS_TYPE_DESCRIBE;
+		} else if (operation.equals(Operation.CLUSTER_ACTION)) {
+			return ACCESS_TYPE_KAFKA_ADMIN;
+		}
+		return null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index a62bd95..5cca619 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -22,10 +22,8 @@ package org.apache.ranger.services.kafka.client;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0b5608a..cbb3a08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,8 +87,6 @@
   <module>hive-agent</module>
   <module>knox-agent</module>
   <module>storm-agent</module>
-  <module>plugin-kafka</module>
-  <!-- <module>plugin-solr</module> -->
   <module>plugin-yarn</module>
   <module>ranger_solrj</module>
   <module>security-admin</module>
@@ -224,6 +222,12 @@
         	 <module>plugin-solr</module>         
          </modules>
       </profile>
+      <profile>
+          <id>kafka-security</id>
+         <modules>
+        	 <module>plugin-kafka</module>         
+         </modules>
+      </profile>
   </profiles>
   <distributionManagement>
         <repository>
@@ -375,15 +379,14 @@
              <descriptor>src/main/assembly/hbase-agent.xml</descriptor>
              <descriptor>src/main/assembly/knox-agent.xml</descriptor>
              <descriptor>src/main/assembly/storm-agent.xml</descriptor>
-	     <descriptor>src/main/assembly/plugin-kafka.xml</descriptor>
+	         <descriptor>src/main/assembly/plugin-kafka.xml</descriptor>
              <descriptor>src/main/assembly/plugin-yarn.xml</descriptor>
-	     <descriptor>src/main/assembly/plugin-solr.xml</descriptor>
+	         <descriptor>src/main/assembly/plugin-solr.xml</descriptor>
              <descriptor>src/main/assembly/admin-web.xml</descriptor>
              <descriptor>src/main/assembly/usersync.xml</descriptor>
              <descriptor>src/main/assembly/migration-util.xml</descriptor>
              <descriptor>src/main/assembly/kms.xml</descriptor>
              <descriptor>src/main/assembly/ranger-src.xml</descriptor>
-	     <!--<descriptor>src/main/assembly/plugin-kms.xml</descriptor>-->
            </descriptors>
          </configuration>
       </plugin>