You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by al...@apache.org on 2016/03/03 01:06:59 UTC

[35/50] [abbrv] incubator-ranger git commit: RANGER-627 Add start/stop/progress log messages so processing of Audit's JVM shutdown hooks can be monitored

RANGER-627 Add start/stop/progress log messages so processing of Audit's JVM shutdown hooks can be monitored


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/0ed5a161
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/0ed5a161
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/0ed5a161

Branch: refs/heads/HDP-2.3.2-groupid
Commit: 0ed5a161180755f3fa3ccc523137630bb4a7b2d9
Parents: 6987d30
Author: Alok Lal <al...@apache.org>
Authored: Fri Aug 28 18:19:56 2015 -0700
Committer: Alok Lal <al...@apache.org>
Committed: Fri Sep 11 16:02:17 2015 -0700

----------------------------------------------------------------------
 .../audit/provider/AsyncAuditProvider.java      | 48 ++++++++++++++------
 .../audit/provider/AuditProviderFactory.java    |  9 +++-
 .../ranger/audit/provider/DbAuditProvider.java  | 15 +-----
 .../audit/provider/MultiDestAuditProvider.java  | 10 ++--
 4 files changed, 47 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index f469d80..446ef95 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -42,6 +42,9 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 	private int     mMaxQueueSize     = 10 * 1024;
 	private int     mMaxFlushInterval = 5000; // 5 seconds
 
+	private static final int mStopLoopIntervalSecs           = 1; // 1 second
+	private static final int mWaitToCompleteLoopIntervalSecs = 1; // 1 second
+
 	// Summary of logs handled
 	private AtomicLong lifeTimeInLogCount  = new AtomicLong(0); // Total count, including drop count
 	private AtomicLong lifeTimeOutLogCount = new AtomicLong(0);
@@ -110,15 +113,24 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 
 	@Override
 	public void stop() {
-		mThread.interrupt();
-
+		LOG.info("==> AsyncAuditProvider.stop()");
 		try {
-			mThread.join();
-		} catch (InterruptedException excp) {
-			LOG.error("AsyncAuditProvider.stop(): failed while waiting for thread to exit", excp);
-		}
+			LOG.info("Interrupting child thread of " + mName + "..." );
+			mThread.interrupt();
+			while (mThread.isAlive()) {
+				try {
+					LOG.info(String.format("Waiting for child thread of %s to exit.  Sleeping for %d secs", mName, mStopLoopIntervalSecs));
+					mThread.join(mStopLoopIntervalSecs * 1000);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while waiting for child thread to join!  Proceeding with stop", e);
+					break;
+				}
+			}
 
-		super.stop();
+			super.stop();
+		} finally {
+			LOG.info("<== AsyncAuditProvider.stop()");
+		}
 	}
 
 	@Override
