You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2017/07/26 18:47:30 UTC

ranger git commit: RANGER-1695:Optimize Ranger code for authorization of HDFS 'getContentSummary' and 'delete' commands

Repository: ranger
Updated Branches:
  refs/heads/master 88352408b -> 96b0c4860


RANGER-1695:Optimize Ranger code for authorization of HDFS 'getContentSummary' and 'delete' commands


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/96b0c486
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/96b0c486
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/96b0c486

Branch: refs/heads/master
Commit: 96b0c48602d777e7f1aacc6b7253fed874811b1a
Parents: 8835240
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Wed Jul 26 11:31:04 2017 -0700
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Wed Jul 26 11:31:04 2017 -0700

----------------------------------------------------------------------
 .../hadoop/constants/RangerHadoopConstants.java |   2 +
 .../policyengine/RangerPolicyEngineImpl.java    |   4 +-
 .../policyengine/RangerPolicyEngineOptions.java |  15 ++
 .../ranger/plugin/service/RangerBasePlugin.java |   2 +
 .../plugin/util/RangerPerfCollectorTracer.java  |   5 +-
 .../hbase/RangerAuthorizationCoprocessor.java   |  40 ++++--
 .../hadoop/RangerHdfsAuthorizer.java            | 141 ++++++++++++++++++-
 .../ranger/services/hdfs/HDFSRangerTest.java    |  55 ++++++++
 .../src/test/resources/hdfs-policies.json       |  27 ++++
 hdfs-agent/src/test/resources/log4j.properties  |  34 +++++
 .../src/test/resources/ranger-hdfs-security.xml |   8 ++
 .../hive/authorizer/RangerHiveAuthorizer.java   |  28 +++-
 .../authorization/knox/RangerPDPKnoxFilter.java |  11 ++
 .../atlas/authorizer/RangerAtlasAuthorizer.java |  10 ++
 .../kafka/authorizer/RangerKafkaAuthorizer.java |   9 ++
 .../kms/authorizer/RangerKmsAuthorizer.java     |  11 +-
 .../solr/authorizer/RangerSolrAuthorizer.java   |   9 ++
 .../yarn/authorizer/RangerYarnAuthorizer.java   |  20 +++
 ranger-tools/conf/log4j.properties              |   1 +
 .../RangerPolicyenginePerfTester.java           |   1 +
 .../storm/authorizer/RangerStormAuthorizer.java |  22 ++-
 21 files changed, 428 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java b/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
index 83f720a..6d9fe26 100644
--- a/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
+++ b/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
@@ -21,7 +21,9 @@ package org.apache.ranger.authorization.hadoop.constants;
 public class RangerHadoopConstants {
 	
 	public static final String RANGER_ADD_HDFS_PERMISSION_PROP = "xasecure.add-hadoop-authorization";
+	public static final String RANGER_OPTIMIZE_SUBACCESS_AUTHORIZATION_PROP = "ranger.optimize-subaccess-authorization" ;
 	public static final boolean RANGER_ADD_HDFS_PERMISSION_DEFAULT = false;
+	public static final boolean RANGER_OPTIMIZE_SUBACCESS_AUTHORIZATION_DEFAULT = false ;
 	public static final String READ_ACCCESS_TYPE = "read";
 	public static final String WRITE_ACCCESS_TYPE = "write";
 	public static final String EXECUTE_ACCCESS_TYPE = "execute";

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
index c72c8b5..2bebb95 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
@@ -297,7 +297,9 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		RangerPerfTracer perf = null;
 
 		if(RangerPerfTracer.isPerfTraceEnabled(PERF_POLICYENGINE_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_REQUEST_LOG, "RangerPolicyEngine.isAccessAllowed(requestHashCode=" + Integer.toHexString(System.identityHashCode(request)) + ")");
+			String requestHashCode = Integer.toHexString(System.identityHashCode(request));
+			perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_REQUEST_LOG, "RangerPolicyEngine.isAccessAllowed(requestHashCode=" + requestHashCode + ")");
+			LOG.info("RangerPolicyEngineImpl.isAccessAllowed(" + requestHashCode + ", " + request + ")");
 		}
 
 		RangerAccessResult ret = isAccessAllowedNoAudit(request);

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
index 63162f6..9ec2049 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
@@ -40,6 +40,9 @@ public class RangerPolicyEngineOptions {
 
 		cacheAuditResults = conf.getBoolean(propertyPrefix + ".policyengine.option.cache.audit.results", true);
 
+		if (!disableTrieLookupPrefilter) {
+			cacheAuditResults = false;
+		}
 		evaluateDelegateAdminOnly = false;
 		enableTagEnricherWithLocalRefresher = false;
 	}
@@ -118,4 +121,16 @@ public class RangerPolicyEngineOptions {
 		ret *= 2;
 		return ret;
 	}
