You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2018/09/27 05:14:54 UTC

[1/2] ranger git commit: RANGER-2188: Support multiple threads to build Trie and on-lookup post-setup for Trie nodes

Repository: ranger
Updated Branches:
  refs/heads/ranger-0.7 29801e0e5 -> d533e10af


RANGER-2188: Support multiple threads to build Trie and on-lookup post-setup for Trie nodes


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/660707eb
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/660707eb
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/660707eb

Branch: refs/heads/ranger-0.7
Commit: 660707eb3ae25a4b9666e28e9de0fc2cef72c1c1
Parents: 29801e0
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Tue Aug 14 10:42:42 2018 -0700
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Wed Sep 26 22:11:08 2018 -0700

----------------------------------------------------------------------
 .../contextenricher/RangerTagEnricher.java      |   9 +
 .../policyengine/RangerPolicyEngineOptions.java |  15 +-
 .../policyengine/RangerPolicyRepository.java    |  25 +-
 .../ranger/plugin/util/RangerResourceTrie.java  | 386 ++++++++++++++++---
 .../plugin/policyengine/TestPolicyEngine.java   |   8 +-
 5 files changed, 382 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/660707eb/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
index 858a7a4..8634bf2 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
@@ -60,6 +60,8 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 	private static final Log LOG = LogFactory.getLog(RangerTagEnricher.class);
 
 	private static final Log PERF_CONTEXTENRICHER_INIT_LOG = RangerPerfTracer.getPerfLogger("contextenricher.init");
+	private static final Log PERF_TRIE_OP_LOG = RangerPerfTracer.getPerfLogger("resourcetrie.retrieval");
+
 
 	public static final String TAG_REFRESHER_POLLINGINTERVAL_OPTION = "tagRefresherPollingInterval";
 	public static final String TAG_RETRIEVER_CLASSNAME_OPTION       = "tagRetrieverClassName";
@@ -366,6 +368,12 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		if (resource == null || resource.getKeys() == null || resource.getKeys().isEmpty() || serviceResourceTrie == null) {
 			ret = enrichedServiceTags.getServiceResourceMatchers();
 		} else {
+			RangerPerfTracer perf = null;
+
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_TRIE_OP_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_TRIE_OP_LOG, "RangerTagEnricher.getEvaluators(resource=" + resource.getAsString() + ")");
+			}
+
 			Set<String> resourceKeys = resource.getKeys();
 			List<List<RangerServiceResourceMatcher>> serviceResourceMatchersList = null;
 			List<RangerServiceResourceMatcher> smallestList = null;
@@ -417,6 +425,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 					ret = smallestList;
 				}
 			}
