You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by rm...@apache.org on 2015/05/15 00:12:12 UTC

incubator-ranger git commit: Ranger-466: PolicyRefresher should timeout when Ranger Admin is non responsive and should use local cache for policy enforcement if present -Add on Changes

Repository: incubator-ranger
Updated Branches:
  refs/heads/master 97f1d5d05 -> f9fb61102


 Ranger-466: PolicyRefresher should timeout when Ranger Admin is non responsive and should use local cache for policy enforcement if present -Add on Changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f9fb6110
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f9fb6110
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f9fb6110

Branch: refs/heads/master
Commit: f9fb6110274803c949b6262270b288b0274aa299
Parents: 97f1d5d
Author: rmani <rm...@hortonworks.com>
Authored: Thu May 14 15:12:05 2015 -0700
Committer: rmani <rm...@hortonworks.com>
Committed: Thu May 14 15:12:05 2015 -0700

----------------------------------------------------------------------
 .../ranger/plugin/util/PolicyRefresher.java     | 181 +++++++++++--------
 1 file changed, 104 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f9fb6110/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index 341a65e..0729339 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -45,9 +45,9 @@ public class PolicyRefresher extends Thread {
 	private final String            cacheFile;
 	private final Gson              gson;
 
-	private long 	pollingIntervalMs     = 30 * 1000;
-	private long 	lastKnownVersion  	  = -1;
-	private boolean policyCacheLoadedOnce = false;
+	private long 	pollingIntervalMs   = 30 * 1000;
+	private long 	lastKnownVersion    = -1;
+	private boolean policiesSetInPlugin = false;
 
 
 	public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, long pollingIntervalMs, String cacheDir) {
@@ -128,6 +128,9 @@ public class PolicyRefresher extends Thread {
 
 
 	public void startRefresher() {
+
+		loadPolicy();
+
 		super.start();
 	}
 
@@ -142,122 +145,146 @@ public class PolicyRefresher extends Thread {
 	}
 
 	public void run() {
-		boolean loadFromCacheFlag = false;
 
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()");
 		}
 
 		while(true) {
+			loadPolicy();
 			try {
-				ServicePolicies svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion);
+				Thread.sleep(pollingIntervalMs);
+			} catch(InterruptedException excp) {
+				LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
+				break;
+			}
+		}
 
-				boolean isUpdated = svcPolicies != null;
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
+		}
+	}
 
-				if(isUpdated) {
-					long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue();
+	private void loadPolicy() {
 
-		        	if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) {
-		        		LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store");
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicy()");
+		}
 
-		        		svcPolicies.setServiceName(serviceName);
-		        	}
+		//load policy from PolicyAmdin
+		ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin();
+
+		if ( svcPolicies == null) {
+		  //if Policy fetch from Policy Admin Fails, load from cache
+		  if (!policiesSetInPlugin) {
+			   svcPolicies = loadFromCache();
+			}
+		} else {
+			saveToCache(svcPolicies);
+		}
 
-					LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion);
+		if (svcPolicies != null) {
+			plugIn.setPolicies(svcPolicies);
+			policiesSetInPlugin = true;
+		}
 
-					saveToCache(svcPolicies);
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicy()");
+		}
+	}
 
-		        	lastKnownVersion = newVersion;
+	private ServicePolicies loadPolicyfromPolicyAdmin() { 
 
-					plugIn.setPolicies(svcPolicies);
-				} else {
-					if(LOG.isDebugEnabled()) {
-						LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion);
-					}
-				}
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
+		}
+
+		ServicePolicies svcPolicies = null;
+
+		try {
+			svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion);
+
+			boolean isUpdated = svcPolicies != null;
 
-				loadFromCacheFlag	  = false;
+			if(isUpdated) {
+				long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue();
 
-				policyCacheLoadedOnce = false;
+				if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) {
+					LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store");
 
-			} catch(Exception excp) {
-				loadFromCacheFlag = true;
-				LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp);
-			} finally {
-				if (loadFromCacheFlag && !policyCacheLoadedOnce) {
-					//If ConnectionTime or PolicyAdmin down, fetch the Policy from Local Cache and load
-					LOG.info("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policy from Policy Manager. Loading Policies from local Cache. lastKnownVersion=" + lastKnownVersion);
-					loadFromCache();
-					policyCacheLoadedOnce = true;
+					svcPolicies.setServiceName(serviceName);
 				}
-			}
 
-			try {
-				Thread.sleep(pollingIntervalMs);
-			} catch(InterruptedException excp) {
-				LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
+				LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion);
 
-				break;
+			   	lastKnownVersion = newVersion;
+
+			} else {
+				if(LOG.isDebugEnabled()) {
+					LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion);
+				}
 			}
-		}
+   		 } catch(Exception excp) {
+   			LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp);
+   		 }
 
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
-		}
+		 if(LOG.isDebugEnabled()) {
+			LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
+		 }
+
+		 return svcPolicies;
 	}
 
-	private void loadFromCache() {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()");
-		}
 
-		RangerBasePlugin plugIn = this.plugIn;
+	private ServicePolicies loadFromCache() {
 
-		if(plugIn != null) {
-	    	File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : new File(this.cacheFile);
+		ServicePolicies policies = null;
 
-	    	if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) {
-	    		Reader reader = null;
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()");
+		}
 
-	    		try {
-		        	reader = new FileReader(cacheFile);
+		File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : new File(this.cacheFile);
 
-			        ServicePolicies policies = gson.fromJson(reader, ServicePolicies.class);
+    	if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) {
+    		Reader reader = null;
 
-			        if(policies != null) {
-			        	if(!StringUtils.equals(serviceName, policies.getServiceName())) {
-			        		LOG.warn("ignoring unexpected serviceName '" + policies.getServiceName() + "' in cache file '" + cacheFile.getAbsolutePath() + "'");
+    		try {
+	        	reader = new FileReader(cacheFile);
 
-			        		policies.setServiceName(serviceName);
-			        	}
+		        policies = gson.fromJson(reader, ServicePolicies.class);
 
-			        	lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue();
+		        if(policies != null) {
+		        	if(!StringUtils.equals(serviceName, policies.getServiceName())) {
+		        		LOG.warn("ignoring unexpected serviceName '" + policies.getServiceName() + "' in cache file '" + cacheFile.getAbsolutePath() + "'");
 
-			        	plugIn.setPolicies(policies);
-			        }
-		        } catch (Exception excp) {
-		        	LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp);
-		        } finally {
-		        	if(reader != null) {
-		        		try {
-		        			reader.close();
-		        		} catch(Exception excp) {
-		        			LOG.error("error while closing opened cache file " + cacheFile.getAbsolutePath(), excp);
-		        		}
+		        		policies.setServiceName(serviceName);
 		        	}
-		        }
-			} else {
-				LOG.warn("cache file does not exist or not readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'");
-			}
+
+		        	lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue();
+		         }
+	        } catch (Exception excp) {
+	        	LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp);
+	        } finally {
+	        	if(reader != null) {
+	        		try {
+	        			reader.close();
+	        		} catch(Exception excp) {
+	        			LOG.error("error while closing opened cache file " + cacheFile.getAbsolutePath(), excp);
+	        		}
+	        	}
+	        }
 		} else {
-			LOG.warn("policyEngine is null");
+			LOG.warn("cache file does not exist or not readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'");
 		}
 
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()");
 		}
-	}
 
+		return policies;
+	}
+	
 	private void saveToCache(ServicePolicies policies) {
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").saveToCache()");