+
+	/** Renders the engine option values as a single human-readable line. */
+	@Override
+	public String toString() {
+		return "PolicyEngineOptions: {" +
+				" evaluatorType: " + evaluatorType +
+				", cacheAuditResult: " + cacheAuditResults +
+				", disableContextEnrichers: " + disableContextEnrichers +
+				", disableCustomConditions: " + disableCustomConditions +
+				", disableTrieLookupPrefilter: " + disableTrieLookupPrefilter +
+				" }";
+	}
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
index 06b8f4d..d5aa1ae 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
@@ -143,6 +143,8 @@ public class RangerBasePlugin {
 
 		policyEngineOptions.configureForPlugin(configuration, propertyPrefix);
 
+		LOG.info(policyEngineOptions);
+
 		RangerAdminClient admin = createAdminClient(serviceName, appId, propertyPrefix);
 
 		refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPerfCollectorTracer.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPerfCollectorTracer.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPerfCollectorTracer.java
index d899c6f..353f7da 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPerfCollectorTracer.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPerfCollectorTracer.java
@@ -22,13 +22,16 @@ package org.apache.ranger.plugin.util;
 import org.apache.commons.logging.Log;
 
 public class RangerPerfCollectorTracer extends RangerPerfTracer {
+	private final long   startTimeNanos;
 
 	public RangerPerfCollectorTracer(Log logger, String tag, String data) {
 		super(logger, tag, data);
+		startTimeNanos = System.nanoTime();
 	}
 
 	@Override
 	public void log() {
-		PerfDataRecorder.recordStatistic(tag, getElapsedTime());
+		// Collect elapsed time in microseconds
+		PerfDataRecorder.recordStatistic(tag, ((System.nanoTime() - startTimeNanos) + 500) / 1000);
 	}
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
index cf2ffcf..fc1db46 100644
--- a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
+++ b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
@@ -111,9 +111,11 @@ import com.google.common.collect.MapMaker;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 
 public class RangerAuthorizationCoprocessor extends RangerAuthorizationCoprocessorBase implements AccessControlService.Interface, CoprocessorService {
 	private static final Log LOG = LogFactory.getLog(RangerAuthorizationCoprocessor.class.getName());
+	private static final Log PERF_HBASEAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("hbaseauth.request");
 	private static boolean UpdateRangerPoliciesOnGrantRevoke = RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE;
 	private static final String GROUP_PREFIX = "@";
 		
@@ -334,7 +336,7 @@ public class RangerAuthorizationCoprocessor extends RangerAuthorizationCoprocess
 			}
 			return result;
 		}
-		
+
 		// let's create a session that would be reused.  Set things on it that won't change.
 		HbaseAuditHandler auditHandler = _factory.getAuditHandler();
 		AuthorizationSession session = new AuthorizationSession(hbasePlugin)
@@ -505,7 +507,11 @@ public class RangerAuthorizationCoprocessor extends RangerAuthorizationCoprocess
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> authorizeAccess");
 		}
+		RangerPerfTracer perf = null;
+
 		try {
+			perf = RangerPerfTracer.getPerfTracer(PERF_HBASEAUTH_REQUEST_LOG, "RangerAuthorizationCoprocessor.authorizeAccess(request=Operation[" + operation + "]");
+
 			ColumnFamilyAccessResult accessResult = evaluateAccess(operation, action, env, familyMap);
 			RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
 			if (accessResult._everythingIsAccessible) {
@@ -525,6 +531,7 @@ public class RangerAuthorizationCoprocessor extends RangerAuthorizationCoprocess
 				throw new AccessDeniedException(accessResult._denialReason);
 			}
 		} finally {
+			RangerPerfTracer.log(perf);
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("<== authorizeAccess");
 			}
@@ -542,17 +549,26 @@ public class RangerAuthorizationCoprocessor extends RangerAuthorizationCoprocess
 	void requirePermission(final String operation, final Action action, final RegionCoprocessorEnvironment regionServerEnv, final Map<byte[], ? extends Collection<?>> familyMap)
 			throws AccessDeniedException {
 
-		ColumnFamilyAccessResult accessResult = evaluateAccess(operation, action, regionServerEnv, familyMap);
-		RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
-		if (accessResult._everythingIsAccessible) {
-			auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
-			auditHandler.logAuthzAudits(accessResult._familyLevelAccessEvents);
-			LOG.debug("requirePermission: exiting: all access was allowed");
-			return;
-		} else {
-			auditHandler.logAuthzAudit(accessResult._accessDeniedEvent);
-			LOG.debug("requirePermission: exiting: throwing exception as everything wasn't accessible");
-			throw new AccessDeniedException(accessResult._denialReason);
+		RangerPerfTracer perf = null;
+
+		try {
+			if (RangerPerfTracer.isPerfTraceEnabled(PERF_HBASEAUTH_REQUEST_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_HBASEAUTH_REQUEST_LOG, "RangerAuthorizationCoprocessor.requirePermission(request=Operation[" + operation + "]");
+			}
+			ColumnFamilyAccessResult accessResult = evaluateAccess(operation, action, regionServerEnv, familyMap);
+			RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+			if (accessResult._everythingIsAccessible) {
+				auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
+				auditHandler.logAuthzAudits(accessResult._familyLevelAccessEvents);
+				LOG.debug("requirePermission: exiting: all access was allowed");
+				return;
+			} else {
+				auditHandler.logAuthzAudit(accessResult._accessDeniedEvent);
+				LOG.debug("requirePermission: exiting: throwing exception as everything wasn't accessible");
+				throw new AccessDeniedException(accessResult._denialReason);
+			}
+		} finally {
+			RangerPerfTracer.log(perf);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
----------------------------------------------------------------------
diff --git a/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
index d28685a..f82fd57 100644
--- a/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
+++ b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizer.java
@@ -28,10 +28,12 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,7 +56,9 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.resourcematcher.RangerPathResourceMatcher;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 
 import com.google.common.collect.Sets;
 
@@ -70,6 +74,7 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
     public static final String RANGER_FILENAME_EXTENSION_SEPARATOR_PROP = "ranger.plugin.hdfs.filename.extension.separator";
 
 	private static final Log LOG = LogFactory.getLog(RangerHdfsAuthorizer.class);
+	private static final Log PERF_HDFSAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("hdfsauth.request");
 
 	private RangerHdfsPlugin           rangerPlugin            = null;
 	private Map<FsAction, Set<String>> access2ActionListMapper = new HashMap<FsAction, Set<String>>();
@@ -92,6 +97,10 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 		RangerHdfsPlugin plugin = new RangerHdfsPlugin();
 		plugin.init();
 
+		if (plugin.isOptimizeSubAccessAuthEnabled()) {
+			LOG.info(RangerHadoopConstants.RANGER_OPTIMIZE_SUBACCESS_AUTHORIZATION_PROP + " is enabled");
+		}
+
 		access2ActionListMapper.put(FsAction.NONE,          new HashSet<String>());
 		access2ActionListMapper.put(FsAction.ALL,           Sets.newHashSet(READ_ACCCESS_TYPE, WRITE_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
 		access2ActionListMapper.put(FsAction.READ,          Sets.newHashSet(READ_ACCCESS_TYPE));
@@ -208,6 +217,12 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 						+ ", access=" + access + ", subAccess=" + subAccess + ", ignoreEmptyDir=" + ignoreEmptyDir + ")");
 			}
 
+			RangerPerfTracer perf = null;
+
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_HDFSAUTH_REQUEST_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_HDFSAUTH_REQUEST_LOG, "RangerHdfsAuthorizer.checkPermission(path=" + path + ")");
+			}
+
 			try {
 				boolean isTraverseOnlyCheck = access == null && parentAccess == null && ancestorAccess == null && subAccess == null;
 				INode   ancestor            = null;
@@ -311,19 +326,37 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 								if(authzStatus != AuthzStatus.ALLOW) {
 									break;
 								}
-							}
 
-							for(INode child : cList) {
-								if (child.isDirectory()) {
-									directories.push(child.asDirectory());
+								AuthzStatus subDirAuthStatus = AuthzStatus.NOT_DETERMINED;
+
+								boolean optimizeSubAccessAuthEnabled = RangerHdfsPlugin.isOptimizeSubAccessAuthEnabled();
+
+								if (optimizeSubAccessAuthEnabled) {
+									subDirAuthStatus = isAccessAllowedForHierarchy(dir, dirAttribs, subAccess, user, groups, plugin);
+								}
+
+								if (subDirAuthStatus != AuthzStatus.ALLOW) {
+									for(INode child : cList) {
+										if (child.isDirectory()) {
+											directories.push(child.asDirectory());
+										}
+									}
 								}
 							}
 						}
 						if (authzStatus == AuthzStatus.NOT_DETERMINED) {
+							RangerPerfTracer hadoopAuthPerf = null;
+
+							if(RangerPerfTracer.isPerfTraceEnabled(PERF_HDFSAUTH_REQUEST_LOG)) {
+								hadoopAuthPerf = RangerPerfTracer.getPerfTracer(PERF_HDFSAUTH_REQUEST_LOG, "defaultEnforcer.checkPermission(path=" + path + ")");
+							}
+
 							authzStatus = checkDefaultEnforcer(fsOwner, superGroup, ugi, inodeAttrs, inodes,
 											pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
 											FsAction.NONE, FsAction.NONE, FsAction.NONE, subAccess, ignoreEmptyDir,
 											isTraverseOnlyCheck, ancestor, parent, inode, auditHandler);
+
+							RangerPerfTracer.log(hadoopAuthPerf);
 						}
 					}
 
