You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2019/03/07 21:05:30 UTC

[ranger] branch master updated: RANGER-2349: Provide an API to download policies and tags

This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c668fb  RANGER-2349: Provide an API to download policies and tags
9c668fb is described below

commit 9c668fbdab6a4f327faaf9fc090200c5902e34ed
Author: Abhay Kulkarni <>
AuthorDate: Thu Mar 7 12:55:00 2019 -0800

    RANGER-2349: Provide an API to download policies and tags
---
 .../plugin/contextenricher/RangerTagEnricher.java  | 59 +++++++++-----
 .../ranger/plugin/service/RangerAuthContext.java   |  6 +-
 .../ranger/plugin/service/RangerBasePlugin.java    | 93 ++++++++++++++++++++--
 .../apache/ranger/plugin/util/DownloadTrigger.java | 36 +++++++++
 .../apache/ranger/plugin/util/DownloaderTask.java  | 47 +++++++++++
 .../apache/ranger/plugin/util/PolicyRefresher.java | 27 ++-----
 6 files changed, 221 insertions(+), 47 deletions(-)

diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
index 028efe8..fbf0360 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
@@ -36,6 +36,8 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
 import org.apache.ranger.plugin.policyresourcematcher.RangerDefaultPolicyResourceMatcher;
 import org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceMatcher;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.DownloaderTask;
 import org.apache.ranger.plugin.service.RangerAuthContext;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
