You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2019/03/07 21:05:30 UTC
[ranger] branch master updated: RANGER-2349: Provide an API to
download policies and tags
This is an automated email from the ASF dual-hosted git repository.
abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new 9c668fb RANGER-2349: Provide an API to download policies and tags
9c668fb is described below
commit 9c668fbdab6a4f327faaf9fc090200c5902e34ed
Author: Abhay Kulkarni <>
AuthorDate: Thu Mar 7 12:55:00 2019 -0800
RANGER-2349: Provide an API to download policies and tags
---
.../plugin/contextenricher/RangerTagEnricher.java | 59 +++++++++-----
.../ranger/plugin/service/RangerAuthContext.java | 6 +-
.../ranger/plugin/service/RangerBasePlugin.java | 93 ++++++++++++++++++++--
.../apache/ranger/plugin/util/DownloadTrigger.java | 36 +++++++++
.../apache/ranger/plugin/util/DownloaderTask.java | 47 +++++++++++
.../apache/ranger/plugin/util/PolicyRefresher.java | 27 ++-----
6 files changed, 221 insertions(+), 47 deletions(-)
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
index 028efe8..fbf0360 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
@@ -36,6 +36,8 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
import org.apache.ranger.plugin.policyengine.RangerAccessResource;
import org.apache.ranger.plugin.policyresourcematcher.RangerDefaultPolicyResourceMatcher;
import org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceMatcher;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.DownloaderTask;
import org.apache.ranger.plugin.service.RangerAuthContext;
import org.apache.ranger.plugin.service.RangerBasePlugin;
import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
@@ -57,6 +59,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
public class RangerTagEnricher extends RangerAbstractContextEnricher {
private static final Log LOG = LogFactory.getLog(RangerTagEnricher.class);
@@ -75,6 +80,9 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
private EnrichedServiceTags enrichedServiceTags;
private boolean disableCacheIfServiceNotFound = true;
+ private final BlockingQueue<DownloadTrigger> tagDownloadQueue = new LinkedBlockingQueue<>();
+ private Timer tagDownloadTimer;
+
@Override
public void init() {
if (LOG.isDebugEnabled()) {
@@ -121,7 +129,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
tagRetriever.setAppId(appId);
tagRetriever.init(enricherDef.getEnricherOptions());
- tagRefresher = new RangerTagRefresher(tagRetriever, this, -1L, cacheFile, pollingIntervalMs);
+ tagRefresher = new RangerTagRefresher(tagRetriever, this, -1L, tagDownloadQueue, cacheFile);
try {
tagRefresher.populateTags();
@@ -130,6 +138,19 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
}
tagRefresher.setDaemon(true);
tagRefresher.startRefresher();
+
+ tagDownloadTimer = new Timer("policyDownloadTimer", true);
+
+ try {
+ tagDownloadTimer.schedule(new DownloaderTask(tagDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduled tagDownloadRefresher to download tags every " + pollingIntervalMs + " milliseconds");
+ }
+ } catch (IllegalStateException exception) {
+ LOG.error("Error scheduling tagDownloadTimer:", exception);
+ LOG.error("*** Tags will NOT be downloaded every " + pollingIntervalMs + " milliseconds ***");
+ tagDownloadTimer = null;
+ }
}
} else {
LOG.error("No value specified for " + TAG_RETRIEVER_CLASSNAME_OPTION + " in the RangerTagEnricher options");
@@ -178,7 +199,6 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
LOG.debug("<== RangerTagEnricher.enrich(" + request + ") with dataStore:[" + dataStore + "]): tags count=" + (matchedTags == null ? 0 : matchedTags.size()));
}
}
-
/*
* This class implements a cache of result of look-up of keyset of policy-resources for each of the collections of hierarchies
* for policy types: access, datamask and rowfilter. If a keyset is examined for validity in a hierarchy of a policy-type,
@@ -321,6 +341,11 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
super.preCleanup();
+ if (tagDownloadTimer != null) {
+ tagDownloadTimer.cancel();
+ tagDownloadTimer = null;
+ }
+
if (tagRefresher != null) {
tagRefresher.cleanup();
tagRefresher = null;
@@ -332,6 +357,12 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
return ret;
}
+ public void syncTagsWithAdmin(final DownloadTrigger token) throws InterruptedException {
+ tagDownloadQueue.put(token);
+ token.waitForCompletion();
+ }
+
+
private Set<RangerTagForEval> findMatchingTags(final RangerAccessRequest request, EnrichedServiceTags dataStore) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerTagEnricher.findMatchingTags(" + request + ")");
@@ -530,24 +561,19 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
private final RangerTagRetriever tagRetriever;
private final RangerTagEnricher tagEnricher;
private long lastKnownVersion = -1L;
+ private final BlockingQueue<DownloadTrigger> tagDownloadQueue;
private long lastActivationTimeInMillis;
- private final long pollingIntervalMs;
private final String cacheFile;
private boolean hasProvidedTagsToReceiver;
private Gson gson;
-
- final long getPollingIntervalMs() {
- return pollingIntervalMs;
- }
-
- RangerTagRefresher(RangerTagRetriever tagRetriever, RangerTagEnricher tagEnricher, long lastKnownVersion, String cacheFile, long pollingIntervalMs) {
+ RangerTagRefresher(RangerTagRetriever tagRetriever, RangerTagEnricher tagEnricher, long lastKnownVersion, BlockingQueue<DownloadTrigger> tagDownloadQueue, String cacheFile) {
this.tagRetriever = tagRetriever;
this.tagEnricher = tagEnricher;
this.lastKnownVersion = lastKnownVersion;
+ this.tagDownloadQueue = tagDownloadQueue;
this.cacheFile = cacheFile;
- this.pollingIntervalMs = pollingIntervalMs;
try {
gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
} catch(Throwable excp) {
@@ -567,30 +593,25 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
public void run() {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> RangerTagRefresher(pollingIntervalMs=" + pollingIntervalMs + ").run()");
+ LOG.debug("==> RangerTagRefresher().run()");
}
while (true) {
try {
-
- // Sleep first and then fetch tags
- if (pollingIntervalMs > 0) {
- Thread.sleep(pollingIntervalMs);
- } else {
- break;
- }
RangerPerfTracer perf = null;
if(RangerPerfTracer.isPerfTraceEnabled(PERF_CONTEXTENRICHER_INIT_LOG)) {
perf = RangerPerfTracer.getPerfTracer(PERF_CONTEXTENRICHER_INIT_LOG, "RangerTagRefresher.populateTags(serviceName=" + tagRetriever.getServiceName() + ",lastKnownVersion=" + lastKnownVersion + ")");
}
+ DownloadTrigger trigger = tagDownloadQueue.take();
populateTags();
+ trigger.signalCompletion();
RangerPerfTracer.log(perf);
} catch (InterruptedException excp) {
- LOG.debug("RangerTagRefresher(pollingIntervalMs=" + pollingIntervalMs + ").run() : interrupted! Exiting thread", excp);
+ LOG.debug("RangerTagRefresher().run() : interrupted! Exiting thread", excp);
break;
}
}
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
index 8b00144..b2cccef 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
@@ -38,10 +38,10 @@ import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
import org.apache.ranger.plugin.util.ServicePolicies;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
public class RangerAuthContext implements RangerPolicyEngine {
private RangerPolicyEngine policyEngine;
@@ -61,7 +61,7 @@ public class RangerAuthContext implements RangerPolicyEngine {
this.policyEngine = other.getPolicyEngine();
Map<RangerContextEnricher, Object> localReference = other.requestContextEnrichers;
if (MapUtils.isNotEmpty(localReference)) {
- this.requestContextEnrichers = new HashMap<>(localReference);
+ this.requestContextEnrichers = new ConcurrentHashMap<>(localReference);
}
}
}
@@ -77,7 +77,7 @@ public class RangerAuthContext implements RangerPolicyEngine {
public void addOrReplaceRequestContextEnricher(RangerContextEnricher enricher, Object database) {
if (requestContextEnrichers == null) {
- requestContextEnrichers = new HashMap<>();
+ requestContextEnrichers = new ConcurrentHashMap<>();
}
requestContextEnrichers.put(enricher, database);
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
index 96ca317..e52d4de 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
@@ -25,11 +25,14 @@ import java.util.Collection;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +43,8 @@ import org.apache.ranger.audit.provider.AuditProviderFactory;
import org.apache.ranger.audit.provider.StandAloneAuditProviderFactory;
import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.contextenricher.RangerContextEnricher;
+import org.apache.ranger.plugin.contextenricher.RangerTagEnricher;
import org.apache.ranger.plugin.model.RangerPolicy;
import org.apache.ranger.plugin.model.RangerServiceDef;
import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
@@ -52,6 +57,8 @@ import org.apache.ranger.plugin.policyengine.RangerPolicyEngineImpl;
import org.apache.ranger.plugin.policyengine.RangerPolicyEngineOptions;
import org.apache.ranger.plugin.policyengine.RangerResourceAccessInfo;
import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.DownloaderTask;
import org.apache.ranger.plugin.util.GrantRevokeRequest;
import org.apache.ranger.plugin.util.PolicyRefresher;
import org.apache.ranger.plugin.util.ServicePolicies;
@@ -76,13 +83,17 @@ public class RangerBasePlugin {
private RangerAccessResultProcessor resultProcessor;
private boolean useForwardedIPAddress;
private String[] trustedProxyAddresses;
+ private Timer policyDownloadTimer;
private Timer policyEngineRefreshTimer;
private RangerAuthContextListener authContextListener;
private AuditProviderFactory auditProviderFactory;
+ private final BlockingQueue<DownloadTrigger> policyDownloadQueue = new LinkedBlockingQueue<>();
+ private final DownloadTrigger accessTrigger = new DownloadTrigger();
+
Map<String, LogHistory> logHistoryList = new Hashtable<String, RangerBasePlugin.LogHistory>();
- int logInterval = 30000; // 30 seconds
+ int logInterval = 30000; // 30 seconds
public static Map<String, RangerBasePlugin> getServicePluginMap() {
return servicePluginMap;
@@ -183,7 +194,6 @@ public class RangerBasePlugin {
String cacheDir = configuration.get(propertyPrefix + ".policy.cache.dir");
serviceName = configuration.get(propertyPrefix + ".service.name");
clusterName = RangerConfiguration.getInstance().get(propertyPrefix + ".ambari.cluster.name", "");
-
useForwardedIPAddress = configuration.getBoolean(propertyPrefix + ".use.x-forwarded-for.ipaddress", false);
String trustedProxyAddressString = configuration.get(propertyPrefix + ".trusted.proxy.ipaddresses");
trustedProxyAddresses = StringUtils.split(trustedProxyAddressString, RANGER_TRUSTED_PROXY_IPADDRESSES_SEPARATOR_CHAR);
@@ -220,10 +230,23 @@ public class RangerBasePlugin {
RangerAdminClient admin = createAdminClient(serviceName, appId, propertyPrefix);
- refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
+ refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, policyDownloadQueue, cacheDir);
refresher.setDaemon(true);
refresher.startRefresher();
+ policyDownloadTimer = new Timer("policyDownloadTimer", true);
+
+ try {
+ policyDownloadTimer.schedule(new DownloaderTask(policyDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduled policyDownloadRefresher to download policies every " + pollingIntervalMs + " milliseconds");
+ }
+ } catch (IllegalStateException exception) {
+ LOG.error("Error scheduling policyDownloadTimer:", exception);
+ LOG.error("*** Policies will NOT be downloaded every " + pollingIntervalMs + " milliseconds ***");
+ policyDownloadTimer = null;
+ }
+
long policyReorderIntervalMs = configuration.getLong(propertyPrefix + ".policy.policyReorderInterval", 60 * 1000);
if (policyReorderIntervalMs >= 0 && policyReorderIntervalMs < 15 * 1000) {
policyReorderIntervalMs = 15 * 1000;
@@ -365,17 +388,24 @@ public class RangerBasePlugin {
Timer policyEngineRefreshTimer = this.policyEngineRefreshTimer;
+ Timer policyDownloadTimer = this.policyDownloadTimer;
+
String serviceName = this.serviceName;
this.serviceName = null;
this.policyEngine = null;
this.refresher = null;
this.policyEngineRefreshTimer = null;
+ this.policyDownloadTimer = null;
if (refresher != null) {
refresher.stopRefresher();
}
+ if (policyDownloadTimer != null) {
+ policyDownloadTimer.cancel();
+ }
+
if (policyEngineRefreshTimer != null) {
policyEngineRefreshTimer.cancel();
}
@@ -528,6 +558,7 @@ public class RangerBasePlugin {
public void registerAuthContextEventListener(RangerAuthContextListener authContextListener) {
this.authContextListener = authContextListener;
}
+
public void unregisterAuthContextEventListener(RangerAuthContextListener authContextListener) {
this.authContextListener = null;
}
@@ -572,6 +603,31 @@ public class RangerBasePlugin {
return ret;
}
+ public void refreshPoliciesAndTags() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> refreshPoliciesAndTags()");
+ }
+ try {
+ // Synch-up policies
+ long oldPolicyVersion = this.policyEngine.getPolicyVersion();
+ syncPoliciesWithAdmin(accessTrigger);
+ long newPolicyVersion = this.policyEngine.getPolicyVersion();
+
+ if (oldPolicyVersion == newPolicyVersion) {
+ // Synch-up tags
+ RangerTagEnricher tagEnricher = getTagEnricher();
+ if (tagEnricher != null) {
+ tagEnricher.syncTagsWithAdmin(accessTrigger);
+ }
+ }
+ } catch (InterruptedException exception) {
+ LOG.error("Failed to update policy-engine, continuing to use old policy-engine and/or tags", exception);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("<== refreshPoliciesAndTags()");
+ }
+ }
+
private void auditGrantRevoke(GrantRevokeRequest request, String action, boolean isSuccess, RangerAccessResultProcessor resultProcessor) {
if(request != null && resultProcessor != null) {
RangerAccessRequestImpl accessRequest = new RangerAccessRequestImpl();
@@ -602,7 +658,7 @@ public class RangerBasePlugin {
}
}
- public RangerServiceDef getDefaultServiceDef() {
+ private RangerServiceDef getDefaultServiceDef() {
RangerServiceDef ret = null;
if (StringUtils.isNotBlank(serviceType)) {
@@ -652,7 +708,7 @@ public class RangerBasePlugin {
return false;
}
- static class LogHistory {
+ static private final class LogHistory {
long lastLogTime;
int counter;
}
@@ -672,4 +728,29 @@ public class RangerBasePlugin {
}
}
}
+
+ private void syncPoliciesWithAdmin(final DownloadTrigger token) throws InterruptedException{
+ policyDownloadQueue.put(token);
+ token.waitForCompletion();
+ }
+
+ private RangerTagEnricher getTagEnricher() {
+ RangerTagEnricher ret = null;
+ RangerAuthContext authContext = getCurrentRangerAuthContext();
+ if (authContext != null) {
+ Map<RangerContextEnricher, Object> contextEnricherMap = authContext.getRequestContextEnrichers();
+ if (MapUtils.isNotEmpty(contextEnricherMap)) {
+ Set<RangerContextEnricher> contextEnrichers = contextEnricherMap.keySet();
+
+ for (RangerContextEnricher enricher : contextEnrichers) {
+ if (enricher instanceof RangerTagEnricher) {
+ ret = (RangerTagEnricher) enricher;
+ break;
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
}
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java
new file mode 100644
index 0000000..f5754f0
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+public final class DownloadTrigger {
+ private boolean isNotified = false;
+
+ public synchronized void waitForCompletion() throws InterruptedException {
+ while (!isNotified) {
+ wait();
+ }
+ isNotified = false;
+ }
+
+ public synchronized void signalCompletion() {
+ isNotified = true;
+ notifyAll();
+ }
+}
\ No newline at end of file
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java
new file mode 100644
index 0000000..1345f6f
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+
+public final class DownloaderTask extends TimerTask {
+ private static final Log LOG = LogFactory.getLog(DownloaderTask.class);
+
+ private final DownloadTrigger timerTrigger = new DownloadTrigger();
+ private final BlockingQueue<DownloadTrigger> queue;
+
+ public DownloaderTask(BlockingQueue<DownloadTrigger> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ try {
+ queue.put(timerTrigger);
+ timerTrigger.waitForCompletion();
+ } catch (InterruptedException excp) {
+ LOG.error("Caught exception. Exiting thread");
+ }
+ }
+}
\ No newline at end of file
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index e85612a..0e52c31 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -24,6 +24,7 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.Reader;
import java.io.Writer;
+import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -45,18 +46,19 @@ public class PolicyRefresher extends Thread {
private final String serviceType;
private final String serviceName;
private final RangerAdminClient rangerAdmin;
+ private final BlockingQueue<DownloadTrigger> policyDownloadQueue;
+
private final String cacheFileName;
private final String cacheDir;
private final Gson gson;
private final boolean disableCacheIfServiceNotFound;
- private long pollingIntervalMs = 30 * 1000;
private long lastKnownVersion = -1L;
private long lastActivationTimeInMillis;
private boolean policiesSetInPlugin;
private boolean serviceDefSetInPlugin;
- public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, long pollingIntervalMs, String cacheDir) {
+ public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, BlockingQueue<DownloadTrigger> policyDownloadQueue, String cacheDir) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").PolicyRefresher()");
}
@@ -65,7 +67,7 @@ public class PolicyRefresher extends Thread {
this.serviceType = serviceType;
this.serviceName = serviceName;
this.rangerAdmin = rangerAdmin;
- this.pollingIntervalMs = pollingIntervalMs;
+ this.policyDownloadQueue = policyDownloadQueue;
if(StringUtils.isEmpty(appId)) {
appId = serviceType;
@@ -122,20 +124,6 @@ public class PolicyRefresher extends Thread {
return rangerAdmin;
}
- /**
- * @return the pollingIntervalMilliSeconds
- */
- public long getPollingIntervalMs() {
- return pollingIntervalMs;
- }
-
- /**
- * @param pollingIntervalMilliSeconds the pollingIntervalMilliSeconds to set
- */
- public void setPollingIntervalMilliSeconds(long pollingIntervalMilliSeconds) {
- this.pollingIntervalMs = pollingIntervalMilliSeconds;
- }
-
public long getLastActivationTimeInMillis() {
return lastActivationTimeInMillis;
}
@@ -168,9 +156,10 @@ public class PolicyRefresher extends Thread {
}
while(true) {
- loadPolicy();
try {
- Thread.sleep(pollingIntervalMs);
+ DownloadTrigger trigger = policyDownloadQueue.take();
+ loadPolicy();
+ trigger.signalCompletion();
} catch(InterruptedException excp) {
LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
break;