@@ -363,6 +396,8 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 					auditHandler.flushAudit();
 				}
 
+				RangerPerfTracer.log(perf);
+
 				if(LOG.isDebugEnabled()) {
 					LOG.debug("<== RangerAccessControlEnforcer.checkPermission(" + path + ", " + access + ", user=" + user + ") : " + authzStatus);
 				}
@@ -379,6 +414,7 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 												 ) throws AccessControlException {
 			    AuthzStatus authzStatus = AuthzStatus.NOT_DETERMINED;
 				if(RangerHdfsPlugin.isHadoopAuthEnabled() && defaultEnforcer != null) {
+
 					try {
 						defaultEnforcer.checkPermission(fsOwner, superGroup, ugi, inodeAttrs, inodes,
 														pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
@@ -475,6 +511,70 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 
 			return ret;
 		}
+
+		private AuthzStatus isAccessAllowedForHierarchy(INode inode, INodeAttributes inodeAttribs, FsAction access, String user, Set<String> groups, RangerHdfsPlugin plugin) {
+			AuthzStatus ret   = null;
+			String  path      = inode != null ? inode.getFullPathName() : null;
+			String  pathOwner = inodeAttribs != null ? inodeAttribs.getUserName() : null;
+			String 		clusterName = plugin.getClusterName();
+
+			if (pathOwner == null && inode != null) {
+				pathOwner = inode.getUserName();
+			}
+
+			if (RangerHadoopConstants.HDFS_ROOT_FOLDER_PATH_ALT.equals(path)) {
+				path = RangerHadoopConstants.HDFS_ROOT_FOLDER_PATH;
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> RangerAccessControlEnforcer.isAccessAllowedForHierarchy(" + path + ", " + access + ", " + user + ")");
+			}
+
+			if (path != null) {
+
+				Set<String> accessTypes = access2ActionListMapper.get(access);
+
+				if (accessTypes == null) {
+					LOG.warn("RangerAccessControlEnforcer.isAccessAllowedForHierarchy(" + path + ", " + access + ", " + user + "): no Ranger accessType found for " + access);
+
+					accessTypes = access2ActionListMapper.get(FsAction.NONE);
+				}
+
+				String subDirPath = path;
+				if (subDirPath.charAt(subDirPath.length() - 1) != org.apache.hadoop.fs.Path.SEPARATOR_CHAR) {
+					subDirPath = subDirPath + Character.toString(org.apache.hadoop.fs.Path.SEPARATOR_CHAR);
+				}
+				subDirPath = subDirPath + RangerHdfsPlugin.getRandomizedWildcardPathName();
+
+				for (String accessType : accessTypes) {
+					RangerHdfsAccessRequest request = new RangerHdfsAccessRequest(null, subDirPath, pathOwner, access, accessType, user, groups, clusterName);
+
+					RangerAccessResult result = plugin.isAccessAllowed(request, null);
+
+					if (result == null || !result.getIsAccessDetermined()) {
+						ret = AuthzStatus.NOT_DETERMINED;
+						// don't break yet; subsequent accessType could be denied
+					} else if(! result.getIsAllowed()) { // explicit deny
+						ret = AuthzStatus.DENY;
+						break;
+					} else { // allowed
+						if(!AuthzStatus.NOT_DETERMINED.equals(ret)) { // set to ALLOW only if there was no NOT_DETERMINED earlier
+							ret = AuthzStatus.ALLOW;
+						}
+					}
+				}
+			}
+
+			if(ret == null) {
+				ret = AuthzStatus.NOT_DETERMINED;
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("<== RangerAccessControlEnforcer.isAccessAllowedForHierarchy(" + path + ", " + access + ", " + user + "): " + ret);
+			}
+
+			return ret;
+		}
 	}
 }
 
@@ -482,7 +582,8 @@ public class RangerHdfsAuthorizer extends INodeAttributeProvider {
 class RangerHdfsPlugin extends RangerBasePlugin {
 	private static boolean hadoopAuthEnabled = RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_DEFAULT;
 	private static String fileNameExtensionSeparator;
-
+	private static boolean optimizeSubAccessAuthEnabled = RangerHadoopConstants.RANGER_OPTIMIZE_SUBACCESS_AUTHORIZATION_DEFAULT;
+	private static String randomizedWildcardPathName;
 
 	public RangerHdfsPlugin() {
 		super("hdfs", "hdfs");
@@ -493,6 +594,17 @@ class RangerHdfsPlugin extends RangerBasePlugin {
 		
 		RangerHdfsPlugin.hadoopAuthEnabled = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_PROP, RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_DEFAULT);
 		RangerHdfsPlugin.fileNameExtensionSeparator = RangerConfiguration.getInstance().get(RangerHdfsAuthorizer.RANGER_FILENAME_EXTENSION_SEPARATOR_PROP, RangerHdfsAuthorizer.DEFAULT_FILENAME_EXTENSION_SEPARATOR);
+		RangerHdfsPlugin.optimizeSubAccessAuthEnabled = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_OPTIMIZE_SUBACCESS_AUTHORIZATION_PROP, RangerHadoopConstants.RANGER_OPTIMIZE_SUBACCESS_AUTHORIZATION_DEFAULT);
+
+		// Build random string of random length
+		byte[] bytes = new byte[1];
+		new Random().nextBytes(bytes);
+		int count = bytes[0];
+		count = count < 56 ? 56 : count;
+		count = count > 112 ? 112 : count;
+
+		String random = RandomStringUtils.random(count, "^&#@!%()-_+=@:;'<>`~abcdefghijklmnopqrstuvwxyz01234567890");
+		randomizedWildcardPathName = RangerPathResourceMatcher.WILDCARD_ASTERISK + random + RangerPathResourceMatcher.WILDCARD_ASTERISK;
 	}
 
 	public static boolean isHadoopAuthEnabled() {
@@ -501,6 +613,12 @@ class RangerHdfsPlugin extends RangerBasePlugin {
 	public static String getFileNameExtensionSeparator() {
 		return RangerHdfsPlugin.fileNameExtensionSeparator;
 	}
+	public static boolean isOptimizeSubAccessAuthEnabled() {
+		return RangerHdfsPlugin.optimizeSubAccessAuthEnabled;
+	}
+	public static String getRandomizedWildcardPathName() {
+		return RangerHdfsPlugin.randomizedWildcardPathName;
+	}
 }
 
 class RangerHdfsResource extends RangerAccessResourceImpl {
@@ -589,17 +707,28 @@ class RangerHdfsAuditHandler extends RangerDefaultAuditHandler {
 			isAuditEnabled = true;
 		}
 
-		auditEvent = super.getAuthzEvents(result);
+		if (auditEvent == null) {
+			auditEvent = super.getAuthzEvents(result);
+		}
 
 		if (auditEvent != null) {
 			RangerAccessRequest request = result.getAccessRequest();
 			RangerAccessResource resource = request.getResource();
 			String resourcePath = resource != null ? resource.getAsString() : null;
 
+			// Overwrite fields in original auditEvent
 			auditEvent.setEventTime(request.getAccessTime());
 			auditEvent.setAccessType(request.getAction());
 			auditEvent.setResourcePath(this.pathToBeValidated);
 			auditEvent.setResultReason(resourcePath);
+
+			auditEvent.setAccessResult((short) (result.getIsAllowed() ? 1 : 0));
+			auditEvent.setPolicyId(result.getPolicyId());
+
+			Set<String> tags = getTags(request);
+			if (tags != null) {
+				auditEvent.setTags(tags);
+			}
 		}
 
 		if(LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hdfs-agent/src/test/java/org/apache/ranger/services/hdfs/HDFSRangerTest.java
----------------------------------------------------------------------
diff --git a/hdfs-agent/src/test/java/org/apache/ranger/services/hdfs/HDFSRangerTest.java b/hdfs-agent/src/test/java/org/apache/ranger/services/hdfs/HDFSRangerTest.java
index fe6a705..a7215ce 100644
--- a/hdfs-agent/src/test/java/org/apache/ranger/services/hdfs/HDFSRangerTest.java
+++ b/hdfs-agent/src/test/java/org/apache/ranger/services/hdfs/HDFSRangerTest.java
@@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -357,6 +358,11 @@ public class HDFSRangerTest {
         HDFSReadFailTest("/tmp/tmpdir5/t/data-file.txt");
     }
 
+    @org.junit.Test
+    public void HDFSContentSummaryTest() throws Exception {
+        HDFSGetContentSummary("/tmp/get-content-summary");
+    }
+
     void HDFSReadTest(String fileName) throws Exception {
         FileSystem fileSystem = hdfsCluster.getFileSystem();
 
@@ -527,4 +533,53 @@ public class HDFSRangerTest {
             }
         });
     }
+
+    void HDFSGetContentSummary(final String dirName) throws Exception {
+
+        String subdirName = dirName + "/tmpdir";
+
+        createFile(subdirName, 1);
+        createFile(subdirName, 2);
+
+        UserGroupInformation ugi = UserGroupInformation.createUserForTesting("bob", new String[] {});
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+
+            public Void run() throws Exception {
+                Configuration conf = new Configuration();
+                conf.set("fs.defaultFS", defaultFs);
+
+                FileSystem fs = FileSystem.get(conf);
+
+                try {
+                    // GetContentSummary on the directory dirName
+                    ContentSummary contentSummary = fs.getContentSummary(new Path(dirName));
+
+                    long directoryCount = contentSummary.getDirectoryCount();
+                    Assert.assertTrue("Found unexpected number of directories; expected-count=3, actual-count=" + directoryCount, directoryCount == 3);
+                } catch (Exception e) {
+                    Assert.fail("Failed to getContentSummary, exception=" + e);
+                }
+                fs.close();
+                return null;
+            }
+        });
+    }
+
+    void createFile(String baseDir, Integer index) throws Exception {
+        FileSystem fileSystem = hdfsCluster.getFileSystem();
+
+        // Write a file - the AccessControlEnforcer won't be invoked as we are the "superuser"
+        String dirName = baseDir + (index != null ? String.valueOf(index) : "");
+        String fileName = dirName + "/dummy-data";
+        final Path file = new Path(fileName);
+        FSDataOutputStream out = fileSystem.create(file);
+        for (int i = 0; i < 1024; ++i) {
+            out.write(("data" + i + "\n").getBytes("UTF-8"));
+            out.flush();
+        }
+        out.close();
+
+        // Change permissions to read-only
+        fileSystem.setPermission(file, new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
+    }
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hdfs-agent/src/test/resources/hdfs-policies.json
----------------------------------------------------------------------
diff --git a/hdfs-agent/src/test/resources/hdfs-policies.json b/hdfs-agent/src/test/resources/hdfs-policies.json
index 6055dd5..056231f 100644
--- a/hdfs-agent/src/test/resources/hdfs-policies.json
+++ b/hdfs-agent/src/test/resources/hdfs-policies.json
@@ -215,6 +215,33 @@
       "createdBy": "Admin",
       "updatedBy": "Admin",
       "version": 1
+    },
+    {
+      "service": "cl1_hadoop",
+      "name": "/tmp/get-content-summary",
+      "policyType": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "path": {"values": ["/tmp/get-content-summary", "/tmp/get-content-summary/tmpdir1", "/tmp/get-content-summary/tmpdir2"], "isExcludes": false, "isRecursive": false}
+      },
+      "policyItems": [
+        {
+          "accesses": [{"type": "read","isAllowed": true}, {"type": "execute","isAllowed": true}],
+          "users": ["bob"],
+          "groups": ["IT"],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 40,
+      "isEnabled": true,
+      "version": 1
     }
   ],
   "serviceDef": {

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hdfs-agent/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hdfs-agent/src/test/resources/log4j.properties b/hdfs-agent/src/test/resources/log4j.properties
new file mode 100644
index 0000000..f7ab2ba
--- /dev/null
+++ b/hdfs-agent/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+##-- To prevent JUnit tests from cluttering the build output, all test runs send their output to the null appender by default
+log4j.appender.devnull=org.apache.log4j.varia.NullAppender
+ranger.root.logger=FATAL,devnull
+
+##-- uncomment the following line during development/debugging to see debug messages emitted to the console during test runs
+# ranger.root.logger=DEBUG,console
+log4j.rootLogger=${ranger.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hdfs-agent/src/test/resources/ranger-hdfs-security.xml
----------------------------------------------------------------------
diff --git a/hdfs-agent/src/test/resources/ranger-hdfs-security.xml b/hdfs-agent/src/test/resources/ranger-hdfs-security.xml
index 9b1176e..2c7f22f 100644
--- a/hdfs-agent/src/test/resources/ranger-hdfs-security.xml
+++ b/hdfs-agent/src/test/resources/ranger-hdfs-security.xml
@@ -42,4 +42,12 @@
 		</description>
 	</property>
 
+	<property>
+		<name>ranger.optimize-subaccess-authorization</name>
+		<value>true</value>
+		<description>
+			Enable skipping subdirectories if proper policy exists for getContentSummary and delete commands
+		</description>
+	</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
----------------------------------------------------------------------
diff --git a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
index 56ef187..85a865a 100644
--- a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
+++ b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizer.java
@@ -66,11 +66,14 @@ import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
 
 import com.google.common.collect.Sets;
 
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.apache.ranger.plugin.util.RangerRequestedResources;
 
 public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 	private static final Log LOG = LogFactory.getLog(RangerHiveAuthorizer.class);
 
+	private static final Log PERF_HIVEAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("hiveauth.request");
+
 	private static final char COLUMN_SEP = ',';
 
 	private static final String HIVE_CONF_VAR_QUERY_STRING = "hive.query.string";
@@ -221,6 +224,8 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 
 		RangerHiveAuditHandler auditHandler = new RangerHiveAuditHandler();
 
+		RangerPerfTracer perf = null;
+
 		try {
 			HiveAuthzSessionContext sessionContext = getHiveAuthzSessionContext();
 			String                  user           = ugi.getShortUserName();
@@ -237,6 +242,10 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 				return;
 			}
 
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_HIVEAUTH_REQUEST_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_HIVEAUTH_REQUEST_LOG, "RangerHiveAuthorizer.checkPrivileges(hiveOpType=" + hiveOpType + ")");
+			}
+
 			List<RangerHiveAccessRequest> requests = new ArrayList<RangerHiveAccessRequest>();
 
 			if(!CollectionUtils.isEmpty(inputHObjs)) {
@@ -420,6 +429,7 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 			}
 		} finally {
 			auditHandler.flushAudit();
+			RangerPerfTracer.log(perf);
 		}
 	}
 