@@ -57,6 +59,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class RangerTagEnricher extends RangerAbstractContextEnricher {
 	private static final Log LOG = LogFactory.getLog(RangerTagEnricher.class);
@@ -75,6 +80,9 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 	private EnrichedServiceTags                enrichedServiceTags;
 	private boolean                            disableCacheIfServiceNotFound = true;
 
+	private final BlockingQueue<DownloadTrigger> tagDownloadQueue = new LinkedBlockingQueue<>();
+	private Timer                              tagDownloadTimer;
+
 	@Override
 	public void init() {
 		if (LOG.isDebugEnabled()) {
@@ -121,7 +129,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 				tagRetriever.setAppId(appId);
 				tagRetriever.init(enricherDef.getEnricherOptions());
 
-				tagRefresher = new RangerTagRefresher(tagRetriever, this, -1L, cacheFile, pollingIntervalMs);
+				tagRefresher = new RangerTagRefresher(tagRetriever, this, -1L, tagDownloadQueue, cacheFile);
 
 				try {
 					tagRefresher.populateTags();
@@ -130,6 +138,19 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 				}
 				tagRefresher.setDaemon(true);
 				tagRefresher.startRefresher();
+
+				tagDownloadTimer = new Timer("policyDownloadTimer", true);
+
+				try {
+					tagDownloadTimer.schedule(new DownloaderTask(tagDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Scheduled tagDownloadRefresher to download tags every " + pollingIntervalMs + " milliseconds");
+					}
+				} catch (IllegalStateException exception) {
+					LOG.error("Error scheduling tagDownloadTimer:", exception);
+					LOG.error("*** Tags will NOT be downloaded every " + pollingIntervalMs + " milliseconds ***");
+					tagDownloadTimer = null;
+				}
 			}
 		} else {
 			LOG.error("No value specified for " + TAG_RETRIEVER_CLASSNAME_OPTION + " in the RangerTagEnricher options");
@@ -178,7 +199,6 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 			LOG.debug("<== RangerTagEnricher.enrich(" + request + ") with dataStore:[" + dataStore + "]): tags count=" + (matchedTags == null ? 0 : matchedTags.size()));
 		}
 	}
-
 	/*
 	 * This class implements a cache of result of look-up of keyset of policy-resources for each of the collections of hierarchies
 	 * for policy types: access, datamask and rowfilter. If a keyset is examined for validity in a hierarchy of a policy-type,
@@ -321,6 +341,11 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 
 		super.preCleanup();
 
+		if (tagDownloadTimer != null) {
+			tagDownloadTimer.cancel();
+			tagDownloadTimer = null;
+		}
+
 		if (tagRefresher != null) {
 			tagRefresher.cleanup();
 			tagRefresher = null;
@@ -332,6 +357,12 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		return ret;
 	}
 
+	/**
+	 * Queues a tag download request and blocks until that request is signalled complete.
+	 * Returns immediately, without queueing anything, when this enricher has no tag refresher.
+	 *
+	 * @param token trigger that is placed on the tag download queue and then waited on
+	 * @throws InterruptedException if the calling thread is interrupted while queueing or waiting
+	 */
+	public void syncTagsWithAdmin(final DownloadTrigger token) throws InterruptedException {
+		// Only the tag refresher takes triggers off tagDownloadQueue and signals them. When it
+		// was never created (no tag retriever configured) or was already released by
+		// preCleanup(), waiting on the token would block the caller forever.
+		if (tagRefresher == null) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("RangerTagEnricher.syncTagsWithAdmin(): no tag refresher available; skipping tag download");
+			}
+			return;
+		}
+
+		tagDownloadQueue.put(token);
+		token.waitForCompletion();
+	}
+
+
 	private Set<RangerTagForEval> findMatchingTags(final RangerAccessRequest request, EnrichedServiceTags dataStore) {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> RangerTagEnricher.findMatchingTags(" + request + ")");
@@ -530,24 +561,19 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		private final RangerTagRetriever tagRetriever;
 		private final RangerTagEnricher tagEnricher;
 		private long lastKnownVersion = -1L;
+		private final BlockingQueue<DownloadTrigger> tagDownloadQueue;
 		private long lastActivationTimeInMillis;
 
-		private final long pollingIntervalMs;
 		private final String cacheFile;
 		private boolean hasProvidedTagsToReceiver;
 		private Gson gson;
 
-
-		final long getPollingIntervalMs() {
-			return pollingIntervalMs;
-		}
-
-		RangerTagRefresher(RangerTagRetriever tagRetriever, RangerTagEnricher tagEnricher, long lastKnownVersion, String cacheFile, long pollingIntervalMs) {
+		RangerTagRefresher(RangerTagRetriever tagRetriever, RangerTagEnricher tagEnricher, long lastKnownVersion, BlockingQueue<DownloadTrigger> tagDownloadQueue, String cacheFile) {
 			this.tagRetriever = tagRetriever;
 			this.tagEnricher = tagEnricher;
 			this.lastKnownVersion = lastKnownVersion;
+			this.tagDownloadQueue = tagDownloadQueue;
 			this.cacheFile = cacheFile;
-			this.pollingIntervalMs = pollingIntervalMs;
 			try {
 				gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
 			} catch(Throwable excp) {
@@ -567,30 +593,25 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		public void run() {
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("==> RangerTagRefresher(pollingIntervalMs=" + pollingIntervalMs + ").run()");
+				LOG.debug("==> RangerTagRefresher().run()");
 			}
 
 			while (true) {
 
 				try {
-
-					// Sleep first and then fetch tags
-					if (pollingIntervalMs > 0) {
-						Thread.sleep(pollingIntervalMs);
-					} else {
-						break;
-					}
 					RangerPerfTracer perf = null;
 
 					if(RangerPerfTracer.isPerfTraceEnabled(PERF_CONTEXTENRICHER_INIT_LOG)) {
 						perf = RangerPerfTracer.getPerfTracer(PERF_CONTEXTENRICHER_INIT_LOG, "RangerTagRefresher.populateTags(serviceName=" + tagRetriever.getServiceName() + ",lastKnownVersion=" + lastKnownVersion + ")");
 					}
+					DownloadTrigger trigger = tagDownloadQueue.take();
 					populateTags();
+					trigger.signalCompletion();
 
 					RangerPerfTracer.log(perf);
 
 				} catch (InterruptedException excp) {
-					LOG.debug("RangerTagRefresher(pollingIntervalMs=" + pollingIntervalMs + ").run() : interrupted! Exiting thread", excp);
+					LOG.debug("RangerTagRefresher().run() : interrupted! Exiting thread", excp);
 					break;
 				}
 			}
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
index 8b00144..b2cccef 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
@@ -38,10 +38,10 @@ import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
 import org.apache.ranger.plugin.util.ServicePolicies;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class RangerAuthContext implements RangerPolicyEngine {
     private RangerPolicyEngine policyEngine;
@@ -61,7 +61,7 @@ public class RangerAuthContext implements RangerPolicyEngine {
 		    this.policyEngine = other.getPolicyEngine();
 		    Map<RangerContextEnricher, Object> localReference = other.requestContextEnrichers;
 		    if (MapUtils.isNotEmpty(localReference)) {
-			    this.requestContextEnrichers = new HashMap<>(localReference);
+			    this.requestContextEnrichers = new ConcurrentHashMap<>(localReference);
 		    }
 	    }
     }
@@ -77,7 +77,7 @@ public class RangerAuthContext implements RangerPolicyEngine {
 
     public void addOrReplaceRequestContextEnricher(RangerContextEnricher enricher, Object database) {
         if (requestContextEnrichers == null) {
-            requestContextEnrichers = new HashMap<>();
+            requestContextEnrichers = new ConcurrentHashMap<>();
         }
 
         requestContextEnrichers.put(enricher, database);
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
index 96ca317..e52d4de 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
@@ -25,11 +25,14 @@ import java.util.Collection;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +43,8 @@ import org.apache.ranger.audit.provider.AuditProviderFactory;
 import org.apache.ranger.audit.provider.StandAloneAuditProviderFactory;
 import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
 import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.contextenricher.RangerContextEnricher;
+import org.apache.ranger.plugin.contextenricher.RangerTagEnricher;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
@@ -52,6 +57,8 @@ import org.apache.ranger.plugin.policyengine.RangerPolicyEngineImpl;
 import org.apache.ranger.plugin.policyengine.RangerPolicyEngineOptions;
 import org.apache.ranger.plugin.policyengine.RangerResourceAccessInfo;
 import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.DownloaderTask;
 import org.apache.ranger.plugin.util.GrantRevokeRequest;
 import org.apache.ranger.plugin.util.PolicyRefresher;
 import org.apache.ranger.plugin.util.ServicePolicies;
@@ -76,13 +83,17 @@ public class RangerBasePlugin {
 	private RangerAccessResultProcessor resultProcessor;
 	private boolean                   useForwardedIPAddress;
 	private String[]                  trustedProxyAddresses;
+	private Timer                     policyDownloadTimer;
 	private Timer                     policyEngineRefreshTimer;
 	private RangerAuthContextListener authContextListener;
 	private AuditProviderFactory      auditProviderFactory;
 
+	private final BlockingQueue<DownloadTrigger> policyDownloadQueue = new LinkedBlockingQueue<>();
+	private final DownloadTrigger                accessTrigger       = new DownloadTrigger();
+
 
 	Map<String, LogHistory> logHistoryList = new Hashtable<String, RangerBasePlugin.LogHistory>();
-	int logInterval = 30000; // 30 seconds
+	int                     logInterval    = 30000; // 30 seconds
 
 	public static Map<String, RangerBasePlugin> getServicePluginMap() {
 		return servicePluginMap;
@@ -183,7 +194,6 @@ public class RangerBasePlugin {
 		String cacheDir          = configuration.get(propertyPrefix + ".policy.cache.dir");
 		serviceName = configuration.get(propertyPrefix + ".service.name");
 		clusterName = RangerConfiguration.getInstance().get(propertyPrefix + ".ambari.cluster.name", "");
-
 		useForwardedIPAddress = configuration.getBoolean(propertyPrefix + ".use.x-forwarded-for.ipaddress", false);
 		String trustedProxyAddressString = configuration.get(propertyPrefix + ".trusted.proxy.ipaddresses");
 		trustedProxyAddresses = StringUtils.split(trustedProxyAddressString, RANGER_TRUSTED_PROXY_IPADDRESSES_SEPARATOR_CHAR);
@@ -220,10 +230,23 @@ public class RangerBasePlugin {
 
 		RangerAdminClient admin = createAdminClient(serviceName, appId, propertyPrefix);
 
-		refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
+		refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, policyDownloadQueue, cacheDir);
 		refresher.setDaemon(true);
 		refresher.startRefresher();
 
+		policyDownloadTimer = new Timer("policyDownloadTimer", true);
+
+		try {
+			policyDownloadTimer.schedule(new DownloaderTask(policyDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Scheduled policyDownloadRefresher to download policies every " + pollingIntervalMs + " milliseconds");
+			}
+		} catch (IllegalStateException exception) {
+			LOG.error("Error scheduling policyDownloadTimer:", exception);
+			LOG.error("*** Policies will NOT be downloaded every " + pollingIntervalMs + " milliseconds ***");
+			policyDownloadTimer = null;
+		}
+
 		long policyReorderIntervalMs = configuration.getLong(propertyPrefix + ".policy.policyReorderInterval", 60 * 1000);
 		if (policyReorderIntervalMs >= 0 && policyReorderIntervalMs < 15 * 1000) {
 			policyReorderIntervalMs = 15 * 1000;
@@ -365,17 +388,24 @@ public class RangerBasePlugin {
 
 		Timer policyEngineRefreshTimer = this.policyEngineRefreshTimer;
 
+		Timer policyDownloadTimer = this.policyDownloadTimer;
+
 		String serviceName = this.serviceName;
 
 		this.serviceName  = null;
 		this.policyEngine = null;
 		this.refresher    = null;
 		this.policyEngineRefreshTimer = null;
+		this.policyDownloadTimer = null;
 
 		if (refresher != null) {
 			refresher.stopRefresher();
 		}
 
+		if (policyDownloadTimer != null) {
+			policyDownloadTimer.cancel();
+		}
+
 		if (policyEngineRefreshTimer != null) {
 			policyEngineRefreshTimer.cancel();
 		}
@@ -528,6 +558,7 @@ public class RangerBasePlugin {
 	public void registerAuthContextEventListener(RangerAuthContextListener authContextListener) {
 		this.authContextListener = authContextListener;
 	}
+
 	public void unregisterAuthContextEventListener(RangerAuthContextListener authContextListener) {
 		this.authContextListener = null;
 	}
@@ -572,6 +603,31 @@ public class RangerBasePlugin {
 		return ret;
 	}
 
+	/**
+	 * Requests an immediate download of policies and blocks until that request is signalled
+	 * complete. If the policy version reported by the policy engine is unchanged afterwards,
+	 * also requests a tag download through the tag enricher (when one is found) and waits for it.
+	 * <p>
+	 * If the calling thread is interrupted while waiting, the failure is logged, the thread's
+	 * interrupt status is restored and the method returns.
+	 */
+	public void refreshPoliciesAndTags() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> refreshPoliciesAndTags()");
+		}
+		try {
+			// Sync policies first and compare the engine's version before and after
+			// NOTE(review): accessTrigger is one instance shared by every caller of this
+			// method - confirm that concurrent callers are acceptable or serialized.
+			long oldPolicyVersion = this.policyEngine.getPolicyVersion();
+			syncPoliciesWithAdmin(accessTrigger);
+			long newPolicyVersion = this.policyEngine.getPolicyVersion();
+
+			// Tags are requested explicitly only when the policy version did not change.
+			// NOTE(review): presumably a policy change already brings tags up to date - confirm.
+			if (oldPolicyVersion == newPolicyVersion) {
+				RangerTagEnricher tagEnricher = getTagEnricher();
+				if (tagEnricher != null) {
+					tagEnricher.syncTagsWithAdmin(accessTrigger);
+				}
+			}
+		} catch (InterruptedException exception) {
+			LOG.error("Failed to update policy-engine, continuing to use old policy-engine and/or tags", exception);
+			// Keep the interruption visible to the caller instead of silently consuming it
+			Thread.currentThread().interrupt();
+		}
+		if (LOG.isDebugEnabled()) {
+			// Exit trace is logged at the level its guard checks, matching the entry trace
+			LOG.debug("<== refreshPoliciesAndTags()");
+		}
+	}
+
 	private void auditGrantRevoke(GrantRevokeRequest request, String action, boolean isSuccess, RangerAccessResultProcessor resultProcessor) {
 		if(request != null && resultProcessor != null) {
 			RangerAccessRequestImpl accessRequest = new RangerAccessRequestImpl();
@@ -602,7 +658,7 @@ public class RangerBasePlugin {
 		}
 	}
 
-	public RangerServiceDef getDefaultServiceDef() {
+	private RangerServiceDef getDefaultServiceDef() {
 		RangerServiceDef ret = null;
 
 		if (StringUtils.isNotBlank(serviceType)) {
@@ -652,7 +708,7 @@ public class RangerBasePlugin {
 		return false;
 	}
 
-	static class LogHistory {
+	static private final class LogHistory {
 		long lastLogTime;
 		int counter;
 	}
@@ -672,4 +728,29 @@ public class RangerBasePlugin {
 			}
 		}
 	}
+
+	/**
+	 * Places the given trigger on the policy download queue and blocks until it is signalled complete.
+	 *
+	 * @param token trigger to queue and then wait on
+	 * @throws InterruptedException if the calling thread is interrupted while queueing or waiting
+	 */
+	private void syncPoliciesWithAdmin(final DownloadTrigger token) throws InterruptedException {
+		// Hand the request to whoever drains the queue ...
+		policyDownloadQueue.put(token);
+		// ... and wait here until that request has been signalled as done
+		token.waitForCompletion();
+	}
+
+	/**
+	 * Looks for a tag enricher among the request context enrichers of the current auth context.
+	 *
+	 * @return the first {@link RangerTagEnricher} encountered, or {@code null} when there is no
+	 *         current auth context, it holds no context enrichers, or none of them is a tag enricher
+	 */
+	private RangerTagEnricher getTagEnricher() {
+		final RangerAuthContext currentContext = getCurrentRangerAuthContext();
+		if (currentContext == null) {
+			return null;
+		}
+
+		final Map<RangerContextEnricher, Object> enricherMap = currentContext.getRequestContextEnrichers();
+		if (MapUtils.isEmpty(enricherMap)) {
+			return null;
+		}
+
+		// The enrichers are the keys of the map; stop at the first tag enricher
+		for (RangerContextEnricher candidate : enricherMap.keySet()) {
+			if (candidate instanceof RangerTagEnricher) {
+				return (RangerTagEnricher) candidate;
+			}
+		}
+		return null;
+	}
+
 }
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java
new file mode 100644
index 0000000..f5754f0
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+/**
+ * Reusable completion signal: one side waits in {@link #waitForCompletion()} until another
+ * side calls {@link #signalCompletion()}. Both operations synchronize on this instance.
+ */
+public final class DownloadTrigger {
+    // true between a signalCompletion() call and the waitForCompletion() that consumes it
+    private boolean completed;
+
+    /**
+     * Blocks until completion has been signalled, then clears the signal so that this
+     * trigger can be waited on again. Returns at once if the signal is already set.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void waitForCompletion() throws InterruptedException {
+        synchronized (this) {
+            // Loop so that a wake-up without the flag being set goes back to waiting
+            while (!completed) {
+                this.wait();
+            }
+            completed = false;
+        }
+    }
+
+    /**
+     * Marks completion and wakes every thread currently waiting on this trigger.
+     */
+    public void signalCompletion() {
+        synchronized (this) {
+            completed = true;
+            this.notifyAll();
+        }
+    }
+}
\ No newline at end of file
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java
new file mode 100644
index 0000000..1345f6f
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Timer task that requests a download: each run places this task's trigger on the given
+ * queue and then blocks until that trigger is signalled complete.
+ */
+public final class DownloaderTask extends TimerTask {
+    private static final Log LOG = LogFactory.getLog(DownloaderTask.class);
+
+    // One trigger instance, reused for every run of this task
+    private final DownloadTrigger timerTrigger = new DownloadTrigger();
+    private final BlockingQueue<DownloadTrigger> queue;
+
+    /**
+     * @param queue queue onto which this task places its download trigger
+     */
+    public DownloaderTask(BlockingQueue<DownloadTrigger> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Queues the trigger and waits for it to be signalled. If the thread is interrupted,
+     * the exception is logged and the interrupt status is restored before returning.
+     */
+    @Override
+    public void run() {
+        try {
+            queue.put(timerTrigger);
+            timerTrigger.waitForCompletion();
+        } catch (InterruptedException excp) {
+            // Log the cause as well, and keep the interruption visible to the running thread
+            LOG.error("Caught exception. Exiting thread", excp);
+            Thread.currentThread().interrupt();
+        }
+    }
+}
\ No newline at end of file
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index e85612a..0e52c31 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -24,6 +24,7 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.Reader;
 import java.io.Writer;
+import java.util.concurrent.BlockingQueue;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -45,18 +46,19 @@ public class PolicyRefresher extends Thread {
 	private final String            serviceType;
 	private final String            serviceName;
 	private final RangerAdminClient rangerAdmin;
+	private final BlockingQueue<DownloadTrigger> policyDownloadQueue;
+
 	private final String            cacheFileName;
 	private final String            cacheDir;
 	private final Gson              gson;
 	private final boolean           disableCacheIfServiceNotFound;
 
-	private long 	pollingIntervalMs   = 30 * 1000;
 	private long 	lastKnownVersion    = -1L;
 	private long	lastActivationTimeInMillis;
 	private boolean policiesSetInPlugin;
 	private boolean serviceDefSetInPlugin;
 
-	public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, long pollingIntervalMs, String cacheDir) {
+	public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, BlockingQueue<DownloadTrigger> policyDownloadQueue, String cacheDir) {
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").PolicyRefresher()");
 		}
@@ -65,7 +67,7 @@ public class PolicyRefresher extends Thread {
 		this.serviceType       = serviceType;
 		this.serviceName       = serviceName;
 		this.rangerAdmin       = rangerAdmin;
-		this.pollingIntervalMs = pollingIntervalMs;
+		this.policyDownloadQueue = policyDownloadQueue;
 
 		if(StringUtils.isEmpty(appId)) {
 			appId = serviceType;
@@ -122,20 +124,6 @@ public class PolicyRefresher extends Thread {
 		return rangerAdmin;
 	}
 
-	/**
-	 * @return the pollingIntervalMilliSeconds
-	 */
-	public long getPollingIntervalMs() {
-		return pollingIntervalMs;
-	}
-
-	/**
-	 * @param pollingIntervalMilliSeconds the pollingIntervalMilliSeconds to set
-	 */
-	public void setPollingIntervalMilliSeconds(long pollingIntervalMilliSeconds) {
-		this.pollingIntervalMs = pollingIntervalMilliSeconds;
-	}
-
 	public long getLastActivationTimeInMillis() {
 		return lastActivationTimeInMillis;
 	}
@@ -168,9 +156,10 @@ public class PolicyRefresher extends Thread {
 		}
 
 		while(true) {
-			loadPolicy();
 			try {
-				Thread.sleep(pollingIntervalMs);
+				DownloadTrigger trigger = policyDownloadQueue.take();
+				loadPolicy();
+				trigger.signalCompletion();
 			} catch(InterruptedException excp) {
 				LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
 				break;