+			RangerPerfTracer.logAlways(perf);
 		}
 
 		if(ret == null) {

http://git-wip-us.apache.org/repos/asf/ranger/blob/660707eb/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
index 22a63fd..269ad1e 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
@@ -33,6 +33,7 @@ public class RangerPolicyEngineOptions {
 	public boolean cacheAuditResults = true;
 	public boolean evaluateDelegateAdminOnly = false;
 	public boolean enableTagEnricherWithLocalRefresher = false;
+	public boolean optimizeTrieForRetrieval = true;
 
 	public void configureForPlugin(Configuration conf, String propertyPrefix) {
 		disableContextEnrichers = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.context.enrichers", false);
@@ -47,6 +48,8 @@ public class RangerPolicyEngineOptions {
 		}
 		evaluateDelegateAdminOnly = false;
 		enableTagEnricherWithLocalRefresher = false;
+		optimizeTrieForRetrieval = conf.getBoolean(propertyPrefix + ".policyengine.option.optimize.trie.for.retrieval", true);
+
 	}
 
 	public void configureDefaultRangerAdmin(Configuration conf, String propertyPrefix) {
@@ -58,6 +61,8 @@ public class RangerPolicyEngineOptions {
 		cacheAuditResults = false;
 		evaluateDelegateAdminOnly = false;
 		enableTagEnricherWithLocalRefresher = false;
+		optimizeTrieForRetrieval = conf.getBoolean(propertyPrefix + ".policyengine.option.optimize.trie.for.retrieval", false);
+
 	}
 
 	public void configureDelegateAdmin(Configuration conf, String propertyPrefix) {
@@ -65,6 +70,8 @@ public class RangerPolicyEngineOptions {
 		disableCustomConditions = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.custom.conditions", true);
 		disableTagPolicyEvaluation = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.tagpolicy.evaluation", true);
 		disableTrieLookupPrefilter = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.trie.lookup.prefilter", false);
+		optimizeTrieForRetrieval = conf.getBoolean(propertyPrefix + ".policyengine.option.optimize.trie.for.retrieval", false);
+
 
 		cacheAuditResults = false;
 		evaluateDelegateAdminOnly = true;
@@ -77,6 +84,8 @@ public class RangerPolicyEngineOptions {
 		disableCustomConditions = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.custom.conditions", true);
 		disableTagPolicyEvaluation = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.tagpolicy.evaluation", false);
 		disableTrieLookupPrefilter = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.trie.lookup.prefilter", false);
+		optimizeTrieForRetrieval = conf.getBoolean(propertyPrefix + ".policyengine.option.optimize.trie.for.retrieval", false);
+
 
 		cacheAuditResults = false;
 		evaluateDelegateAdminOnly = false;
@@ -99,7 +108,8 @@ public class RangerPolicyEngineOptions {
 					&& this.disableTrieLookupPrefilter == that.disableTrieLookupPrefilter
 					&& this.cacheAuditResults == that.cacheAuditResults
 					&& this.evaluateDelegateAdminOnly == that.evaluateDelegateAdminOnly
-					&& this.enableTagEnricherWithLocalRefresher == that.enableTagEnricherWithLocalRefresher;
+					&& this.enableTagEnricherWithLocalRefresher == that.enableTagEnricherWithLocalRefresher
+					&& this.optimizeTrieForRetrieval == that.optimizeTrieForRetrieval;
 		}
 		return ret;
 	}
@@ -121,6 +131,8 @@ public class RangerPolicyEngineOptions {
 		ret *= 2;
 		ret += enableTagEnricherWithLocalRefresher ? 1 : 0;
 		ret *= 2;
+		ret += optimizeTrieForRetrieval ? 1 : 0;
+		ret *= 2;
 		return ret;
 	}
 
@@ -132,6 +144,7 @@ public class RangerPolicyEngineOptions {
 				", disableContextEnrichers: " + disableContextEnrichers +
 				", disableCustomConditions: " + disableContextEnrichers +
 				", disableTrieLookupPrefilter: " + disableTrieLookupPrefilter +
+				", optimizeTrieForRetrieval: " + optimizeTrieForRetrieval +
 				" }";
 
 	}

http://git-wip-us.apache.org/repos/asf/ranger/blob/660707eb/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
index a8fa292..78d68ad 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
@@ -52,6 +52,7 @@ class RangerPolicyRepository {
     private static final Log LOG = LogFactory.getLog(RangerPolicyRepository.class);
 
     private static final Log PERF_CONTEXTENRICHER_INIT_LOG = RangerPerfTracer.getPerfLogger("contextenricher.init");
+    private static final Log PERF_TRIE_OP_LOG = RangerPerfTracer.getPerfLogger("resourcetrie.retrieval");
 
     enum AuditModeEnum {
         AUDIT_ALL, AUDIT_NONE, AUDIT_DEFAULT
@@ -146,9 +147,9 @@ class RangerPolicyRepository {
             dataMaskResourceTrie  = null;
             rowFilterResourceTrie = null;
         } else {
-            policyResourceTrie    = createResourceTrieMap(policyEvaluators);
-            dataMaskResourceTrie  = createResourceTrieMap(dataMaskPolicyEvaluators);
-            rowFilterResourceTrie = createResourceTrieMap(rowFilterPolicyEvaluators);
+            policyResourceTrie    = createResourceTrieMap(policyEvaluators, options.optimizeTrieForRetrieval);
+            dataMaskResourceTrie  = createResourceTrieMap(dataMaskPolicyEvaluators, options.optimizeTrieForRetrieval);
+            rowFilterResourceTrie = createResourceTrieMap(rowFilterPolicyEvaluators, options.optimizeTrieForRetrieval);
         }
     }
 
@@ -191,9 +192,9 @@ class RangerPolicyRepository {
             dataMaskResourceTrie  = null;
             rowFilterResourceTrie = null;
         } else {
-            policyResourceTrie    = createResourceTrieMap(policyEvaluators);
-            dataMaskResourceTrie  = createResourceTrieMap(dataMaskPolicyEvaluators);
-            rowFilterResourceTrie = createResourceTrieMap(rowFilterPolicyEvaluators);
+            policyResourceTrie    = createResourceTrieMap(policyEvaluators, options.optimizeTrieForRetrieval);
+            dataMaskResourceTrie  = createResourceTrieMap(dataMaskPolicyEvaluators, options.optimizeTrieForRetrieval);
+            rowFilterResourceTrie = createResourceTrieMap(rowFilterPolicyEvaluators, options.optimizeTrieForRetrieval);
         }
     }
 
@@ -257,6 +258,12 @@ class RangerPolicyRepository {
         List<RangerPolicyEvaluator> ret          = null;
         Set<String>                 resourceKeys = resource == null ? null : resource.getKeys();
 
+        RangerPerfTracer perf = null;
+
+        if(RangerPerfTracer.isPerfTraceEnabled(PERF_TRIE_OP_LOG)) {
+            perf = RangerPerfTracer.getPerfTracer(PERF_TRIE_OP_LOG, "RangerPolicyRepository.getLikelyMatchEvaluators(resource=" + resource.getAsString() + ")");
+        }
+
         if(CollectionUtils.isNotEmpty(resourceKeys)) {
             List<List<RangerPolicyEvaluator>> resourceEvaluatorsList = null;
             List<RangerPolicyEvaluator> smallestList = null;
@@ -313,6 +320,8 @@ class RangerPolicyRepository {
             ret = Collections.emptyList();
         }
 
+        RangerPerfTracer.logAlways(perf);
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("<== RangerPolicyRepository.getLikelyMatchPolicyEvaluators(" + resource.getAsString() + "): evaluatorCount=" + ret.size());
         }
@@ -833,14 +842,14 @@ class RangerPolicyRepository {
         return ret;
     }
 
-    private Map<String, RangerResourceTrie> createResourceTrieMap(List<RangerPolicyEvaluator> evaluators) {
+    private Map<String, RangerResourceTrie> createResourceTrieMap(List<RangerPolicyEvaluator> evaluators, boolean optimizeTrieForRetrieval) {
         final Map<String, RangerResourceTrie> ret;
 
         if (CollectionUtils.isNotEmpty(evaluators) && serviceDef != null && CollectionUtils.isNotEmpty(serviceDef.getResources())) {
             ret = new HashMap<String, RangerResourceTrie>();
 
             for (RangerServiceDef.RangerResourceDef resourceDef : serviceDef.getResources()) {
-                ret.put(resourceDef.getName(), new RangerResourceTrie(resourceDef, evaluators, RangerPolicyEvaluator.EVAL_ORDER_COMPARATOR));
+                ret.put(resourceDef.getName(), new RangerResourceTrie(resourceDef, evaluators, RangerPolicyEvaluator.EVAL_ORDER_COMPARATOR, optimizeTrieForRetrieval));
             }
         } else {
             ret = null;

http://git-wip-us.apache.org/repos/asf/ranger/blob/660707eb/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
index ece577d..506a7e1 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceEvaluator;
@@ -36,6 +37,8 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
     private static final Log LOG = LogFactory.getLog(RangerResourceTrie.class);
@@ -43,6 +46,7 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
     private static final Log PERF_TRIE_OP_LOG = RangerPerfTracer.getPerfLogger("resourcetrie.op");
 
     private static final String DEFAULT_WILDCARD_CHARS = "*?";
+    private static final String TRIE_BUILDER_THREAD_COUNT = "ranger.policyengine.trie.builder.thread.count";
 
     private final String        resourceName;
     private final boolean       optIgnoreCase;
@@ -50,14 +54,15 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
     private final String        wildcardChars;
     private final TrieNode<T>   root;
     private final Comparator<T> comparator;
+    private final boolean       isOptimizedForRetrieval;
 
     public RangerResourceTrie(RangerServiceDef.RangerResourceDef resourceDef, List<T> evaluators) {
-        this(resourceDef, evaluators, null);
+        this(resourceDef, evaluators, null, true);
     }
 
-    public RangerResourceTrie(RangerServiceDef.RangerResourceDef resourceDef, List<T> evaluators, Comparator<T> comparator) {
+    public RangerResourceTrie(RangerServiceDef.RangerResourceDef resourceDef, List<T> evaluators, Comparator<T> comparator, boolean isOptimizedForRetrieval) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("==> RangerResourceTrie(" + resourceDef.getName() + ", evaluatorCount=" + evaluators.size() + ")");
+            LOG.debug("==> RangerResourceTrie(" + resourceDef.getName() + ", evaluatorCount=" + evaluators.size() + ", isOptimizedForRetrieval=" + isOptimizedForRetrieval + ")");
         }
 
         RangerPerfTracer perf = null;
@@ -66,6 +71,15 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
             perf = RangerPerfTracer.getPerfTracer(PERF_TRIE_INIT_LOG, "RangerResourceTrie(name=" + resourceDef.getName() + ")");
         }
 
+        int builderThreadCount = RangerConfiguration.getInstance().getInt(TRIE_BUILDER_THREAD_COUNT, 1);
+
+        if (builderThreadCount < 1) {
+            builderThreadCount = 1;
+        }
+
+        LOG.info("builderThreadCount is set to ["+ builderThreadCount +"]");
+        PERF_TRIE_INIT_LOG.info("builderThreadCount is set to ["+ builderThreadCount +"]");
+
         Map<String, String> matcherOptions = resourceDef.getMatcherOptions();
 
         boolean optReplaceTokens = RangerAbstractResourceMatcher.getOptionReplaceTokens(matcherOptions);
@@ -86,59 +100,185 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
         this.optIgnoreCase = RangerAbstractResourceMatcher.getOptionIgnoreCase(matcherOptions);
         this.optWildcard   = RangerAbstractResourceMatcher.getOptionWildCard(matcherOptions);
         this.wildcardChars = optWildcard ? DEFAULT_WILDCARD_CHARS + tokenReplaceSpecialChars : "" + tokenReplaceSpecialChars;
-        this.root          = new TrieNode<>(null);
         this.comparator    = comparator;
+        this.isOptimizedForRetrieval = isOptimizedForRetrieval;
+
+        TrieNode<T> tmpRoot = buildTrie(resourceDef, evaluators, comparator, builderThreadCount);
+
+        if (builderThreadCount > 1 && tmpRoot == null) { // if multi-threaded trie-creation failed, build using a single thread
+            this.root = buildTrie(resourceDef, evaluators, comparator, 1);
+        } else {
+            this.root = tmpRoot;
+        }
+
+        RangerPerfTracer.logAlways(perf);
+
+        if (PERF_TRIE_INIT_LOG.isDebugEnabled()) {
+            PERF_TRIE_INIT_LOG.debug(toString());
+        }
+
+        if (PERF_TRIE_INIT_LOG.isTraceEnabled()) {
+            StringBuilder sb = new StringBuilder();
+            root.toString("", sb);
+            PERF_TRIE_INIT_LOG.trace("Trie Dump:\n{" + sb.toString() + "}");
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("<== RangerResourceTrie(" + resourceDef.getName() + ", evaluatorCount=" + evaluators.size() + ", isOptimizedForRetrieval=" + isOptimizedForRetrieval + "): " + toString());
+        }
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public List<T> getEvaluatorsForResource(Object resource) {
+        if (resource instanceof String) {
+            return getEvaluatorsForResource((String) resource);
+        } else if (resource instanceof Collection) {
+            if (CollectionUtils.isEmpty((Collection) resource)) {  // treat empty collection same as empty-string
+                return getEvaluatorsForResource("");
+            } else {
+                @SuppressWarnings("unchecked")
+                Collection<String> resources = (Collection<String>) resource;
+
+                return getEvaluatorsForResources(resources);
+            }
+        }
+
+        return null;
+    }
+
+    private TrieNode<T> buildTrie(RangerServiceDef.RangerResourceDef resourceDef, List<T> evaluators, Comparator<T> comparator, int builderThreadCount) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("==> buildTrie(" + resourceDef.getName() + ", evaluatorCount=" + evaluators.size() + ", isMultiThreaded=" + (builderThreadCount > 1) + ")");
+        }
+
+        TrieNode<T>                           ret                 = new TrieNode<>(null);
+        final boolean                         isMultiThreaded = builderThreadCount > 1;
+        final List<ResourceTrieBuilderThread> builderThreads;
+        final Map<Character, Integer>         builderThreadMap;
+        int                                   lastUsedThreadIndex = 0;
 
-        for(T evaluator : evaluators) {
+        if (isMultiThreaded) {
+            builderThreads = new ArrayList<>();
+            for (int i = 0; i < builderThreadCount; i++) {
+                ResourceTrieBuilderThread t = new ResourceTrieBuilderThread(isOptimizedForRetrieval);
+                builderThreads.add(t);
+                t.start();
+            }
+            builderThreadMap = new HashMap<>();
+        } else {
+            builderThreads = null;
+            builderThreadMap = null;
+        }
+
+        for (T evaluator : evaluators) {
             Map<String, RangerPolicyResource> policyResources = evaluator.getPolicyResource();
-            RangerPolicyResource              policyResource  = policyResources != null ? policyResources.get(resourceName) : null;
+            RangerPolicyResource policyResource = policyResources != null ? policyResources.get(resourceName) : null;
 
-            if(policyResource == null) {
-                if(evaluator.getLeafResourceLevel() != null && resourceDef.getLevel() != null && evaluator.getLeafResourceLevel() < resourceDef.getLevel()) {
-                    root.addWildcardEvaluator(evaluator);
+            if (policyResource == null) {
+                if (evaluator.getLeafResourceLevel() != null && resourceDef.getLevel() != null && evaluator.getLeafResourceLevel() < resourceDef.getLevel()) {
+                    ret.addWildcardEvaluator(evaluator);
                 }
 
                 continue;
             }
 
-            if(policyResource.getIsExcludes()) {
-                root.addWildcardEvaluator(evaluator);
+            if (policyResource.getIsExcludes()) {
+                ret.addWildcardEvaluator(evaluator);
             } else {
                 RangerResourceMatcher resourceMatcher = evaluator.getResourceMatcher(resourceName);
 
-                if(resourceMatcher != null && (resourceMatcher.isMatchAny())) {
-                    root.addWildcardEvaluator(evaluator);
+                if (resourceMatcher != null && (resourceMatcher.isMatchAny())) {
+                    ret.addWildcardEvaluator(evaluator);
                 } else {
-                    if(CollectionUtils.isNotEmpty(policyResource.getValues())) {
+                    if (CollectionUtils.isNotEmpty(policyResource.getValues())) {
                         for (String resource : policyResource.getValues()) {
-                            insert(resource, policyResource.getIsRecursive(), evaluator);
+                            if (!isMultiThreaded) {
+                                insert(ret, resource, policyResource.getIsRecursive(), evaluator);
+                            } else {
+                                try {
+                                    lastUsedThreadIndex = insert(ret, resource, policyResource.getIsRecursive(), evaluator, builderThreadMap, builderThreads, lastUsedThreadIndex);
+                                } catch (InterruptedException ex) {
+                                    LOG.error("Failed to dispatch " + resource + " to " + builderThreads.get(lastUsedThreadIndex));
+                                    LOG.error("Failing and retrying with one thread");
+
+                                    ret = null;
+
+                                    break;
+                                }
+                            }
+                        }
+                        if (ret == null) {
+                            break;
                         }
                     }
                 }
             }
         }
+        if (ret != null) {
+            if (isMultiThreaded) {
+                ret.setup(null, comparator);
+
+                for (ResourceTrieBuilderThread t : builderThreads) {
+                    t.setParentWildcardEvaluators(ret.wildcardEvaluators);
+                    try {
+                        // Send termination signal to each thread
+                        t.add("", false, null);
+                        // Wait for threads to finish work
+                        t.join();
+                        ret.getChildren().putAll(t.getSubtrees());
+                    } catch (InterruptedException ex) {
+                        LOG.error("BuilderThread " + t + " was interrupted:", ex);
+                        LOG.error("Failing and retrying with one thread");
+
+                        ret = null;
+
+                        break;
+                    }
+                }
+            } else {
+                if (isOptimizedForRetrieval) {
+                    RangerPerfTracer postSetupPerf = null;
 
-        root.postSetup(null, comparator);
+                    if (RangerPerfTracer.isPerfTraceEnabled(PERF_TRIE_INIT_LOG)) {
+                        postSetupPerf = RangerPerfTracer.getPerfTracer(PERF_TRIE_INIT_LOG, "RangerResourceTrie(name=" + resourceDef.getName() + "-postSetup)");
+                    }
 
-        RangerPerfTracer.logAlways(perf);
+                    ret.postSetup(null, comparator);
 
-        if (PERF_TRIE_INIT_LOG.isDebugEnabled()) {
-            PERF_TRIE_INIT_LOG.debug(toString());
+                    RangerPerfTracer.logAlways(postSetupPerf);
+                } else {
+                    ret.setup(null, comparator);
+                }
+            }
         }
 
-        if (PERF_TRIE_INIT_LOG.isTraceEnabled()) {
-            StringBuilder sb = new StringBuilder();
-            root.toString("", sb);
-            PERF_TRIE_INIT_LOG.trace("Trie Dump:\n{" + sb.toString() + "}");
+        if (isMultiThreaded) {
+            cleanUpThreads(builderThreads);
         }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("<== RangerResourceTrie(" + resourceDef.getName() + ", evaluatorCount=" + evaluators.size() + "): " + toString());
+            LOG.debug("<== buildTrie(" + resourceDef.getName() + ", evaluatorCount=" + evaluators.size() + ", isMultiThreaded=" + isMultiThreaded + ") :" +  ret);
         }
+
+        return ret;
     }
 
-    public String getResourceName() {
-        return resourceName;
+    private void cleanUpThreads(List<ResourceTrieBuilderThread> builderThreads) {
+        if (CollectionUtils.isNotEmpty(builderThreads)) {
+            for (ResourceTrieBuilderThread t : builderThreads) {
+                try {
+                    if (t.isAlive()) {
+                        t.interrupt();
+                        t.join();
+                    }
+                } catch (InterruptedException ex) {
+                    LOG.error("Could not terminate thread " + t);
+                }
+            }
+        }
     }
 
     private TrieData getTrieData() {
@@ -162,18 +302,37 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
         return getLookupChar(str.charAt(index));
     }
 
-    private void insert(String resource, boolean isRecursive, T evaluator) {
+    private int insert(TrieNode<T> currentRoot, String resource, boolean isRecursive, T evaluator, Map<Character, Integer> builderThreadMap, List<ResourceTrieBuilderThread> builderThreads, int lastUsedThreadIndex) throws InterruptedException {
+        int          ret    = lastUsedThreadIndex;
+        final String prefix = getNonWildcardPrefix(resource);
+
+        if (StringUtils.isNotEmpty(prefix)) {
+            char    c     = getLookupChar(prefix.charAt(0));
+            Integer index = builderThreadMap.get(c);
+
+            if (index == null) {
+                ret = index = (lastUsedThreadIndex + 1) % builderThreads.size();
+                builderThreadMap.put(c, index);
+            }
+
+            builderThreads.get(index).add(resource, isRecursive, evaluator);
+        } else {
+            currentRoot.addWildcardEvaluator(evaluator);
+        }
+
+        return ret;
+    }
 
+    private void insert(TrieNode<T> currentRoot, String resource, boolean isRecursive, T evaluator) {
         RangerPerfTracer perf = null;
 
         if(RangerPerfTracer.isPerfTraceEnabled(PERF_TRIE_INIT_LOG)) {
             perf = RangerPerfTracer.getPerfTracer(PERF_TRIE_INIT_LOG, "RangerResourceTrie.insert(resource=" + resource + ")");
         }
 
-        TrieNode<T> curr       = root;
-
-        final String prefix       = getNonWildcardPrefix(resource);
-        final boolean isWildcard  = prefix.length() != resource.length();
+        TrieNode<T>   curr       = currentRoot;
+        final String  prefix     = getNonWildcardPrefix(resource);
+        final boolean isWildcard = prefix.length() != resource.length();
 
         if (StringUtils.isNotEmpty(prefix)) {
             curr = curr.getOrCreateChild(prefix);
@@ -189,14 +348,17 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
     }
 
     private String getNonWildcardPrefix(String str) {
-        if (!optWildcard) return str;
+
         int minIndex = str.length();
+
         for (int i = 0; i < wildcardChars.length(); i++) {
             int index = str.indexOf(wildcardChars.charAt(i));
+
             if (index != -1 && index < minIndex) {
                 minIndex = index;
             }
         }
+
         return str.substring(0, minIndex);
     }
 
@@ -211,12 +373,16 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
             perf = RangerPerfTracer.getPerfTracer(PERF_TRIE_OP_LOG, "RangerResourceTrie.getEvaluatorsForResource(resource=" + resource + ")");
         }
 
-        TrieNode<T> curr = root;
-
-        final int   len  = resource.length();
-        int         i    = 0;
+        TrieNode<T> curr   = root;
+        TrieNode<T> parent = null;
+        final int   len    = resource.length();
+        int         i      = 0;
 
         while (i < len) {
+            if (!isOptimizedForRetrieval) {
+                curr.setupIfNeeded(parent, comparator);
+            }
+
             final TrieNode<T> child = curr.getChild(getLookupChar(resource, i));
 
             if (child == null) {
@@ -229,10 +395,15 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
                 break;
             }
 
+            parent = curr;
             curr = child;
             i += childStr.length();
         }
 
+        if (!isOptimizedForRetrieval) {
+            curr.setupIfNeeded(parent, comparator);
+        }
+
         List<T> ret = i == len ? curr.getEvaluators() : curr.getWildcardEvaluators();
 
         RangerPerfTracer.logAlways(perf);
@@ -317,6 +488,91 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
         return sb.toString();
     }
 
+    class ResourceTrieBuilderThread extends Thread {
+
+        class WorkItem {
+            final String  resourceName;
+            final boolean isRecursive;
+            final T       evaluator;
+
+            WorkItem(String resourceName, boolean isRecursive, T evaluator) {
+                this.resourceName   = resourceName;
+                this.isRecursive    = isRecursive;
+                this.evaluator      = evaluator;
+            }
+            @Override
+            public String toString() {
+                return
+                "resourceName=" + resourceName +
+                "isRecursive=" + isRecursive +
+                "evaluator=" + (evaluator != null? evaluator.getId() : null);
+            }
+        }
+
+        private final   TrieNode<T>             thisRoot  = new TrieNode<>(null);
+        private final   BlockingQueue<WorkItem> workQueue = new LinkedBlockingQueue<>();
+        private final   boolean                 isOptimizedForRetrieval;
+        private         List<T>                 parentWildcardEvaluators;
+
+        ResourceTrieBuilderThread(boolean isOptimizedForRetrieval) {
+            this.isOptimizedForRetrieval = isOptimizedForRetrieval;
+        }
+
+        void add(String resourceName, boolean isRecursive, T evaluator) throws InterruptedException {
+            workQueue.put(new WorkItem(resourceName, isRecursive, evaluator));
+        }
+
+        void setParentWildcardEvaluators(List<T> parentWildcardEvaluators) {
+            this.parentWildcardEvaluators = parentWildcardEvaluators;
+        }
+
+        Map<Character, TrieNode<T>> getSubtrees() { return thisRoot.getChildren(); }
+
+        @Override
+        public void run() {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Running " + this);
+            }
+
+            while (true) {
+                final WorkItem workItem;
+
+                try {
+                    workItem = workQueue.take();
+                } catch (InterruptedException exception) {
+                    LOG.error("Thread=" + this + " is interrupted", exception);
+
+                    break;
+                }
+
+                if (workItem.evaluator != null) {
+                    insert(thisRoot, workItem.resourceName, workItem.isRecursive, workItem.evaluator);
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Received termination signal. " + workItem);
+                    }
+                    break;
+                }
+            }
+
+            if (!isInterrupted() && isOptimizedForRetrieval) {
+                RangerPerfTracer postSetupPerf = null;
+
+                if (RangerPerfTracer.isPerfTraceEnabled(PERF_TRIE_INIT_LOG)) {
+                    postSetupPerf = RangerPerfTracer.getPerfTracer(PERF_TRIE_INIT_LOG, "RangerResourceTrie(thread=" + this.getName() + "-postSetup)");
+                }
+
+                thisRoot.postSetup(parentWildcardEvaluators, comparator);
+
+                RangerPerfTracer.logAlways(postSetupPerf);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Exiting " + this);
+            }
+        }
+    }
+
     class TrieData {
         int nodeCount;
         int leafNodeCount;
@@ -329,11 +585,12 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
     }
 
     class TrieNode<U extends RangerPolicyResourceEvaluator> {
-        private String str;
-        private Map<Character, TrieNode<U>> children = new HashMap<>();
-        private List<U> evaluators;
-        private List<U> wildcardEvaluators;
-        private boolean isSharingParentWildcardEvaluators;
+        private          String                      str;
+        private final    Map<Character, TrieNode<U>> children = new HashMap<>();
+        private          List<U>                     evaluators;
+        private          List<U>                     wildcardEvaluators;
+        private          boolean                     isSharingParentWildcardEvaluators;
+        private volatile boolean                     isSetup = false;
 
         TrieNode(String str) {
             this.str = str;
@@ -490,6 +747,38 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
         }
 
         void postSetup(List<U> parentWildcardEvaluators, Comparator<U> comparator) {
+
+            setup(parentWildcardEvaluators, comparator);
+
+            if (children != null) {
+                for (Map.Entry<Character, TrieNode<U>> entry : children.entrySet()) {
+                    TrieNode<U> child = entry.getValue();
+
+                    child.postSetup(wildcardEvaluators, comparator);
+                }
+            }
+        }
+
+        void setupIfNeeded(TrieNode<U> parent, Comparator<U> comparator) {
+            if (parent == null) {
+                return;
+            }
+
+            boolean setupNeeded = !isSetup;
+
+            if (setupNeeded) {
+                synchronized (this) {
+                    setupNeeded = !isSetup;
+
+                    if (setupNeeded) {
+                        setup(parent.getWildcardEvaluators(), comparator);
+                        isSetup = true;
+                    }
+                }
+            }
+        }
+
+        void setup(List<U> parentWildcardEvaluators, Comparator<U> comparator) {
             // finalize wildcard-evaluators list by including parent's wildcard evaluators
             if (parentWildcardEvaluators != null) {
                 if (CollectionUtils.isEmpty(this.wildcardEvaluators)) {
@@ -522,14 +811,6 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
                     evaluators.sort(comparator);
                 }
             }
-
-            if (children != null) {
-                for (Map.Entry<Character, TrieNode<U>> entry : children.entrySet()) {
-                    TrieNode<U> child = entry.getValue();
-
-                    child.postSetup(wildcardEvaluators, comparator);
-                }
-            }
         }
 
         public void toString(String prefix, StringBuilder sb) {
@@ -567,8 +848,11 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
         }
 
         public void clear() {
-            children = null;
-            evaluators = null;
+            if (children != null) {
+                children.clear();
+            }
+
+            evaluators         = null;
             wildcardEvaluators = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/ranger/blob/660707eb/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
----------------------------------------------------------------------
diff --git a/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
index a82df28..4586561 100644
--- a/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
+++ b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
@@ -152,7 +152,11 @@ public class TestPolicyEngine {
 				"                <name>ranger.plugin.tag.attr.additional.date.formats</name>\n" +
 				"                <value>abcd||xyz||yyyy/MM/dd'T'HH:mm:ss.SSS'Z'</value>\n" +
 				"        </property>\n" +
-				"</configuration>\n");
+				"        <property>\n" +
+				"                <name>ranger.policyengine.trie.builder.thread.count</name>\n" +
+				"                <value>3</value>\n" +
+				"        </property>\n" +
+                "</configuration>\n");
 		writer.close();
 
 		RangerConfiguration config = RangerConfiguration.getInstance();
@@ -341,6 +345,7 @@ public class TestPolicyEngine {
 		RangerPolicyEngineOptions policyEngineOptions = new RangerPolicyEngineOptions();
 
 		policyEngineOptions.disableTagPolicyEvaluation = false;
+		policyEngineOptions.optimizeTrieForRetrieval = false;
 
 		boolean useForwardedIPAddress = RangerConfiguration.getInstance().getBoolean("ranger.plugin.hive.use.x-forwarded-for.ipaddress", false);
 		String trustedProxyAddressString = RangerConfiguration.getInstance().get("ranger.plugin.hive.trusted.proxy.ipaddresses");
@@ -353,6 +358,7 @@ public class TestPolicyEngine {
 		RangerPolicyEngine policyEngine = new RangerPolicyEngineImpl(testName, servicePolicies, policyEngineOptions);
 		policyEngine.setUseForwardedIPAddress(useForwardedIPAddress);
 		policyEngine.setTrustedProxyAddresses(trustedProxyAddresses);
+
 		long requestCount = 0L;
 
 		RangerAccessRequest request = null;


[2/2] ranger git commit: RANGER-2188: Support multiple threads to build Trie and on-lookup post-setup for Trie nodes - set default to lazy post-setup

Posted by ab...@apache.org.
RANGER-2188: Support multiple threads to build Trie and on-lookup post-setup for Trie nodes - set default to lazy post-setup


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/d533e10a
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/d533e10a
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/d533e10a

Branch: refs/heads/ranger-0.7
Commit: d533e10afbc6951b52967d3d4a906bff8a6ee013
Parents: 660707e
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Tue Aug 21 15:20:45 2018 -0700
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Wed Sep 26 22:14:38 2018 -0700

----------------------------------------------------------------------
 .../ranger/plugin/policyengine/RangerPolicyEngineOptions.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/d533e10a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
index 269ad1e..4c63f3b 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineOptions.java
@@ -33,7 +33,7 @@ public class RangerPolicyEngineOptions {
 	public boolean cacheAuditResults = true;
 	public boolean evaluateDelegateAdminOnly = false;
 	public boolean enableTagEnricherWithLocalRefresher = false;
-	public boolean optimizeTrieForRetrieval = true;
+	public boolean optimizeTrieForRetrieval = false;
 
 	public void configureForPlugin(Configuration conf, String propertyPrefix) {
 		disableContextEnrichers = conf.getBoolean(propertyPrefix + ".policyengine.option.disable.context.enrichers", false);
@@ -48,7 +48,7 @@ public class RangerPolicyEngineOptions {
 		}
 		evaluateDelegateAdminOnly = false;
 		enableTagEnricherWithLocalRefresher = false;
-		optimizeTrieForRetrieval = conf.getBoolean(propertyPrefix + ".policyengine.option.optimize.trie.for.retrieval", true);
+		optimizeTrieForRetrieval = conf.getBoolean(propertyPrefix + ".policyengine.option.optimize.trie.for.retrieval", false);
 
 	}