@@ -439,7 +449,13 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("==> filterListCmdObjects(%s, %s)", objs, context));
 		}
-		
+
+		RangerPerfTracer perf = null;
+
+		if(RangerPerfTracer.isPerfTraceEnabled(PERF_HIVEAUTH_REQUEST_LOG)) {
+			perf = RangerPerfTracer.getPerfTracer(PERF_HIVEAUTH_REQUEST_LOG, "RangerHiveAuthorizer.filterListCmdObjects()");
+		}
+
 		List<HivePrivilegeObject> ret = null;
 
 		// bail out early if nothing is there to validate!
@@ -509,6 +525,8 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 			}
 		}
 
+		RangerPerfTracer.log(perf);
+
 		if (LOG.isDebugEnabled()) {
 			int count = ret == null ? 0 : ret.size();
 			LOG.debug(String.format("<== filterListCmdObjects: count[%d], ret[%s]", count, ret));
@@ -524,6 +542,12 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 			LOG.debug("==> applyRowFilterAndColumnMasking(" + queryContext + ", objCount=" + hiveObjs.size() + ")");
 		}
 
+		RangerPerfTracer perf = null;
+
+		if(RangerPerfTracer.isPerfTraceEnabled(PERF_HIVEAUTH_REQUEST_LOG)) {
+			perf = RangerPerfTracer.getPerfTracer(PERF_HIVEAUTH_REQUEST_LOG, "RangerHiveAuthorizer.applyRowFilterAndColumnMasking()");
+		}
+
 		if(CollectionUtils.isNotEmpty(hiveObjs)) {
 			for (HivePrivilegeObject hiveObj : hiveObjs) {
 				HivePrivilegeObjectType hiveObjType = hiveObj.getType();
@@ -576,6 +600,8 @@ public class RangerHiveAuthorizer extends RangerHiveAuthorizerBase {
 			}
 		}
 
+		RangerPerfTracer.log(perf);
+
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("<== applyRowFilterAndColumnMasking(" + queryContext + ", objCount=" + hiveObjs.size() + "): retCount=" + ret.size());
 		}

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/knox-agent/src/main/java/org/apache/ranger/authorization/knox/RangerPDPKnoxFilter.java
----------------------------------------------------------------------
diff --git a/knox-agent/src/main/java/org/apache/ranger/authorization/knox/RangerPDPKnoxFilter.java b/knox-agent/src/main/java/org/apache/ranger/authorization/knox/RangerPDPKnoxFilter.java
index 1d58b21..24e8702 100644
--- a/knox-agent/src/main/java/org/apache/ranger/authorization/knox/RangerPDPKnoxFilter.java
+++ b/knox-agent/src/main/java/org/apache/ranger/authorization/knox/RangerPDPKnoxFilter.java
@@ -42,11 +42,14 @@ import org.apache.hadoop.gateway.security.PrimaryPrincipal;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 
 public class RangerPDPKnoxFilter implements Filter {
 
 	private static final Log LOG = LogFactory.getLog(RangerPDPKnoxFilter.class);
 
+	private static final Log PERF_KNOXAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("knoxauth.request");
+
 	private static final String KNOX_GATEWAY_JASS_CONFIG_SECTION = "com.sun.security.jgss.initiate";
 
 	private String resourceRole = null;
@@ -93,6 +96,12 @@ public class RangerPDPKnoxFilter implements Filter {
 		String topologyName = getTopologyName(sourceUrl);
 		String serviceName = getServiceName();
 
+		RangerPerfTracer perf = null;
+
+		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KNOXAUTH_REQUEST_LOG)) {
+			perf = RangerPerfTracer.getPerfTracer(PERF_KNOXAUTH_REQUEST_LOG, "RangerPDPKnoxFilter.doFilter(url=" + sourceUrl + ", topologyName=" + topologyName + ")");
+		}
+
 		Subject subject = Subject.getSubject(AccessController.getContext());
 
 		Principal primaryPrincipal = (Principal) subject.getPrincipals(
@@ -151,6 +160,8 @@ public class RangerPDPKnoxFilter implements Filter {
 			LOG.debug("Access allowed: " + accessAllowed);
 		}
 
+		RangerPerfTracer.log(perf);
+
 		if (accessAllowed) {
 			chain.doFilter(request, response);
 		} else {

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
index 9712f95..acd1111 100644
--- a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
+++ b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
@@ -26,15 +26,18 @@ import org.apache.atlas.authorize.AtlasAccessRequest;
 import org.apache.atlas.authorize.AtlasAuthorizationException;
 import org.apache.atlas.authorize.AtlasAuthorizer;
 import org.apache.atlas.authorize.AtlasResourceTypes;
+import org.apache.commons.logging.Log;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RangerAtlasAuthorizer implements AtlasAuthorizer {
     private static final Logger LOG = LoggerFactory.getLogger(RangerAtlasAuthorizer.class);
+    private static final Log PERF_ATLASAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("atlasauth.request");
     private static boolean isDebugEnabled = LOG.isDebugEnabled();
     private static volatile RangerBasePlugin atlasPlugin = null;
 
@@ -70,6 +73,11 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
         if (isDebugEnabled) {
             LOG.debug("==> isAccessAllowed( " + request + " )");
         }
+        RangerPerfTracer perf = null;
+
+        if(RangerPerfTracer.isPerfTraceEnabled(PERF_ATLASAUTH_REQUEST_LOG)) {
+            perf = RangerPerfTracer.getPerfTracer(PERF_ATLASAUTH_REQUEST_LOG, "RangerAtlasAuthorizer.isAccessAllowed(request=" + request + ")");
+        }
 
         String resource = request.getResource();
         String user = request.getUser();
@@ -93,6 +101,8 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
             }
         }
 
+        RangerPerfTracer.log(perf);
+
         if (isDebugEnabled) {
             LOG.debug("<== isAccessAllowed Returning value :: " + isAccessAllowed);
         }

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index b6e052f..b3d5a74 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -42,12 +42,14 @@ import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import scala.collection.immutable.HashSet;
 import scala.collection.immutable.Set;
 
 public class RangerKafkaAuthorizer implements Authorizer {
 	private static final Log logger = LogFactory
 			.getLog(RangerKafkaAuthorizer.class);
+	private static final Log PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
 
 	public static final String KEY_TOPIC = "topic";
 	public static final String KEY_CLUSTER = "cluster";
@@ -130,6 +132,11 @@ public class RangerKafkaAuthorizer implements Authorizer {
 			return true;
 		}
 
+		RangerPerfTracer perf = null;
+
+		if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+			perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
+		}
 		String userName = null;
 		if (session.principal() != null) {
 			userName = session.principal().getName();
@@ -204,6 +211,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
 						+ rangerRequest, t);
 			}
 		}
