You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/05/24 08:02:09 UTC

incubator-ranger git commit: RANGER-246 - Updates based on changes on Kafka side

Repository: incubator-ranger
Updated Branches:
  refs/heads/master f31274b86 -> 115577a6f


RANGER-246 - Updates based on changes on Kafka side

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/115577a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/115577a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/115577a6

Branch: refs/heads/master
Commit: 115577a6f1afdc0cee2b029117c9796a98574755
Parents: f31274b
Author: Don Bosco Durai <bo...@apache.org>
Authored: Sat May 23 21:17:12 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Sat May 23 21:17:12 2015 -0700

----------------------------------------------------------------------
 agents-common/scripts/enable-agent.sh           |   4 +
 .../ranger/plugin/service/RangerBasePlugin.java | 200 +++++++----
 .../service-defs/ranger-servicedef-kafka.json   |  47 ++-
 plugin-kafka/scripts/install.properties         |   3 +
 .../scripts/kafka-plugin-install.properties     |   2 +-
 .../kafka/authorizer/RangerKafkaAuthorizer.java |  80 +++--
 .../solr/authorizer/RangerSolrAuthorizer.java   |   1 +
 src/main/assembly/plugin-kafka.xml              | 349 ++++++++++---------
 8 files changed, 404 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/agents-common/scripts/enable-agent.sh
----------------------------------------------------------------------
diff --git a/agents-common/scripts/enable-agent.sh b/agents-common/scripts/enable-agent.sh
index 3550e16..16efe74 100755
--- a/agents-common/scripts/enable-agent.sh
+++ b/agents-common/scripts/enable-agent.sh
@@ -151,6 +151,8 @@ if [ "${HCOMPONENT_NAME}" = "knox" ]; then
 	HCOMPONENT_LIB_DIR=${HCOMPONENT_INSTALL_DIR}/ext
 elif [ "${HCOMPONENT_NAME}" = "solr" ]; then
     HCOMPONENT_LIB_DIR=${HCOMPONENT_INSTALL_DIR}/solr-webapp/webapp/WEB-INF/lib
+elif [ "${HCOMPONENT_NAME}" = "kafka" ]; then
+    HCOMPONENT_LIB_DIR=${HCOMPONENT_INSTALL_DIR}/libs
 fi
 
 HCOMPONENT_CONF_DIR=${HCOMPONENT_INSTALL_DIR}/conf
@@ -163,6 +165,8 @@ if [ "${HCOMPONENT_NAME}" = "solr" ]; then
 	echo "INFO: Changing ownership of  $HCOMPONENT_CONF_DIR to $install_owner" 
 	chown $install_owner:$install_owner $HCOMPONENT_CONF_DIR
     fi    
+elif [ "${HCOMPONENT_NAME}" = "kafka" ]; then
+    HCOMPONENT_CONF_DIR=${HCOMPONENT_INSTALL_DIR}/config
 fi
 
 HCOMPONENT_ARCHIVE_CONF_DIR=${HCOMPONENT_CONF_DIR}/.archive

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
index a347f75..75ba6b9 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
@@ -20,16 +20,15 @@
 package org.apache.ranger.plugin.service;
 
 import java.util.Collection;
-import java.util.List;
+import java.util.Hashtable;
+import java.util.Map;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.admin.client.RangerAdminClient;
 import org.apache.ranger.admin.client.RangerAdminRESTClient;
 import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
-import org.apache.ranger.plugin.contextenricher.RangerContextEnricher;
 import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
@@ -44,22 +43,23 @@ import org.apache.ranger.plugin.util.GrantRevokeRequest;
 import org.apache.ranger.plugin.util.PolicyRefresher;
 import org.apache.ranger.plugin.util.ServicePolicies;
 
