You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by rm...@apache.org on 2015/05/15 00:12:12 UTC
incubator-ranger git commit: Ranger-466: PolicyRefresher should
timeout when Ranger Admin is non responsive and should use local cache for
policy enforcement if present -Add on Changes
Repository: incubator-ranger
Updated Branches:
refs/heads/master 97f1d5d05 -> f9fb61102
Ranger-466: PolicyRefresher should timeout when Ranger Admin is non responsive and should use local cache for policy enforcement if present -Add on Changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f9fb6110
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f9fb6110
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f9fb6110
Branch: refs/heads/master
Commit: f9fb6110274803c949b6262270b288b0274aa299
Parents: 97f1d5d
Author: rmani <rm...@hortonworks.com>
Authored: Thu May 14 15:12:05 2015 -0700
Committer: rmani <rm...@hortonworks.com>
Committed: Thu May 14 15:12:05 2015 -0700
----------------------------------------------------------------------
.../ranger/plugin/util/PolicyRefresher.java | 181 +++++++++++--------
1 file changed, 104 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f9fb6110/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index 341a65e..0729339 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -45,9 +45,9 @@ public class PolicyRefresher extends Thread {
private final String cacheFile;
private final Gson gson;
- private long pollingIntervalMs = 30 * 1000;
- private long lastKnownVersion = -1;
- private boolean policyCacheLoadedOnce = false;
+ private long pollingIntervalMs = 30 * 1000;
+ private long lastKnownVersion = -1;
+ private boolean policiesSetInPlugin = false;
public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, long pollingIntervalMs, String cacheDir) {
@@ -128,6 +128,9 @@ public class PolicyRefresher extends Thread {
public void startRefresher() {
+
+ loadPolicy();
+
super.start();
}
@@ -142,122 +145,146 @@ public class PolicyRefresher extends Thread {
}
public void run() {
- boolean loadFromCacheFlag = false;
if(LOG.isDebugEnabled()) {
LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()");
}
while(true) {
+ loadPolicy();
try {
- ServicePolicies svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion);
+ Thread.sleep(pollingIntervalMs);
+ } catch(InterruptedException excp) {
+ LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
+ break;
+ }
+ }
- boolean isUpdated = svcPolicies != null;
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
+ }
+ }
- if(isUpdated) {
- long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue();
+ private void loadPolicy() {
- if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) {
- LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicy()");
+ }
- svcPolicies.setServiceName(serviceName);
- }
+ //load policy from PolicyAmdin
+ ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin();
+
+ if ( svcPolicies == null) {
+ //if Policy fetch from Policy Admin Fails, load from cache
+ if (!policiesSetInPlugin) {
+ svcPolicies = loadFromCache();
+ }
+ } else {
+ saveToCache(svcPolicies);
+ }
- LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion);
+ if (svcPolicies != null) {
+ plugIn.setPolicies(svcPolicies);
+ policiesSetInPlugin = true;
+ }
- saveToCache(svcPolicies);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicy()");
+ }
+ }
- lastKnownVersion = newVersion;
+ private ServicePolicies loadPolicyfromPolicyAdmin() {
- plugIn.setPolicies(svcPolicies);
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion);
- }
- }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
+ }
+
+ ServicePolicies svcPolicies = null;
+
+ try {
+ svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion);
+
+ boolean isUpdated = svcPolicies != null;
- loadFromCacheFlag = false;
+ if(isUpdated) {
+ long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue();
- policyCacheLoadedOnce = false;
+ if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) {
+ LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store");
- } catch(Exception excp) {
- loadFromCacheFlag = true;
- LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp);
- } finally {
- if (loadFromCacheFlag && !policyCacheLoadedOnce) {
- //If ConnectionTime or PolicyAdmin down, fetch the Policy from Local Cache and load
- LOG.info("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policy from Policy Manager. Loading Policies from local Cache. lastKnownVersion=" + lastKnownVersion);
- loadFromCache();
- policyCacheLoadedOnce = true;
+ svcPolicies.setServiceName(serviceName);
}
- }
- try {
- Thread.sleep(pollingIntervalMs);
- } catch(InterruptedException excp) {
- LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
+ LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion);
- break;
+ lastKnownVersion = newVersion;
+
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion);
+ }
}
- }
+ } catch(Exception excp) {
+ LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp);
+ }
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
- }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
+ }
+
+ return svcPolicies;
}
- private void loadFromCache() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()");
- }
- RangerBasePlugin plugIn = this.plugIn;
+ private ServicePolicies loadFromCache() {
- if(plugIn != null) {
- File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : new File(this.cacheFile);
+ ServicePolicies policies = null;
- if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) {
- Reader reader = null;
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()");
+ }
- try {
- reader = new FileReader(cacheFile);
+ File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : new File(this.cacheFile);
- ServicePolicies policies = gson.fromJson(reader, ServicePolicies.class);
+ if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) {
+ Reader reader = null;
- if(policies != null) {
- if(!StringUtils.equals(serviceName, policies.getServiceName())) {
- LOG.warn("ignoring unexpected serviceName '" + policies.getServiceName() + "' in cache file '" + cacheFile.getAbsolutePath() + "'");
+ try {
+ reader = new FileReader(cacheFile);
- policies.setServiceName(serviceName);
- }
+ policies = gson.fromJson(reader, ServicePolicies.class);
- lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue();
+ if(policies != null) {
+ if(!StringUtils.equals(serviceName, policies.getServiceName())) {
+ LOG.warn("ignoring unexpected serviceName '" + policies.getServiceName() + "' in cache file '" + cacheFile.getAbsolutePath() + "'");
- plugIn.setPolicies(policies);
- }
- } catch (Exception excp) {
- LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp);
- } finally {
- if(reader != null) {
- try {
- reader.close();
- } catch(Exception excp) {
- LOG.error("error while closing opened cache file " + cacheFile.getAbsolutePath(), excp);
- }
+ policies.setServiceName(serviceName);
}
- }
- } else {
- LOG.warn("cache file does not exist or not readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'");
- }
+
+ lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue();
+ }
+ } catch (Exception excp) {
+ LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp);
+ } finally {
+ if(reader != null) {
+ try {
+ reader.close();
+ } catch(Exception excp) {
+ LOG.error("error while closing opened cache file " + cacheFile.getAbsolutePath(), excp);
+ }
+ }
+ }
} else {
- LOG.warn("policyEngine is null");
+ LOG.warn("cache file does not exist or not readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'");
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()");
}
- }
+ return policies;
+ }
+
private void saveToCache(ServicePolicies policies) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").saveToCache()");