+		RangerPerfTracer.log(perf);
+
 		if (logger.isDebugEnabled()) {
 			logger.debug("rangerRequest=" + rangerRequest + ", return="
 					+ returnValue);

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/plugin-kms/src/main/java/org/apache/ranger/authorization/kms/authorizer/RangerKmsAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kms/src/main/java/org/apache/ranger/authorization/kms/authorizer/RangerKmsAuthorizer.java b/plugin-kms/src/main/java/org/apache/ranger/authorization/kms/authorizer/RangerKmsAuthorizer.java
index 4cda8fa..c3d75a1 100755
--- a/plugin-kms/src/main/java/org/apache/ranger/authorization/kms/authorizer/RangerKmsAuthorizer.java
+++ b/plugin-kms/src/main/java/org/apache/ranger/authorization/kms/authorizer/RangerKmsAuthorizer.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.kms.server.KMSACLsType;
 import org.apache.hadoop.crypto.key.kms.server.KMSConfiguration;
@@ -46,6 +48,7 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +56,7 @@ import com.google.common.collect.Sets;
 
 public class RangerKmsAuthorizer implements Runnable, KeyACLs {
 	  private static final Logger LOG = LoggerFactory.getLogger(RangerKmsAuthorizer.class);
+	private static final Log PERF_KMSAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kmsauth.request");
 
 	  private static final String KMS_USER_PRINCIPAL = "ranger.ks.kerberos.principal";
 	  private static final String KMS_USER_KEYTAB = "ranger.ks.kerberos.keytab";
@@ -200,6 +204,11 @@ public class RangerKmsAuthorizer implements Runnable, KeyACLs {
 		  if(LOG.isDebugEnabled()) {
 				LOG.debug("==> RangerKmsAuthorizer.hasAccess(" + type + ", " + ugi + ")");
 			}
+		  RangerPerfTracer perf = null;
+
+		  if(RangerPerfTracer.isPerfTraceEnabled(PERF_KMSAUTH_REQUEST_LOG)) {
+			  perf = RangerPerfTracer.getPerfTracer(PERF_KMSAUTH_REQUEST_LOG, "RangerKmsAuthorizer.hasAccess(type=" + type + ")");
+		  }
 			boolean ret = false;
 			RangerKMSPlugin plugin = kmsPlugin;
 			String rangerAccessType = getRangerAccessType(type);
@@ -215,7 +224,7 @@ public class RangerKmsAuthorizer implements Runnable, KeyACLs {
 				RangerAccessResult result = plugin.isAccessAllowed(request);
 				ret = result == null ? false : result.getIsAllowed();
 			}
-			
+			RangerPerfTracer.log(perf);
 			if(LOG.isDebugEnabled()) {
 				LOG.debug("<== RangerkmsAuthorizer.hasAccess(" + type + ", " + ugi + "): " + ret);
 			}

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java b/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
index 0cc9de9..e833d1e 100644
--- a/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
+++ b/plugin-solr/src/main/java/org/apache/ranger/authorization/solr/authorizer/RangerSolrAuthorizer.java
@@ -39,6 +39,7 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.apache.solr.security.AuthorizationContext.RequestType;
 import org.apache.solr.security.AuthorizationPlugin;
 import org.apache.solr.security.AuthorizationResponse;
@@ -48,6 +49,7 @@ import org.apache.solr.security.AuthorizationContext.CollectionRequest;
 public class RangerSolrAuthorizer implements AuthorizationPlugin {
 	private static final Log logger = LogFactory
 			.getLog(RangerSolrAuthorizer.class);
+	private static final Log PERF_SOLRAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("solrauth.request");
 
 	public static final String PROP_USE_PROXY_IP = "xasecure.solr.use_proxy_ip";
 	public static final String PROP_PROXY_IP_HEADER = "xasecure.solr.proxy_ip_header";
@@ -168,6 +170,12 @@ public class RangerSolrAuthorizer implements AuthorizationPlugin {
 
 			RangerMultiResourceAuditHandler auditHandler = new RangerMultiResourceAuditHandler();
 
+			RangerPerfTracer perf = null;
+
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_SOLRAUTH_REQUEST_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_SOLRAUTH_REQUEST_LOG, "RangerSolrAuthorizer.authorize()");
+			}
+
 			String userName = getUserName(context);
 			Set<String> userGroups = getGroupsForUser(userName);
 			String ip = null;
@@ -217,6 +225,7 @@ public class RangerSolrAuthorizer implements AuthorizationPlugin {
 				}
 			} finally {
 				auditHandler.flushAudit();
+				RangerPerfTracer.log(perf);
 			}
 		} catch (Throwable t) {
 			isDenied = true;

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
index c589060..b448830 100644
--- a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
+++ b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
@@ -44,6 +44,7 @@ import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 
 import com.google.common.collect.Sets;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 
 public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
 	public static final String ACCESS_TYPE_ADMIN_QUEUE = "admin-queue";
@@ -56,6 +57,8 @@ public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
 
 	private static final Log LOG = LogFactory.getLog(RangerYarnAuthorizer.class);
 
+	private static final Log PERF_YARNAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("yarnauth.request");
+
 	private static volatile RangerYarnPlugin yarnPlugin = null;
 
 	private AccessControlList admins = null;
@@ -101,7 +104,15 @@ public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
 		RangerAccessResult     result       = null;
 		String				   clusterName  = yarnPlugin.getClusterName();
 
+		RangerPerfTracer perf = null;
+		RangerPerfTracer yarnAclPerf = null;
+
 		if(plugin != null) {
+
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_YARNAUTH_REQUEST_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_YARNAUTH_REQUEST_LOG, "RangerYarnAuthorizer.checkPermission(entity=" + entity + ")");
+			}
+
 			RangerYarnAccessRequest request = new RangerYarnAccessRequest(entity, getRangerAccessType(accessType), accessType.name(), ugi, clusterName);
 
 			auditHandler = new RangerYarnAuditHandler();
@@ -110,6 +121,11 @@ public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
 		}
 
 		if(RangerYarnAuthorizer.yarnAuthEnabled && (result == null || !result.getIsAccessDetermined())) {
+
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_YARNAUTH_REQUEST_LOG)) {
+				yarnAclPerf = RangerPerfTracer.getPerfTracer(PERF_YARNAUTH_REQUEST_LOG, "RangerYarnNativeAuthorizer.isAllowedByYarnAcl(entity=" + entity + ")");
+			}
+
 			ret = isAllowedByYarnAcl(accessType, entity, ugi, auditHandler);
 		} else {
 			ret = result == null ? false : result.getIsAllowed();
@@ -119,6 +135,10 @@ public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
 			auditHandler.flushAudit();
 		}
 