@@ -144,6 +156,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 					flush();
 				}
 			} catch (InterruptedException excp) {
+				LOG.info("AsyncAuditProvider.run - Interrupted!  Breaking out of while loop.");
 				break;
 			} catch (Exception excp) {
 				logFailedEvent(event, excp);
@@ -237,16 +250,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 	public void waitToComplete(long maxWaitSeconds) {
 		LOG.debug("==> AsyncAuditProvider.waitToComplete()");
 
-		for (long waitTime = 0; !isEmpty()
-				&& (maxWaitSeconds <= 0 || maxWaitSeconds > waitTime); waitTime++) {
-			try {
-				Thread.sleep(1000);
-			} catch (Exception excp) {
-				// ignore
+		try {
+			for (long waitTime = 0; !isEmpty()
+					&& (maxWaitSeconds <= 0 || maxWaitSeconds > waitTime); waitTime += mWaitToCompleteLoopIntervalSecs) {
+				try {
+					LOG.info(String.format("%d messages yet to be flushed by %s.  Sleeoping for %d sec", mQueue.size(), mName, mWaitToCompleteLoopIntervalSecs));
+					Thread.sleep(mWaitToCompleteLoopIntervalSecs * 1000);
+				} catch (InterruptedException excp) {
+					// someone really wants service to exit, abandon unwritten audits and exit.
+					LOG.warn("Caught interrupted exception! " + mQueue.size() + " messages still unflushed!  Won't wait for queue to flush, exiting...", excp);
+					break;
+				}
 			}
+		} finally {
+			LOG.debug("<== AsyncAuditProvider.waitToComplete()");
 		}
-
-		LOG.debug("<== AsyncAuditProvider.waitToComplete()");
 	}
 
 	private long getTimeTillNextFlush() {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index 1146e0b..723b528 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -451,8 +451,13 @@ public class AuditProviderFactory {
 		}
 
 		public void run() {
-			mProvider.waitToComplete();
-			mProvider.stop();
+			LOG.info("==> JVMShutdownHook.run()");
+			try {
+				mProvider.waitToComplete();
+				mProvider.stop();
+			} finally {
+				LOG.info("<== JVMShutdownHook.run()");
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f23f17d..8319d36 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -106,7 +106,7 @@ public class DbAuditProvider extends AuditDestination {
 		try {
 			if(preCreate(event)) {
 				DaoManager daoMgr = daoManager;
-	
+
 				if(daoMgr != null) {
 					event.persist(daoMgr);
 	
@@ -120,6 +120,7 @@ public class DbAuditProvider extends AuditDestination {
 				logFailedEvent(event);
 			}
 		}
+		LOG.debug("<== DbAuditProvider.log()");
 		return isSuccess;
 	}
 
@@ -167,18 +168,6 @@ public class DbAuditProvider extends AuditDestination {
 
 		cleanUp();
 	}
-	
-	@Override
-    public void waitToComplete() {
-		LOG.info("DbAuditProvider.waitToComplete()");
-		waitToComplete(-1);
-	}
-
-	@Override
-	public void waitToComplete(long timeout) {
-		LOG.info("DbAuditProvider.waitToComplete():timeout=" + timeout);
-
-	}
 
 	@Override
 	public void flush() {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 26108ca..282f5ab 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -161,7 +161,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler {
 			try {
 				provider.start();
 			} catch (Throwable excp) {
-				LOG.error("AsyncAuditProvider.start(): failed for provider { "
+				LOG.error("MultiDestAuditProvider.start(): failed for provider { "
 						+ provider.getClass().getName() + " }", excp);
 			}
 		}
@@ -173,7 +173,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler {
 			try {
 				provider.stop();
 			} catch (Throwable excp) {
-				LOG.error("AsyncAuditProvider.stop(): failed for provider { "
+				LOG.error("MultiDestAuditProvider.stop(): failed for provider { "
 						+ provider.getClass().getName() + " }", excp);
 			}
 		}
@@ -186,7 +186,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler {
 				provider.waitToComplete();
 			} catch (Throwable excp) {
 				LOG.error(
-						"AsyncAuditProvider.waitToComplete(): failed for provider { "
+						"MultiDestAuditProvider.waitToComplete(): failed for provider { "
 								+ provider.getClass().getName() + " }", excp);
 			}
 		}
@@ -199,7 +199,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler {
 				provider.waitToComplete(timeout);
 			} catch (Throwable excp) {
 				LOG.error(
-						"AsyncAuditProvider.waitToComplete(): failed for provider { "
+						"MultiDestAuditProvider.waitToComplete(): failed for provider { "
 								+ provider.getClass().getName() + " }", excp);
 			}
 		}
@@ -211,7 +211,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler {
 			try {
 				provider.flush();
 			} catch (Throwable excp) {
-				LOG.error("AsyncAuditProvider.flush(): failed for provider { "
+				LOG.error("MultiDestAuditProvider.flush(): failed for provider { "
 						+ provider.getClass().getName() + " }", excp);
 			}
 		}