-
 public class RangerBasePlugin {
 	private static final Log LOG = LogFactory.getLog(RangerBasePlugin.class);
 
-	private String                    serviceType  = null;
-	private String                    appId        = null;
-	private String                    serviceName  = null;
-	private PolicyRefresher           refresher    = null;
-	private RangerPolicyEngine        policyEngine = null;
+	private String serviceType = null;
+	private String appId = null;
+	private String serviceName = null;
+	private PolicyRefresher refresher = null;
+	private RangerPolicyEngine policyEngine = null;
 	private RangerPolicyEngineOptions policyEngineOptions = new RangerPolicyEngineOptions();
 	private RangerAccessResultProcessor resultProcessor = null;
 
+	Map<String, LogHistory> logHistoryList = new Hashtable<String, RangerBasePlugin.LogHistory>();
+	int logInterval = 30000; // 30 seconds
 
 	public RangerBasePlugin(String serviceType, String appId) {
 		this.serviceType = serviceType;
-		this.appId       = appId;
+		this.appId = appId;
 	}
 
 	public String getServiceType() {
@@ -75,7 +75,8 @@ public class RangerBasePlugin {
 	public int getServiceDefId() {
 		RangerServiceDef serviceDef = getServiceDef();
 
-		return serviceDef != null && serviceDef.getId() != null ? serviceDef.getId().intValue() : -1;
+		return serviceDef != null && serviceDef.getId() != null ? serviceDef
+				.getId().intValue() : -1;
 	}
 
 	public String getAppId() {
@@ -89,29 +90,50 @@ public class RangerBasePlugin {
 	public void init() {
 		cleanup();
 
-		RangerConfiguration.getInstance().addResourcesForServiceType(serviceType);
+		RangerConfiguration.getInstance().addResourcesForServiceType(
+				serviceType);
 		RangerConfiguration.getInstance().initAudit(appId);
 
-		String propertyPrefix    = "ranger.plugin." + serviceType;
-		long   pollingIntervalMs = RangerConfiguration.getInstance().getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
-		String cacheDir          = RangerConfiguration.getInstance().get(propertyPrefix + ".policy.cache.dir");
-
-		serviceName = RangerConfiguration.getInstance().get(propertyPrefix + ".service.name");
-
-		policyEngineOptions.evaluatorType           = RangerConfiguration.getInstance().get(propertyPrefix + ".policyengine.option.evaluator.type", RangerPolicyEvaluator.EVALUATOR_TYPE_CACHED);
-		policyEngineOptions.cacheAuditResults       = RangerConfiguration.getInstance().getBoolean(propertyPrefix + ".policyengine.option.cache.audit.results", true);
-		policyEngineOptions.disableContextEnrichers = RangerConfiguration.getInstance().getBoolean(propertyPrefix + ".policyengine.option.disable.context.enrichers", false);
-		policyEngineOptions.disableCustomConditions = RangerConfiguration.getInstance().getBoolean(propertyPrefix + ".policyengine.option.disable.custom.conditions", false);
-
+		String propertyPrefix = "ranger.plugin." + serviceType;
+		long pollingIntervalMs = RangerConfiguration.getInstance().getLong(
+				propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
+		String cacheDir = RangerConfiguration.getInstance().get(
+				propertyPrefix + ".policy.cache.dir");
+
+		serviceName = RangerConfiguration.getInstance().get(
+				propertyPrefix + ".service.name");
+
+		policyEngineOptions.evaluatorType = RangerConfiguration.getInstance()
+				.get(propertyPrefix + ".policyengine.option.evaluator.type",
+						RangerPolicyEvaluator.EVALUATOR_TYPE_CACHED);
+		policyEngineOptions.cacheAuditResults = RangerConfiguration
+				.getInstance().getBoolean(
+						propertyPrefix
+								+ ".policyengine.option.cache.audit.results",
+						true);
+		policyEngineOptions.disableContextEnrichers = RangerConfiguration
+				.getInstance()
+				.getBoolean(
+						propertyPrefix
+								+ ".policyengine.option.disable.context.enrichers",
+						false);
+		policyEngineOptions.disableCustomConditions = RangerConfiguration
+				.getInstance()
+				.getBoolean(
+						propertyPrefix
+								+ ".policyengine.option.disable.custom.conditions",
+						false);
 
 		RangerAdminClient admin = createAdminClient(propertyPrefix);
 
-		refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
+		refresher = new PolicyRefresher(this, serviceType, appId, serviceName,
+				admin, pollingIntervalMs, cacheDir);
 		refresher.startRefresher();
 	}
 
 	public void setPolicies(ServicePolicies policies) {
-		RangerPolicyEngine policyEngine = new RangerPolicyEngineImpl(policies, policyEngineOptions);
+		RangerPolicyEngine policyEngine = new RangerPolicyEngineImpl(policies,
+				policyEngineOptions);
 
 		this.policyEngine = policyEngine;
 	}
@@ -119,11 +141,11 @@ public class RangerBasePlugin {
 	public void cleanup() {
 		PolicyRefresher refresher = this.refresher;
 
-		this.serviceName  = null;
+		this.serviceName = null;
 		this.policyEngine = null;
-		this.refresher    = null;
+		this.refresher = null;
 
-		if(refresher != null) {
+		if (refresher != null) {
 			refresher.stopRefresher();
 		}
 	}
@@ -140,14 +162,16 @@ public class RangerBasePlugin {
 		return isAccessAllowed(request, resultProcessor);
 	}
 
-	public Collection<RangerAccessResult> isAccessAllowed(Collection<RangerAccessRequest> requests) {
+	public Collection<RangerAccessResult> isAccessAllowed(
+			Collection<RangerAccessRequest> requests) {
 		return isAccessAllowed(requests, resultProcessor);
 	}
 
-	public RangerAccessResult isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor) {
+	public RangerAccessResult isAccessAllowed(RangerAccessRequest request,
+			RangerAccessResultProcessor resultProcessor) {
 		RangerPolicyEngine policyEngine = this.policyEngine;
 
-		if(policyEngine != null) {
+		if (policyEngine != null) {
 			policyEngine.enrichContext(request);
 
 			return policyEngine.isAccessAllowed(request, resultProcessor);
@@ -156,10 +180,12 @@ public class RangerBasePlugin {
 		return null;
 	}
 
-	public Collection<RangerAccessResult> isAccessAllowed(Collection<RangerAccessRequest> requests, RangerAccessResultProcessor resultProcessor) {
+	public Collection<RangerAccessResult> isAccessAllowed(
+			Collection<RangerAccessRequest> requests,
+			RangerAccessResultProcessor resultProcessor) {
 		RangerPolicyEngine policyEngine = this.policyEngine;
 
-		if(policyEngine != null) {
+		if (policyEngine != null) {
 			policyEngine.enrichContext(requests);
 
 			return policyEngine.isAccessAllowed(requests, resultProcessor);
@@ -171,24 +197,26 @@ public class RangerBasePlugin {
 	public RangerAccessResult createAccessResult(RangerAccessRequest request) {
 		RangerPolicyEngine policyEngine = this.policyEngine;
 
-		if(policyEngine != null) {
+		if (policyEngine != null) {
 			return policyEngine.createAccessResult(request);
 		}
 
 		return null;
 	}
 
-	public void grantAccess(GrantRevokeRequest request, RangerAccessResultProcessor resultProcessor) throws Exception {
-		if(LOG.isDebugEnabled()) {
+	public void grantAccess(GrantRevokeRequest request,
+			RangerAccessResultProcessor resultProcessor) throws Exception {
+		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> RangerAdminRESTClient.grantAccess(" + request + ")");
 		}
 
-		PolicyRefresher   refresher = this.refresher;
-		RangerAdminClient admin     = refresher == null ? null : refresher.getRangerAdminClient();
-		boolean           isSuccess = false;
+		PolicyRefresher refresher = this.refresher;
+		RangerAdminClient admin = refresher == null ? null : refresher
+				.getRangerAdminClient();
+		boolean isSuccess = false;
 
 		try {
-			if(admin == null) {
+			if (admin == null) {
 				throw new Exception("ranger-admin client is null");
 			}
 
@@ -199,22 +227,24 @@ public class RangerBasePlugin {
 			auditGrantRevoke(request, "grant", isSuccess, resultProcessor);
 		}
 
-		if(LOG.isDebugEnabled()) {
+		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== RangerAdminRESTClient.grantAccess(" + request + ")");
 		}
 	}
 
-	public void revokeAccess(GrantRevokeRequest request, RangerAccessResultProcessor resultProcessor) throws Exception {
-		if(LOG.isDebugEnabled()) {
+	public void revokeAccess(GrantRevokeRequest request,
+			RangerAccessResultProcessor resultProcessor) throws Exception {
+		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> RangerAdminRESTClient.revokeAccess(" + request + ")");
 		}
 
-		PolicyRefresher   refresher = this.refresher;
-		RangerAdminClient admin     = refresher == null ? null : refresher.getRangerAdminClient();
-		boolean           isSuccess = false;
+		PolicyRefresher refresher = this.refresher;
+		RangerAdminClient admin = refresher == null ? null : refresher
+				.getRangerAdminClient();
+		boolean isSuccess = false;
 
 		try {
-			if(admin == null) {
+			if (admin == null) {
 				throw new Exception("ranger-admin client is null");
 			}
 
@@ -225,59 +255,72 @@ public class RangerBasePlugin {
 			auditGrantRevoke(request, "revoke", isSuccess, resultProcessor);
 		}
 
-		if(LOG.isDebugEnabled()) {
+		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== RangerAdminRESTClient.revokeAccess(" + request + ")");
 		}
 	}
 
-
 	private RangerAdminClient createAdminClient(String propertyPrefix) {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerAdminRESTClient.createAdminClient(" + propertyPrefix + ")");
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerAdminRESTClient.createAdminClient("
+					+ propertyPrefix + ")");
 		}
 
 		RangerAdminClient ret = null;
 
 		String propertyName = propertyPrefix + ".policy.source.impl";
-		String policySourceImpl = RangerConfiguration.getInstance().get(propertyName);
+		String policySourceImpl = RangerConfiguration.getInstance().get(
+				propertyName);
 
-		if(StringUtils.isEmpty(policySourceImpl)) {
+		if (StringUtils.isEmpty(policySourceImpl)) {
 			if (LOG.isDebugEnabled()) {
-				LOG.debug(String.format("Value for property[%s] was null or empty. Unxpected! Will use policy source of type[%s]", propertyName, RangerAdminRESTClient.class.getName()));
+				LOG.debug(String
+						.format("Value for property[%s] was null or empty. Unxpected! Will use policy source of type[%s]",
+								propertyName,
+								RangerAdminRESTClient.class.getName()));
 			}
 		} else {
 			if (LOG.isDebugEnabled()) {
-				LOG.debug(String.format("Value for property[%s] was [%s].", propertyName, policySourceImpl));
+				LOG.debug(String.format("Value for property[%s] was [%s].",
+						propertyName, policySourceImpl));
 			}
 			try {
 				@SuppressWarnings("unchecked")
-				Class<RangerAdminClient> adminClass = (Class<RangerAdminClient>)Class.forName(policySourceImpl);
-				
+				Class<RangerAdminClient> adminClass = (Class<RangerAdminClient>) Class
+						.forName(policySourceImpl);
+
 				ret = adminClass.newInstance();
 			} catch (Exception excp) {
-				LOG.error("failed to instantiate policy source of type '" + policySourceImpl + "'. Will use policy source of type '" + RangerAdminRESTClient.class.getName() + "'", excp);
+				LOG.error("failed to instantiate policy source of type '"
+						+ policySourceImpl
+						+ "'. Will use policy source of type '"
+						+ RangerAdminRESTClient.class.getName() + "'", excp);
 			}
 		}
 
-		if(ret == null) {
+		if (ret == null) {
 			ret = new RangerAdminRESTClient();
 		}
 
 		ret.init(serviceName, appId, propertyPrefix);
 
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerAdminRESTClient.createAdminClient(" + propertyPrefix + "): policySourceImpl=" + policySourceImpl + ", client=" + ret);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerAdminRESTClient.createAdminClient("
+					+ propertyPrefix + "): policySourceImpl="
+					+ policySourceImpl + ", client=" + ret);
 		}
 		return ret;
 	}
 
-	private void auditGrantRevoke(GrantRevokeRequest request, String action, boolean isSuccess, RangerAccessResultProcessor resultProcessor) {
+	private void auditGrantRevoke(GrantRevokeRequest request, String action,
+			boolean isSuccess, RangerAccessResultProcessor resultProcessor) {
 		RangerPolicyEngine policyEngine = this.policyEngine;
 
-		if(request != null && resultProcessor != null && policyEngine != null) {
+		if (request != null && resultProcessor != null && policyEngine != null) {
 			RangerAccessRequestImpl accessRequest = new RangerAccessRequestImpl();
-	
-			accessRequest.setResource(new RangerAccessResourceImpl(request.getResource()));
+
+			accessRequest.setResource(new RangerAccessResourceImpl(request
+					.getResource()));
 			accessRequest.setUser(request.getGrantor());
 			accessRequest.setAccessType(RangerPolicyEngine.ADMIN_ACCESS);
 			accessRequest.setAction(action);
@@ -287,13 +330,14 @@ public class RangerBasePlugin {
 			accessRequest.setSessionId(request.getSessionId());
 
 			// call isAccessAllowed() to determine if audit is enabled or not
-			RangerAccessResult accessResult = policyEngine.isAccessAllowed(accessRequest, null);
+			RangerAccessResult accessResult = policyEngine.isAccessAllowed(
+					accessRequest, null);
 
-			if(accessResult != null && accessResult.getIsAudited()) {
+			if (accessResult != null && accessResult.getIsAudited()) {
 				accessRequest.setAccessType(action);
 				accessResult.setIsAllowed(isSuccess);
 
-				if(! isSuccess) {
+				if (!isSuccess) {
 					accessResult.setPolicyId(-1);
 				}
 
@@ -301,4 +345,24 @@ public class RangerBasePlugin {
 			}
 		}
 	}
+
+	public boolean logErrorMessage(String message) { // rate-limited LOG.error(): returns true if the message was logged, false if suppressed
+		LogHistory log = logHistoryList.get(message); // history is keyed by the exact message text
+		if (log == null) {
+			log = new LogHistory(); // lastLogTime defaults to 0, so a first occurrence is always logged
+			log.message = message;
+			logHistoryList.put(message, log); // remember the record; without this each call started fresh and nothing was throttled
+		}
+		if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
+			log.lastLogTime = System.currentTimeMillis();
+			LOG.error(message);
+			return true;
+		}
+		return false;
+	}
+
+	class LogHistory { // value type of logHistoryList; looked up by message text in logErrorMessage()
+		long lastLogTime; // System.currentTimeMillis() at the moment this message was last written to the log; 0 until then
+		String message; // the message text this record was created for
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index d19b10c..2d4142d 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -28,58 +28,49 @@
 	],
 	"accessTypes":[
 		{
-			"itemId": 1,
+			"itemId":1,
 			"name":"publish",
 			"label":"Publish"
 		},
 		{
-			"itemId": 2,
+			"itemId":2,
 			"name":"consume",
 			"label":"Consume"
 		},
 		{
-			"itemId": 3,
-			"name":"create",
-			"label":"Create"
-		},
-		{
-			"itemId": 4,
-			"name":"delete",
-			"label":"Delete"
-		},
-		{
-			"itemId": 5,
+			"itemId":5,
 			"name":"configure",
 			"label":"Configure"
 		},
 		{
-			"itemId": 6,
+			"itemId":6,
 			"name":"describe",
 			"label":"Describe"
 		},
 		{
-			"itemId": 7,
+			"itemId":7,
 			"name":"kafka_admin",
 			"label":"Kafka Admin"
 		}
+		
 	],
 	"configs":[
 		{
-			"itemId": 1,
+			"itemId":1,
 			"name":"username",
 			"type":"string",
 			"mandatory":true,
 			"label":"Username"
 		},
 		{
-			"itemId": 2,
+			"itemId":2,
 			"name":"password",
 			"type":"password",
 			"mandatory":true,
 			"label":"Password"
 		},
 		{
-			"itemId": 3,
+			"itemId":3,
 			"name":"zookeeper.connect",
 			"type":"string",
 			"mandatory":true,
@@ -87,7 +78,7 @@
 			"label":"Zookeeper Connect String"
 		},
 		{
-			"itemId": 4,
+			"itemId":4,
 			"name":"commonNameForCertificate",
 			"type":"string",
 			"mandatory":false,
@@ -103,16 +94,18 @@
 	],
 	"policyConditions":[
 		{
-			"itemId": 1,
-			"name": "ip-range",
-			"evaluator": "org.apache.ranger.plugin.conditionevaluator.RangerIpMatcher",
-			"evaluatorOptions": { },
+			"itemId":1,
+			"name":"ip-range",
+			"evaluator":"org.apache.ranger.plugin.conditionevaluator.RangerIpMatcher",
+			"evaluatorOptions":{
+				
+			},
 			"validationRegEx":"",
-			"validationMessage": "",
+			"validationMessage":"",
 			"uiHint":"",
-			"label": "IP Address Range",
-			"description": "IP Address Range"
+			"label":"IP Address Range",
+			"description":"IP Address Range"
 		}
 		
 	]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/plugin-kafka/scripts/install.properties
----------------------------------------------------------------------
diff --git a/plugin-kafka/scripts/install.properties b/plugin-kafka/scripts/install.properties
index 4e8cbf9..1e2854e 100644
--- a/plugin-kafka/scripts/install.properties
+++ b/plugin-kafka/scripts/install.properties
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Location of component folder
+COMPONENT_INSTALL_DIR_NAME=../kafka
+
 #
 # Location of Policy Manager URL  
 #

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/plugin-kafka/scripts/kafka-plugin-install.properties
----------------------------------------------------------------------
diff --git a/plugin-kafka/scripts/kafka-plugin-install.properties b/plugin-kafka/scripts/kafka-plugin-install.properties
index f4df857..4f64c62 100644
--- a/plugin-kafka/scripts/kafka-plugin-install.properties
+++ b/plugin-kafka/scripts/kafka-plugin-install.properties
@@ -20,4 +20,4 @@
 # This location should be relative to the parent of the directory containing
 # the plugin installation files.
 # 
-COMPONENT_INSTALL_DIR_NAME=kafka
+#COMPONENT_INSTALL_DIR_NAME=kafka

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 655f1b9..0d0cffc 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -30,6 +30,7 @@ import kafka.security.auth.ResourceType;
 import kafka.server.KafkaConfig;
 import kafka.network.RequestChannel.Session;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.authorization.utils.StringUtil;
@@ -50,15 +51,17 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	public static final String KEY_CLUSTER = "cluster";
 	public static final String KEY_CONSUMER_GROUP = "consumer_group";
 
-	public static final String ACCESS_TYPE_READ = "read";
-	public static final String ACCESS_TYPE_WRITE = "write";
+	public static final String ACCESS_TYPE_READ = "consume";
+	public static final String ACCESS_TYPE_WRITE = "publish";
 	public static final String ACCESS_TYPE_CREATE = "create";
 	public static final String ACCESS_TYPE_DELETE = "delete";
-	public static final String ACCESS_TYPE_ALTER = "alter";
+	public static final String ACCESS_TYPE_CONFIGURE = "configure";
 	public static final String ACCESS_TYPE_DESCRIBE = "describe";
 	public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
 
 	private static volatile RangerBasePlugin rangerPlugin = null;
+	long lastLogTime = 0;
+	int errorLogFreq = 30000; // Log after every 30 seconds
 
 	public RangerKafkaAuthorizer() {
 		if (rangerPlugin == null) {
@@ -79,20 +82,33 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		rangerPlugin.setResultProcessor(auditHandler);
 	}
 
-	// TODO: Fix this after Session is fixed
-	// @Override
+	@Override
 	public boolean authorize(Session session, Operation operation,
 			Resource resource) {
 
 		String userName = null;
+		if (session.principal() != null) {
+			userName = session.principal().getName();
+			userName = StringUtils.substringBefore(userName, "/");
+			userName = StringUtils.substringBefore(userName, "@");
+		}
 		java.util.Set<String> userGroups = getGroupsForUser(userName);
-		String ip = null;
+		String ip = session.host();
+
 		Date eventTime = StringUtil.getUTCDate();
 		String accessType = mapToRangerAccessType(operation);
+		boolean validationFailed = false;
+		String validationStr = "";
+
 		if (accessType == null) {
-			logger.fatal("Unsupported access type. session=" + session
-					+ ", operation=" + operation + ", resource=" + resource);
-			return false;
+			if (rangerPlugin
+					.logErrorMessage("Unsupported access type. operation="
+							+ operation)) {
+				logger.fatal("Unsupported access type. session=" + session
+						+ ", operation=" + operation + ", resource=" + resource);
+			}
+			validationFailed = true;
+			validationStr += "Unsupported access type. operation=" + operation;
 		}
 		String action = accessType;
 
@@ -103,25 +119,49 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		rangerRequest.setAccessTime(eventTime);
 
 		RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+		rangerRequest.setResource(rangerResource);
+		rangerRequest.setAccessType(accessType);
+		rangerRequest.setAction(action);
+		rangerRequest.setRequestData(resource.name());
 
 		if (resource.resourceType().equals(ResourceType.TOPIC)) {
 			rangerResource.setValue(KEY_TOPIC, resource.name());
 		} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
-			rangerResource.setValue(KEY_CLUSTER, resource.name());
+			// CLUSTER should go as null
+			// rangerResource.setValue(KEY_CLUSTER, resource.name());
 		} else if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
 			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
 		} else {
 			logger.fatal("Unsupported resourceType=" + resource.resourceType());
-			return false;
+			validationFailed = true;
 		}
 
-		rangerRequest.setResource(rangerResource);
-		rangerRequest.setAccessType(accessType);
-		rangerRequest.setAction(action);
-		rangerRequest.setRequestData(resource.name());
+		boolean returnValue = false; // deny by default: access is granted only when the policy engine explicitly allows it
+		if (validationFailed) {
+			rangerPlugin.logErrorMessage(validationStr + ", request="
+					+ rangerRequest);
+			returnValue = false;
+		} else {
 
-		RangerAccessResult result = rangerPlugin.isAccessAllowed(rangerRequest);
-		return result.getIsAllowed();
+			try {
+				RangerAccessResult result = rangerPlugin
+						.isAccessAllowed(rangerRequest);
+				if (result == null) {
+					logger.error("Ranger Plugin returned null. Returning false");
+					returnValue = false;
+				} else {
+					returnValue = result.getIsAllowed();
+				}
+			} catch (Throwable t) { // fail closed: returnValue keeps its deny default when evaluation throws
+				logger.error("Error while calling isAccessAllowed(). request="
+						+ rangerRequest, t);
+			}
+		}
+		if (logger.isDebugEnabled()) {
+			logger.debug("rangerRequest=" + rangerRequest + ", return="
+					+ returnValue);
+		}
+		return returnValue;
 	}
 
 	/*
@@ -210,12 +250,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
 			return ACCESS_TYPE_READ;
 		} else if (operation.equals(Operation.WRITE)) {
 			return ACCESS_TYPE_WRITE;
-		} else if (operation.equals(Operation.CREATE)) {
-			return ACCESS_TYPE_CREATE;
-		} else if (operation.equals(Operation.DELETE)) {
-			return ACCESS_TYPE_DELETE;
 		} else if (operation.equals(Operation.ALTER)) {
-			return ACCESS_TYPE_ALTER;
+			return ACCESS_TYPE_CONFIGURE;
 		} else if (operation.equals(Operation.DESCRIBE)) {
 			return ACCESS_TYPE_DESCRIBE;
 		} else if (operation.equals(Operation.CLUSTER_ACTION)) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java b/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
index 673f652..8e0ada8 100644
--- a/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
+++ b/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
@@ -223,6 +223,7 @@ public class RangerSolrAuthorizer implements AuthorizationPlugin {
 		String userName = null;
 		if (principal != null) {
 			userName = principal.getName();
+			userName = StringUtils.substringBefore(userName, "/");
 			userName = StringUtils.substringBefore(userName, "@");
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/115577a6/src/main/assembly/plugin-kafka.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/plugin-kafka.xml b/src/main/assembly/plugin-kafka.xml
index aa66e08..77c4e65 100644
--- a/src/main/assembly/plugin-kafka.xml
+++ b/src/main/assembly/plugin-kafka.xml
@@ -1,167 +1,188 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 <assembly>
-  <id>kafka-plugin</id>
-  <formats>
-     <format>tar.gz</format>
-	 <format>zip</format>
-  </formats>
-  <baseDirectory>${project.name}-${project.version}-kafka-plugin</baseDirectory>
-  <includeBaseDirectory>true</includeBaseDirectory>
-  <moduleSets>
-    <moduleSet>
-     <binaries>
-        <includeDependencies>false</includeDependencies>
-        <unpack>false</unpack>
-	    <directoryMode>755</directoryMode>
-	    <fileMode>644</fileMode>
-        <dependencySets>
-            <dependencySet>
-                <outputDirectory>/lib</outputDirectory>
-                <unpack>false</unpack>
-                <includes>
-                    <include>commons-configuration:commons-configuration:jar:${commons.configuration.version}</include>
-                    <include>org.apache.hadoop:hadoop-common:jar:${hadoop-common.version}</include>
-                    <include>org.apache.hadoop:hadoop-common-plus:jar:${hadoop-common.version}</include>
-                    <include>com.google.code.gson:gson</include>
-                    <include>org.eclipse.persistence:eclipselink</include>
-                    <include>org.eclipse.persistence:javax.persistence</include>
-                    <include>commons-collections:commons-collections</include>
-		    <include>com.sun.jersey:jersey-bundle</include>
-                    <include>commons-logging:commons-logging:jar:${commons.logging.version}</include>
-                    <include>com.google.guava:guava:jar:${guava.version}</include>
-		    <include>org.apache.httpcomponents:httpclient:jar:${httpcomponent.httpclient.version}</include>
-		    <include>org.apache.httpcomponents:httpcore:jar:${httpcomponent.httpcore.version}</include>
-		    <include>org.apache.httpcomponents:httpmime:jar:${httpcomponent.httpmime.version}</include>
-		    <include>org.noggit:noggit:jar:${noggit.version}</include>
-                </includes>
-            </dependencySet>
-            <dependencySet>
-                    <outputDirectory>/install/lib</outputDirectory>
-                    <unpack>false</unpack>
-            		<directoryMode>755</directoryMode>
-            		<fileMode>644</fileMode>
-                    <includes>
-                        <include>commons-cli:commons-cli</include>
-                        <include>commons-collections:commons-collections</include>
-                        <include>commons-configuration:commons-configuration:jar:${commons.configuration.version}</include>
-                        <include>commons-io:commons-io:jar:${commons.io.version}</include>
-                        <include>commons-lang:commons-lang:jar:${commons.lang.version}</include>
-                        <include>commons-logging:commons-logging</include>
-                        <include>com.google.guava:guava:jar:${guava.version}</include>
-                        <include>org.hamcrest:hamcrest-all</include>
-                        <include>junit:junit</include>
-                        <include>org.slf4j:slf4j-api:jar:${slf4j-api.version}</include>
-                        <include>org.apache.hadoop:hadoop-common:jar:${hadoop-common.version}</include>
-                        <include>org.apache.hadoop:hadoop-auth:jar:${hadoop-common.version}</include>
-						<include>security_plugins.ranger-plugins-cred:ranger-plugins-cred</include>
-						<include>org.apache.ranger:credentialbuilder</include>
-                    </includes>
-            </dependencySet>
-        </dependencySets>
-        <outputDirectory>/lib</outputDirectory>
-     </binaries>
-     <includes>
-		<include>org.apache.ranger:ranger_solrj</include>
-		<include>security_plugins.ranger-plugins-audit:ranger-plugins-audit</include>
-		<include>security_plugins.ranger-plugins-cred:ranger-plugins-cred</include>
-		<include>security_plugins.ranger-plugins-impl:ranger-plugins-impl</include>
-		<include>security_plugins.ranger-plugins-common:ranger-plugins-common</include>
-		<include>security_plugins.ranger-kafka-plugin:ranger-kafka-plugin</include>
-     </includes>
-    </moduleSet>
-    <moduleSet>
-     <binaries>
-        <includeDependencies>false</includeDependencies>
-        <outputDirectory>/install/lib</outputDirectory>
-        <unpack>false</unpack>
-     </binaries>
-     <includes>
-		<include>security_plugins.ranger-plugins-installer:ranger-plugins-installer</include>
-		<include>org.apache.ranger:credentialbuilder</include>
-     </includes>
-    </moduleSet>
-   </moduleSets>
-   <fileSets>
-   <!-- conf.templates for enable -->
-    <fileSet>
-        <outputDirectory>/install/conf.templates/enable</outputDirectory>
-        <directory>plugin-kafka/conf</directory>
-        <excludes>
-            <exclude>*.sh</exclude>
-        </excludes>
-        <fileMode>700</fileMode>
-    </fileSet>
-    <fileSet>
-        <outputDirectory>/install/conf.templates/disable</outputDirectory>
-        <directory>plugin-kafka/disable-conf</directory>
-        <fileMode>700</fileMode>
-    </fileSet>
-    <fileSet>
-        <outputDirectory>/install/conf.templates/default</outputDirectory>
-        <directory>plugin-kafka/template</directory>
-        <fileMode>700</fileMode>
-    </fileSet>
-    <!-- version file -->
-    <fileSet>
-        <outputDirectory>/</outputDirectory>
-        <directory>${project.build.outputDirectory}</directory>
-        <includes>
-            <include>version</include>
-        </includes>
-        <fileMode>444</fileMode>
-    </fileSet>
-  </fileSets>
-  <!-- enable/disable script for Plugin -->
- <files>
-    <file>
-		<source>agents-common/scripts/enable-agent.sh</source>
-        <outputDirectory>/</outputDirectory>
-        <destName>enable-kafka-plugin.sh</destName>
-        <fileMode>755</fileMode>
-    </file>
-    <file>
-		<source>agents-common/scripts/enable-agent.sh</source>
-        <outputDirectory>/</outputDirectory>
-        <destName>disable-kafka-plugin.sh</destName>
-        <fileMode>755</fileMode>
-    </file>
-    <file>
-        <source>security-admin/scripts/ranger_credential_helper.py</source>
-        <outputDirectory>/</outputDirectory>
-        <fileMode>755</fileMode>
-    </file>
-    <file>
-        <source>security-admin/scripts/ranger_credential_helper.py</source>
-        <outputDirectory>/</outputDirectory>
-        <fileMode>755</fileMode>
-    </file>
-    <file>
-        <source>plugin-kafka/scripts/install.properties</source>
-        <outputDirectory>/</outputDirectory>
-        <destName>install.properties</destName>
-        <fileMode>755</fileMode>
-    </file>
-    <file>
-        <source>plugin-kafka/scripts/kafka-plugin-install.properties</source>
-        <outputDirectory>/</outputDirectory>
-        <destName>kafka-plugin-install.properties</destName>
-        <fileMode>755</fileMode>
-    </file>
-  </files>
+	<id>kafka-plugin</id>
+	<formats>
+		<format>tar.gz</format>
+		<format>zip</format>
+	</formats>
+	<baseDirectory>${project.name}-${project.version}-kafka-plugin
+	</baseDirectory>
+	<includeBaseDirectory>true</includeBaseDirectory>
+	<moduleSets>
+		<moduleSet>
+			<binaries>
+				<includeDependencies>false</includeDependencies>
+				<unpack>false</unpack>
+				<directoryMode>755</directoryMode>
+				<fileMode>644</fileMode>
+				<dependencySets>
+					<dependencySet>
+						<outputDirectory>/lib</outputDirectory>
+						<unpack>false</unpack>
+						<includes>
+							<include>commons-configuration:commons-configuration:jar:${commons.configuration.version}
+							</include>
+							<include>org.apache.hadoop:hadoop-common:jar:${hadoop-common.version}
+							</include>
+							<include>org.apache.hadoop:hadoop-common-plus:jar:${hadoop-common.version}
+							</include>
+							<include>com.google.code.gson:gson</include>
+							<include>org.eclipse.persistence:eclipselink</include>
+							<include>org.eclipse.persistence:javax.persistence</include>
+							<include>commons-collections:commons-collections</include>
+							<include>com.sun.jersey:jersey-bundle</include>
+							<include>commons-logging:commons-logging:jar:${commons.logging.version}
+							</include>
+							<include>commons-lang:commons-lang</include>
+							<include>commons-io:commons-io</include>
+							<include>com.google.guava:guava:jar:${guava.version}</include>
+							<include>org.apache.httpcomponents:httpclient:jar:${httpcomponent.httpclient.version}
+							</include>
+							<include>org.apache.httpcomponents:httpcore:jar:${httpcomponent.httpcore.version}
+							</include>
+							<include>org.apache.httpcomponents:httpmime:jar:${httpcomponent.httpmime.version}
+							</include>
+							<include>org.noggit:noggit:jar:${noggit.version}</include>
+							<include>org.codehaus.jackson:jackson-core-asl</include>
+							<include>org.codehaus.jackson:jackson-jaxrs</include>
+							<include>org.codehaus.jackson:jackson-mapper-asl</include>
+							<include>org.codehaus.jackson:jackson-xc</include>
+
+						</includes>
+					</dependencySet>
+					<dependencySet>
+						<outputDirectory>/install/lib</outputDirectory>
+						<unpack>false</unpack>
+						<directoryMode>755</directoryMode>
+						<fileMode>644</fileMode>
+						<includes>
+							<include>commons-cli:commons-cli</include>
+							<include>commons-collections:commons-collections</include>
+							<include>commons-configuration:commons-configuration:jar:${commons.configuration.version}
+							</include>
+							<include>commons-io:commons-io:jar:${commons.io.version}
+							</include>
+							<include>commons-lang:commons-lang:jar:${commons.lang.version}
+							</include>
+							<include>commons-logging:commons-logging</include>
+							<include>com.google.guava:guava:jar:${guava.version}</include>
+							<include>org.hamcrest:hamcrest-all</include>
+							<include>junit:junit</include>
+							<include>org.slf4j:slf4j-api:jar:${slf4j-api.version}</include>
+							<include>org.apache.hadoop:hadoop-common:jar:${hadoop-common.version}
+							</include>
+							<include>org.apache.hadoop:hadoop-auth:jar:${hadoop-common.version}
+							</include>
+							<include>security_plugins.ranger-plugins-cred:ranger-plugins-cred
+							</include>
+							<include>org.apache.ranger:credentialbuilder</include>
+						</includes>
+					</dependencySet>
+				</dependencySets>
+				<outputDirectory>/lib</outputDirectory>
+			</binaries>
+			<includes>
+				<include>org.apache.ranger:ranger_solrj</include>
+				<include>security_plugins.ranger-plugins-audit:ranger-plugins-audit
+				</include>
+				<include>security_plugins.ranger-plugins-cred:ranger-plugins-cred
+				</include>
+				<include>security_plugins.ranger-plugins-impl:ranger-plugins-impl
+				</include>
+				<include>security_plugins.ranger-plugins-common:ranger-plugins-common
+				</include>
+				<include>security_plugins.ranger-kafka-plugin:ranger-kafka-plugin
+				</include>
+			</includes>
+		</moduleSet>
+		<moduleSet>
+			<binaries>
+				<includeDependencies>false</includeDependencies>
+				<outputDirectory>/install/lib</outputDirectory>
+				<unpack>false</unpack>
+			</binaries>
+			<includes>
+				<include>security_plugins.ranger-plugins-installer:ranger-plugins-installer
+				</include>
+				<include>org.apache.ranger:credentialbuilder</include>
+			</includes>
+		</moduleSet>
+	</moduleSets>
+	<fileSets>
+		<!-- conf.templates for enable -->
+		<fileSet>
+			<outputDirectory>/install/conf.templates/enable</outputDirectory>
+			<directory>plugin-kafka/conf</directory>
+			<excludes>
+				<exclude>*.sh</exclude>
+			</excludes>
+			<fileMode>700</fileMode>
+		</fileSet>
+		<fileSet>
+			<outputDirectory>/install/conf.templates/disable</outputDirectory>
+			<directory>plugin-kafka/disable-conf</directory>
+			<fileMode>700</fileMode>
+		</fileSet>
+		<fileSet>
+			<outputDirectory>/install/conf.templates/default</outputDirectory>
+			<directory>plugin-kafka/template</directory>
+			<fileMode>700</fileMode>
+		</fileSet>
+		<!-- version file -->
+		<fileSet>
+			<outputDirectory>/</outputDirectory>
+			<directory>${project.build.outputDirectory}</directory>
+			<includes>
+				<include>version</include>
+			</includes>
+			<fileMode>444</fileMode>
+		</fileSet>
+	</fileSets>
+	<!-- enable/disable script for Plugin -->
+	<files>
+		<file>
+			<source>agents-common/scripts/enable-agent.sh</source>
+			<outputDirectory>/</outputDirectory>
+			<destName>enable-kafka-plugin.sh</destName>
+			<fileMode>755</fileMode>
+		</file>
+		<file>
+			<source>agents-common/scripts/enable-agent.sh</source>
+			<outputDirectory>/</outputDirectory>
+			<destName>disable-kafka-plugin.sh</destName>
+			<fileMode>755</fileMode>
+		</file>
+		<file>
+			<source>security-admin/scripts/ranger_credential_helper.py</source>
+			<outputDirectory>/</outputDirectory>
+			<fileMode>755</fileMode>
+		</file>
+		<file>
+			<source>security-admin/scripts/ranger_credential_helper.py</source>
+			<outputDirectory>/</outputDirectory>
+			<fileMode>755</fileMode>
+		</file>
+		<file>
+			<source>plugin-kafka/scripts/install.properties</source>
+			<outputDirectory>/</outputDirectory>
+			<destName>install.properties</destName>
+			<fileMode>755</fileMode>
+		</file>
+		<file>
+			<source>plugin-kafka/scripts/kafka-plugin-install.properties</source>
+			<outputDirectory>/</outputDirectory>
+			<destName>kafka-plugin-install.properties</destName>
+			<fileMode>755</fileMode>
+		</file>
+	</files>
 </assembly>