+		RangerPerfTracer.log(yarnAclPerf);
+
+		RangerPerfTracer.log(perf);
+
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("<== RangerYarnAuthorizer.checkPermission(" + accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
 		}

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/ranger-tools/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/ranger-tools/conf/log4j.properties b/ranger-tools/conf/log4j.properties
index 4ead802..e95a6c8 100644
--- a/ranger-tools/conf/log4j.properties
+++ b/ranger-tools/conf/log4j.properties
@@ -31,6 +31,7 @@ log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%
 ranger.perf.logger=DEBUG,PERF
 ranger.perf.log.file=ranger-perf-test.log
 
+log4j.logger.org.apache.ranger.plugin.util.PerfDataRecorder=${ranger.perf.logger}
 log4j.logger.org.apache.ranger.perf=${ranger.perf.logger}
 log4j.additivity.org.apache.ranger.perf=false
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/ranger-tools/src/main/java/org/apache/ranger/policyengine/RangerPolicyenginePerfTester.java
----------------------------------------------------------------------
diff --git a/ranger-tools/src/main/java/org/apache/ranger/policyengine/RangerPolicyenginePerfTester.java b/ranger-tools/src/main/java/org/apache/ranger/policyengine/RangerPolicyenginePerfTester.java
index 056c548..78cbe02 100644
--- a/ranger-tools/src/main/java/org/apache/ranger/policyengine/RangerPolicyenginePerfTester.java
+++ b/ranger-tools/src/main/java/org/apache/ranger/policyengine/RangerPolicyenginePerfTester.java
@@ -59,6 +59,7 @@ public class RangerPolicyenginePerfTester {
             RangerPolicyEngineOptions policyEngineOptions = new RangerPolicyEngineOptions();
             policyEngineOptions.disableTagPolicyEvaluation = false;
             policyEngineOptions.evaluatorType = RangerPolicyEvaluator.EVALUATOR_TYPE_OPTIMIZED;
+            policyEngineOptions.cacheAuditResults = false;
             policyEngineOptions.disableTrieLookupPrefilter = perfTestOptions.getIsTrieLookupPrefixDisabled();
 
             PerfTestEngine perfTestEngine = new PerfTestEngine(servicePoliciesFileURL, policyEngineOptions, perfTestOptions.getIsDynamicReorderingDisabled());

http://git-wip-us.apache.org/repos/asf/ranger/blob/96b0c486/storm-agent/src/main/java/org/apache/ranger/authorization/storm/authorizer/RangerStormAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-agent/src/main/java/org/apache/ranger/authorization/storm/authorizer/RangerStormAuthorizer.java b/storm-agent/src/main/java/org/apache/ranger/authorization/storm/authorizer/RangerStormAuthorizer.java
index 23c2b5f..0fe658e 100644
--- a/storm-agent/src/main/java/org/apache/ranger/authorization/storm/authorizer/RangerStormAuthorizer.java
+++ b/storm-agent/src/main/java/org/apache/ranger/authorization/storm/authorizer/RangerStormAuthorizer.java
@@ -23,12 +23,14 @@ import java.security.Principal;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.authorization.storm.StormRangerPlugin;
 import org.apache.ranger.authorization.utils.StringUtil;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +45,8 @@ public class RangerStormAuthorizer implements IAuthorizer {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RangerStormAuthorizer.class);
 
+	private static final Log PERF_STORMAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("stormauth.request");
+
 	private static final String STORM_CLIENT_JASS_CONFIG_SECTION = "StormClient";
 
 	private static volatile StormRangerPlugin plugin = null;
@@ -51,9 +55,9 @@ public class RangerStormAuthorizer implements IAuthorizer {
 
 	/**
      * permit() method is invoked for each incoming Thrift request.
-     * @param context request context includes info about
-     * @param operation operation name
-     * @param topology_storm configuration of targeted topology
+     * @param aRequestContext request context; includes info about the request (e.g. request ID and remote address)
+     * @param aOperationName operation name
+     * @param aTopologyConfigMap configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
 	
@@ -64,8 +68,15 @@ public class RangerStormAuthorizer implements IAuthorizer {
 		boolean isAuditEnabled = false;
 
 		String topologyName = null;
-		
+
+		RangerPerfTracer perf = null;
+
 		try {
+
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_STORMAUTH_REQUEST_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_STORMAUTH_REQUEST_LOG, "RangerStormAuthorizer.permit()");
+			}
+
 			topologyName = (aTopologyConfigMap == null ? "" : (String)aTopologyConfigMap.get(Config.TOPOLOGY_NAME));
 	
 			if (LOG.isDebugEnabled()) {
@@ -130,6 +141,7 @@ public class RangerStormAuthorizer implements IAuthorizer {
 			LOG.error("RangerStormAuthorizer found this exception", t);
 		}
 		finally {
+			RangerPerfTracer.log(perf);
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("[req "+ aRequestContext.requestID()+ "] Access "
 		                + " from: [" + aRequestContext.remoteAddress() + "]"
@@ -144,7 +156,7 @@ public class RangerStormAuthorizer implements IAuthorizer {
 	
 	/**
      * Invoked once immediately after construction
-     * @param conf Storm configuration
+     * @param aStormConfigMap Storm configuration
      */
 
 	@Override